Statement Rules
The statement stage takes the message body and tokenizes it into words
and the symbols that separate them. You describe it with a
StatementParser: an ordered list of Actions applied in sequence.
from log2seq.statement import StatementParser, Split, FixIP
sp = StatementParser([Split(" "), FixIP(), Split(":")])
words, symbols = sp.process_line("ping 2001:db8::1 from 10.0.0.5:80")
words # ['ping', '2001:db8::1', 'from', '10.0.0.5', '80']
symbols # ['', ' ', ' ', ' ', ':', '']

symbols is always one longer than words (len(symbols) == len(words) + 1):
there is a separator before the first word and after the last. Either end may be
the empty string.
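Assuming the symbols hold exactly the text that sat between the words, the two lists interleave back into the original line. The following is a minimal plain-Python sketch that reuses the values from the example above instead of calling the library; rebuild is a hypothetical helper, not part of log2seq.

```python
from itertools import zip_longest

def rebuild(words, symbols):
    """Interleave symbols and words as s0 w0 s1 w1 ... sN (hypothetical helper)."""
    if len(symbols) != len(words) + 1:
        raise ValueError("expected exactly one more symbol than words")
    # zip_longest pads the shorter list (words) with "" for the trailing symbol.
    return "".join(s + w for s, w in zip_longest(symbols, words, fillvalue=""))

words = ['ping', '2001:db8::1', 'from', '10.0.0.5', '80']
symbols = ['', ' ', ' ', ' ', ':', '']
line = rebuild(words, symbols)
# line == "ping 2001:db8::1 from 10.0.0.5:80"
```

The length check doubles as a cheap sanity test when post-processing parser output.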
Internally the statement is carried as a list of (substring, flag) tuples, and
each Action rewrites that list. A flag is one of three states:
- UNKNOWN: not yet decided; still a candidate for later actions to split or protect.
- FIXED: confirmed as a single word; later actions leave it alone.
- SEPARATORS: a delimiter; it becomes a symbol, never a word.
Every Action.do(parts) -> parts consumes the list and returns a new one,
re-flagging substrings as it goes. Because actions only ever transform this list,
order is significant — and that is the main lever you have.
# FixIP runs BEFORE the ":" split, so a bare address is FIXED first and the
# later split leaves it whole:
StatementParser([Split(" "), FixIP(), Split(":")]).process_line("ping 2001:db8::1 from 10.0.0.5:80")[0]
# ['ping', '2001:db8::1', 'from', '10.0.0.5', '80']
# Without FixIP the IPv6 address 2001:db8::1 would be torn apart by the ":" split.

This is why the default parser defers its : split to the very end (after
fixing IPs, clock times and MAC addresses); see Presets.
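To make the flag mechanics concrete, here is a toy model of the part list in plain Python. It is a sketch under stated assumptions, not the library's implementation: the flag values and the helpers split_parts, fix_parts and words_of are hypothetical names, and the only rule modelled is that an action touches UNKNOWN parts and passes everything else through.

```python
import re

# Hypothetical flag values for this sketch; the library's constants may differ.
UNKNOWN, FIXED, SEPARATORS = "unknown", "fixed", "separators"

def split_parts(parts, separators):
    """Split UNKNOWN parts on any separator character; pass other parts through."""
    pattern = re.compile("([" + re.escape(separators) + "])")
    out = []
    for text, flag in parts:
        if flag != UNKNOWN:
            out.append((text, flag))
            continue
        # With a capturing group, re.split keeps separators at odd indices.
        for i, piece in enumerate(pattern.split(text)):
            if piece:
                out.append((piece, SEPARATORS if i % 2 else UNKNOWN))
    return out

def fix_parts(parts, regex):
    """Mark UNKNOWN parts that match regex in full as FIXED."""
    pat = re.compile(regex)
    return [(t, FIXED if f == UNKNOWN and pat.fullmatch(t) else f)
            for t, f in parts]

def words_of(parts):
    """Everything that is not a separator counts as a word."""
    return [t for t, f in parts if f != SEPARATORS]

start = [("at 12:34:56", UNKNOWN)]
clock = r"\d{2}:\d{2}:\d{2}"

# Fix the clock time first, then split on ":".
fix_then_split = words_of(
    split_parts(fix_parts(split_parts(start, " "), clock), ":"))
# Split on ":" first; the later fix finds nothing left to protect.
split_then_fix = words_of(
    fix_parts(split_parts(split_parts(start, " "), ":"), clock))
# fix_then_split == ['at', '12:34:56']
# split_then_fix == ['at', '12', '34', '56']
```

The two pipelines contain the same steps; only the order differs, and only the first keeps the clock time whole.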
process_line(..., verbose=True) prints the part list after each action — the
best way to see a multi-step pipeline at work. Here is the default parser's
four-step statement pipeline (the one in Presets) on a single line:
from log2seq import preset
preset.default_statement_parser().process_line(
    "ping 192.168.1.1 from 10.0.0.5:8080 at 12:34:56 done", verbose=True)

Statement: ping 192.168.1.1 from 10.0.0.5:8080 at 12:34:56 done
Split: 'ping', '192.168.1.1', 'from', '10.0.0.5:8080', 'at', '12:34:56', 'done'
FixIP: 'ping', #192.168.1.1#, 'from', '10.0.0.5:8080', 'at', '12:34:56', 'done'
Fix: 'ping', #192.168.1.1#, 'from', '10.0.0.5:8080', 'at', #12:34:56#, 'done'
Split: 'ping', #192.168.1.1#, 'from', '10.0.0.5', '8080', 'at', #12:34:56#, 'done'
'…' marks an UNKNOWN part, #…# a FIXED one. Reading down the trace:
- Split breaks the line on the standard symbols (but not :).
- FixIP fixes 192.168.1.1; 10.0.0.5:8080 is left UNKNOWN, because the trailing :8080 means the whole part is not a bare address.
- Fix fixes the clock time 12:34:56 (it would fix a MAC address the same way).
- The final Split(":") acts only on the UNKNOWN parts, so 10.0.0.5:8080 becomes 10.0.0.5 + 8080 while the FIXED IP and time keep their punctuation.
Had the : split run first, 12:34:56 would have been torn into 12, 34,
56. Fixing it first and splitting last is the whole point of the ordering.
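The trace notation can be mimicked in a few lines, which helps when reading or diffing traces by hand. This is a sketch only: render and the flag values are hypothetical, and it assumes, as the trace above suggests, that separator parts are left out of each printed line.

```python
# Hypothetical flag values for this sketch; the library's constants may differ.
UNKNOWN, FIXED, SEPARATORS = "unknown", "fixed", "separators"

def render(parts):
    """Format a part list like one trace line: 'x' for UNKNOWN, #x# for FIXED."""
    shown = []
    for text, flag in parts:
        if flag == SEPARATORS:
            continue  # assumption: separators are not printed in the trace
        shown.append(f"#{text}#" if flag == FIXED else repr(text))
    return ", ".join(shown)

parts = [("ping", UNKNOWN), (" ", SEPARATORS),
         ("192.168.1.1", FIXED), (" ", SEPARATORS),
         ("10.0.0.5:8080", UNKNOWN)]
trace_line = render(parts)
# trace_line == "'ping', #192.168.1.1#, '10.0.0.5:8080'"
```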
| Action | What it does |
|---|---|
| Split("…") | split UNKNOWN parts on any of the given separator characters |
| Fix(pattern \| [patterns]) | mark substrings matching a pattern as FIXED (one word) |
| FixIP() | mark IPv4 / IPv6 addresses as FIXED |
| FixParenthesis([open, close]) | mark a bracketed/quoted span (e.g. ["\"", "\""]) as FIXED |
| FixPartial(pattern, fix_groups=[…]) | within a match, FIX the named groups and split the rest |
| Remove(pattern \| [patterns]) | mark matches as SEPARATORS (dropped from words) |
| RemovePartial(pattern, remove_groups=[…]) | within a match, drop the named groups, keep the rest |
| ConditionalSplit(pattern, separators) | split a part only if it matches pattern, by separators |
from log2seq.statement import (StatementParser, Split, Fix, Remove,
                               FixParenthesis, FixPartial, ConditionalSplit)

def run(rules, s):
    """Apply the rules to s and return only the words."""
    return StatementParser(rules).process_line(s)[0]

run([Split(" "), Fix([r"\d+\.\d+"]), Split(".")], "v 1.2 build a.b")
# ['v', '1.2', 'build', 'a', 'b']   # 1.2 protected, a.b split

run([Split(" "), Remove(r"->")], "a -> b")
# ['a', 'b']                        # "->" becomes a separator

run([FixParenthesis(['"', '"']), Split(' ')], 'say "hello world" now')
# ['say', 'hello world', 'now']     # the quoted span stays whole

run([Split(" "),
     FixPartial(r'^(?P<ip>(\d{1,3}\.){3}\d{1,3})\.(?P<port>\d+)$', fix_groups=["ip", "port"]),
     Split(".")],
    "src 192.0.2.1.8080 ok")
# ['src', '192.0.2.1', '8080', 'ok']  # ip and port kept, the "." between them split

run([Split(" "), ConditionalSplit(r'^\w+=\w+$', '=')], "user=bob says hello")
# ['user', 'bob', 'says', 'hello']  # only the part matching key=value is split

Fix and Remove accept a single pattern or a list. The Partial actions use
named groups to address sub-spans: FixPartial keeps the listed groups
whole, RemovePartial drops them. ConditionalSplit is for tokens that need
their own splitting only when they match a shape (e.g. a Cisco-style
%KERNEL-4-EVENT-7 mnemonic) while leaving everything else alone.
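How named groups address sub-spans can be seen with the standard re module alone. The sketch below is not the library's code: partition_by_groups is a hypothetical helper that cuts a matched token into the listed groups and the text between them, assuming every listed group takes part in the match and the groups do not overlap.

```python
import re

def partition_by_groups(text, pattern, groups):
    """Return (substring, in_group) pieces covering text, or None if no match.

    Assumes each listed group matched and that the groups do not overlap.
    """
    m = re.match(pattern, text)
    if m is None:
        return None
    pieces, pos = [], 0
    for name in sorted(groups, key=m.start):   # left-to-right by start offset
        start, end = m.span(name)
        if start > pos:
            pieces.append((text[pos:start], False))  # text between groups
        pieces.append((text[start:end], True))       # the named group itself
        pos = end
    if pos < len(text):
        pieces.append((text[pos:], False))
    return pieces

pattern = r'^(?P<ip>(\d{1,3}\.){3}\d{1,3})\.(?P<port>\d+)$'
pieces = partition_by_groups("192.0.2.1.8080", pattern, ["ip", "port"])
# pieces == [('192.0.2.1', True), ('.', False), ('8080', True)]
```

A FixPartial-style action would treat the True pieces as whole words and leave the rest open to later splitting; a RemovePartial-style action would drop the True pieces instead.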
An Action is any object with do(parts) -> parts, where parts is an iterable
of (substring, flag) tuples and the result is the same shape. Skip parts that
are not UNKNOWN, and re-flag the rest:
from log2seq.statement import Split, _ActionBase, _FLAG_SEPARATORS

class DropExactly(_ActionBase):
    """Mark parts equal to a given token as separators (so they leave words)."""

    def __init__(self, token):
        self._token = token

    def do(self, iterable_parts):
        for s, flag in iterable_parts:
            # Only UNKNOWN parts are candidates; everything else passes through.
            if self._is_active_part(s, flag) and s == self._token:
                yield s, _FLAG_SEPARATORS
            else:
                yield s, flag

run([Split(" "), DropExactly("--")], "a -- b")
# ['a', 'b']

In practice the built-in catalog plus careful ordering covers most needs;
subclass _ActionBase (and reuse _is_active_part and the _FLAG_* constants)
only when a token needs handling the catalog can't express. The internals are
documented in Architecture Overview.
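The do(parts) -> parts contract can be exercised without the library at all. The following toy driver is a sketch under assumptions, not the actual parser: the flag values, SplitOnSpace, DropToken and process are hypothetical, and merging adjacent separators into one symbol is an assumption chosen to preserve len(symbols) == len(words) + 1.

```python
# Hypothetical flag values for this sketch; the library's constants may differ.
UNKNOWN, FIXED, SEPARATORS = range(3)

class SplitOnSpace:
    """Toy action: split UNKNOWN parts on single spaces."""
    def do(self, parts):
        for text, flag in parts:
            if flag != UNKNOWN:
                yield text, flag
                continue
            for i, chunk in enumerate(text.split(" ")):
                if i > 0:
                    yield " ", SEPARATORS
                if chunk:
                    yield chunk, UNKNOWN

class DropToken:
    """Toy action: re-flag UNKNOWN parts equal to token as separators."""
    def __init__(self, token):
        self._token = token
    def do(self, parts):
        for text, flag in parts:
            if flag == UNKNOWN and text == self._token:
                yield text, SEPARATORS
            else:
                yield text, flag

def process(line, actions):
    """Chain the actions, then collect words and the symbols between them."""
    parts = [(line, UNKNOWN)]
    for action in actions:
        parts = action.do(parts)      # generators chain lazily
    words, symbols = [], [""]
    for text, flag in parts:
        if flag == SEPARATORS:
            symbols[-1] += text       # assumption: adjacent separators merge
        else:
            words.append(text)
            symbols.append("")
    return words, symbols

words, symbols = process("a -- b", [SplitOnSpace(), DropToken("--")])
# words == ['a', 'b'], symbols == ['', ' -- ', '']
```

Because each action is just an object with a do method, a custom step slots into the list like any built-in one.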
- Header Rules — the first stage.
- Building a Parser — assembling and driving the parser.
- Practical Patterns — ordering decisions and debugging.