# Parse and Broker Messages with CombinedParsers
Write fast parsers, faster

In [1]:
using CombinedParsers                   ## JuliaCon 2021


speaker = "Gregor Kappler
psychometric scientist and programmer, independent";



import CombinedParsers.Regexp: word, words, whitespace # hide

This workshop is about using the Julia package CombinedParsers.

In [2]:
person_background = Sequence(
    :name => !words, whitespace, :lastname => !word, "\n",
    :background => !words)

person_background(speaker)

(name = "Gregor", lastname = "Kappler", background = "psychometric scientist and programmer")

## Parsing
*converting vectors (strings/binary) to any type with julia*
and CombinedParsers

Are julia types cool for parsing?

## Workshop questions

Are Julia types & method dispatch suited

-   for converting Strings to anything fast?
-   to build efficient string/message processing systems?

## Workshop Outline:
1.  Message Broker like Kafka
2.  Writing recursive CombinedParsers (EBNF)
3.  CombinedParsers internals: States and optimization
    Contributors welcome.

# Part 1: Julia for message brokering?

Julia

-   established in distributed scientific computing.
-   parametric types interesting for compiled parsers
-   growing adoption
-   opportunities in industry?

Workshop notebook online at <https://github.com/gkappler/JuliaCon2021_CombinedParsers_workshop>.

**Please use julia 1.5** (1.6 issues with generated functions for long sequences).

And please update `CombinedParsers.jl` to v0.1.7

## Usecase Example: Webserver Message Brokering

Big Picture:

-   Event streaming is big business,
-   Confluent uses Apache Kafka,
    <img src="https://dz2cdn3.dzone.com/storage/article-thumb/12418930-thumb.jpg" width="200"/>
    not hadoop, but another java elephant in solution zoo
    <img src="https://cdn.worldvectorlogo.com/logos/hadoop.svg" width="200"/>

Are Julia types & method dispatch
suited to build an efficient distributed message broker/parser?

## CombinedParsers Performance: PCRE Benchmarks
- `Regex` benchmarks: 86.0ns-521.0ns.
- `Regcomb` benchmarks: 34.0ns-19704.0ns.
- average ratio of `time_Regcomb/time_Regex`: 1.26.
- 59.0% of benchmarks are faster with `Regcomb`.
![](https://gkappler.github.io/CombinedParsers.jl/dev/man/log_btimes.png)

### Know/interested in Kafka?

Kafka terminology:

#### Message Broker

##### Producer
> The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier.

<https://kafka.apache.org/documentation/#theproducer>

##### Consumer
> The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.

<https://kafka.apache.org/documentation/#theconsumer>

##### How can Message Broker processing be done in julia?
> Push vs. pull
>
> An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer.
<https://kafka.apache.org/documentation/#design_pull>

Julia's `RemoteChannel` has the same design, compiled and distributed.
(Julia channels are oriented toward jobs/tasks/parallelism, see <https://docs.julialang.org/en/v1/manual/asynchronous-programming/#More-on-Channels>)

In [3]:
using Distributed

addprocs(2); # add worker processes

In [4]:
webserver_logs = RemoteChannel()

RemoteChannel{Channel{Any}}(1, 1, 28)
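The push/pull design can be tried on a single process before any workers are involved. The following is a minimal sketch, not part of the workshop code: the `String` element type, the buffer size of 8, and the names `broker` and `received` are assumptions made for illustration.

```julia
using Distributed

# A RemoteChannel backed by a buffered, typed Channel on the current process.
broker = RemoteChannel(() -> Channel{String}(8))

# Producer side: events are pushed to the broker.
for i in 1:3
    put!(broker, "event $i")
end

# Consumer side: events are pulled from the broker, in order.
received = [take!(broker) for _ in 1:3]
```

The plain `RemoteChannel()` used in the workshop holds elements of type `Any`, as the output above shows; passing a constructor function is one way to fix the element type and buffer size.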

### POC Focus:
distributed compiled processing/parsing.

### POC limitation:
in memory (no stream persistence in `RemoteChannel`).

## Example data: log file
Webserver logs <https://github.com/elastic/examples/tree/master/Common%20Data%20Formats>

In [5]:
false && let logpath = "https://github.com/elastic/examples/raw/master/Common%20Data%20Formats"
    download("$logpath/apache_logs/apache_logs", "./apache_logs")
    download("$logpath/nginx_logs/nginx_logs", "./nginx_logs")
end

false

### Producers
<https://stackoverflow.com/questions/67348301/julia-iterator-which-parses-each-line-in-file>
1. read line by line
2. parse a NamedTuple
3. geoip

#### Waiting file line iterator:

Modified from `Base.EachLine`

In [6]:
@everywhere struct EachLineFollow{IOT <: IO}
    stream::IOT
    ondone::Function
    keep::Bool
    wait::Float64
    EachLineFollow(stream::IO=stdin; wait=1.0, ondone::Function=()->nothing, keep::Bool=false) =
        new{typeof(stream)}(stream, ondone, keep, wait)
end

@everywhere function Base.iterate(itr::EachLineFollow, state=nothing)
    while eof(itr.stream)
        # nicely waiting itr.wait sec when eof
        sleep(itr.wait)
    end
    (readline(itr.stream, keep=itr.keep), nothing)
end

@everywhere Base.eltype(::Type{<:EachLineFollow}) = String

@everywhere Base.IteratorSize(::Type{<:EachLineFollow}) = Base.SizeUnknown()
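Because such an iterator waits at end of file instead of terminating, a consumer has to bound it explicitly. The sketch below uses a simplified, hypothetical variant (`FollowLines`, without the `ondone` and `keep` fields) on an in-memory `IOBuffer` and takes only the first two lines with `Iterators.take`.

```julia
# Hypothetical, simplified variant of the waiting line iterator.
struct FollowLines{IOT<:IO}
    stream::IOT
    wait::Float64
end

function Base.iterate(itr::FollowLines, state=nothing)
    while eof(itr.stream)
        sleep(itr.wait)          # block until more data arrives
    end
    (readline(itr.stream), nothing)
end

Base.eltype(::Type{<:FollowLines}) = String
Base.IteratorSize(::Type{<:FollowLines}) = Base.SizeUnknown()

io = IOBuffer("first\nsecond\nthird\n")

# `take` stops after two elements, so the iterator never reaches the waiting branch.
first_two = collect(Iterators.take(FollowLines(io, 0.1), 2))
```

Calling `collect` on the unbounded iterator itself would never return, which is the intended behaviour for a log follower.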

#### produce log lines
Similar to the `make_jobs(n)` example in <https://docs.julialang.org/en/v1/manual/distributed-computing/>.
More on distributed computing:
<https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.RemoteChannel>

In [7]:
@everywhere function put_lines!(f::IO, broker, a...)
    for l in EachLineFollow(f)
        put!(broker, (l,a...))
    end
end
@everywhere put_lines!(f::String, broker, a...) =
    put_lines!(open(f), broker, f, a...)

@async put_lines!("./apache_logs", webserver_logs)
@async put_lines!("./nginx_logs", webserver_logs)

logline, source = ("""
83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
""", "apache") # 

("83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"\n", "apache")

In [27]:
logline, source = take!(webserver_logs)

("83.149.9.216 - - [17/May/2015:10:05:34 +0000] \"GET /presentations/logstash-monitorama-2013/images/sad-medic.png HTTP/1.1\" 200 430406 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"", "./apache_logs")

### Transformation
Consumer of `event::String` and producer of parsings, similar to
<https://kafka.apache.org/documentation/#connect_transforms>

In [8]:
using CombinedParsers

CombinedParsers can be combined with TextParse:

In [9]:
using TextParse

decimal_byte = parser(TextParse.Numeric(UInt8))
datetime = parser(TextParse.DateTimeToken(TextParse.DateFormat("d/u/Y:H:M:S")))
decimal_byte("10"), datetime("17/May/2015:10:05:43")

(0x0a, Dates.DateTime("2015-05-17T10:05:43"))
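The format string `"d/u/Y:H:M:S"` is a `Dates.DateFormat` pattern (`u` matches an abbreviated English month name). As a cross-check that does not depend on TextParse, the same timestamp can be parsed with the `Dates` standard library alone; the variable names here are illustrative.

```julia
using Dates

fmt = DateFormat("d/u/Y:H:M:S")

# Parse the timestamp part of an Apache log line (time zone offset not included).
dt = DateTime("17/May/2015:10:05:43", fmt)
```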

##### Matching IP Addresses

In [10]:
ipv4 = join(Repeat(4,4,decimal_byte),'.')
ipv6 = join(Repeat(8,8,integer_base(16)),':')

[0m[1m🗄[22m[0m Sequence |> map([34m#73[39m)
├─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
└─ [0m[1m🗄[22m[36m[1m{7}[22m[39m[0m Sequence |> map([34m#55[39m) |> Repeat
   ├─ [36m[1m\:[22m[39m[0m 
   └─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
::Array{Int64,1}


In [12]:
using BenchmarkTools

m = match(ipv6, "2003:ef:272a:5900:138a:7ce1:2ed:cd2")
@btime get(m)

  11.457 μs (91 allocations: 4.08 KiB)


8-element Array{Int64,1}:
  8195
   239
 10026
 22784
  5002
 31969
   749
  3282

`!` captures the matched `SubString` instead of the parsed value (syntax inspired by Scala's fastparse).
`Either` matches alternative formats.

In [11]:
ip = !Either(ipv4, ipv6)

[36m[1m|[22m[39m[0m[1m🗄[22m[0m Either[34m |> ![39m
├─ [0m[1m🗄[22m[0m Sequence |> map([34m#73[39m)
│  ├─ [0m <UInt8>
│  └─ [0m[1m🗄[22m[36m[1m{3}[22m[39m[0m Sequence |> map([34m#55[39m) |> Repeat
│     ├─ [36m[1m\.[22m[39m[0m 
│     └─ [0m <UInt8>
└─ [0m[1m🗄[22m[0m Sequence |> map([34m#73[39m)
   ├─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
   └─ [0m[1m🗄[22m[36m[1m{7}[22m[39m[0m Sequence |> map([34m#55[39m) |> Repeat
      ├─ [36m[1m\:[22m[39m[0m 
      └─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
::SubString{String}


Matching an ip pattern as `SubString` is faster:

In [13]:
m = match(ip, "2003:ef:272a:5900:138a:7ce1:2ed:cd2")
@btime get(m)

  29.636 ns (1 allocation: 32 bytes)


"2003:ef:272a:5900:138a:7ce1:2ed:cd2"
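For comparison, Base `Regex` matching also returns its result as a `SubString`, a view into the parent string that involves no number parsing. The sketch below uses a deliberately loose, illustrative pattern (hex digits and colons), not a validating IPv6 expression.

```julia
line = "2003:ef:272a:5900:138a:7ce1:2ed:cd2 - - [17/May/2015:10:05:03 +0000]"

# Loose illustrative pattern: a leading run of hex digits and colons.
m = match(r"^[0-9a-f:]+", line)

captured = m.match              # a SubString{String} view into `line`
```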

Parsers defined without `@syntax` produce less logging by default.

`@syntax name = expr` is a convenience macro defining `name = with_name(:name, expr)` and a custom parsing macro `@name_str`.
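A macro named `@name_str` is what Julia invokes for a prefixed string literal `name"..."`. The following sketch shows only that language mechanism with a hypothetical `hex` prefix; it is not how CombinedParsers implements `@syntax`.

```julia
# Hypothetical string macro: hex"ff" is parsed when the macro is expanded.
macro hex_str(s)
    parse(Int, s; base=16)
end

value = hex"ff"
```

With `@syntax apache_logline = ...`, a literal of the form `apache_logline"..."` is parsed by the named parser in the same way.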

In [14]:
number = TextParse.Numeric(Int)
nospace = !Repeat(CharNotIn(' '))
escapedquotes = !Repeat(Either("\\\"", CharNotIn('"')))

@syntax apache_logline = Sequence(
    :ip               => ip,
    " ",
    :identd           => Either("-" => missing, nospace),
    " ",
    :user             => Either("-" => missing, nospace),
    " [",
    :date             => datetime, " +", :timezone => number,
    "] \"",
    :req_method       => !Either("GET", "POST", "HEAD", "OPTIONS"),
    ' ',
    :req_url          => nospace,
    " HTTP/",
    :req_http_version => TextParse.Numeric(Float64),
    "\" ",
    :status           => number,
    ' ',
    :size             => Either(number, '-' => 0),
    ' ',
    '"',
    :url              => escapedquotes,
    "\" \"",
    :browser          => escapedquotes, '"'
)

[0m[1m🗄[22m[0m Sequence |> map([34mntuple[39m) |> with_name(:[31m[1mapache_logline[22m[39m)
├─ [36m[1m|[22m[39m[0m[1m🗄[22m[0m Either[34m |> ![39m |> with_name(:[31m[1mip[22m[39m)
│  ├─ [0m[1m🗄[22m[0m Sequence |> map([34m#73[39m)
│  │  ├─ [0m <UInt8>
│  │  └─ [0m[1m🗄[22m[36m[1m{3}[22m[39m[0m Sequence |> map([34m#55[39m) |> Repeat
│  │     ├─ [36m[1m\.[22m[39m[0m 
│  │     └─ [0m <UInt8>
│  └─ [0m[1m🗄[22m[0m Sequence |> map([34m#73[39m)
│     ├─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
│     └─ [0m[1m🗄[22m[36m[1m{7}[22m[39m[0m Sequence |> map([34m#55[39m) |> Repeat
│        ├─ [36m[1m\:[22m[39m[0m 
│        └─ [36m[1m[[:xdigit:]][22m[39m[36m[1m*[22m[39m[0m CharIn |> Repeat[34m |> ![39m |> map([34m#146[39m)
├─ [36m[1m\ [22m[39m[0m 
├─ [36m[1m|[22m[39m[0m[1m🗄[22m[0m Either |> with_name(:[31m[1midentd[22m[39m)
│  ├─ [36m[1m

#### Logging
can be done selectively

In [30]:
apache_logline(logline, log=[:ip]);

   match ip@1-13: 83.149.9.216 - - [


and fully

In [29]:
apache_logline(logline, log=true)

   match ip@1-13: 83.149.9.216 - - [
   match identd@14-15: .216 - - [17
   match user@16-17: 16 - - [17/M
   match date@19-39: - - [17/May/2015:10:05:34 +0000
   match timezone@41-45: :34 +0000] \"GET
   match req_method@48-51: 00] \"GET /pres
   match req_url@52-112: \"GET /presentations/logstash-monitorama-2013/images/sad-medic.png HTTP/
   match req_http_version@118-121: HTTP/1.1\" 200 
   match status@123-126: 1.1\" 200 43040
   match size@127-1

NamedTuple{(:ip, :identd, :user, :date, :timezone, :req_method, :req_url, :req_http_version, :status, :size, :url, :browser),Tuple{SubString{String},Union{Missing, SubString{String}},Union{Missing, SubString{String}},Dates.DateTime,Int64,String,SubString{String},Float64,Int64,Int64,SubString{String},SubString{String}}}(("83.149.9.216", missing, missing, Dates.DateTime("2015-05-17T10:05:34"), 0, "GET", "/presentations/logstash-monitorama-2013/images/sad-medic.png", 1.1, 200, 430406, "http://semicomplete.com/presentations/logstash-monitorama-2013/", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"))

In [28]:
function parse_logline((event,file), parser, producer)
    tevent = tryparse(parser, event)
    if tevent === nothing
        tryparse(parser, event, log=true)
        println(event)
    else
        put!(producer,(tevent, file))
        tevent, file
    end
end

logtable_rows = RemoteChannel()
parse_logline((logline, source), apache_logline, logtable_rows)

(NamedTuple{(:ip, :identd, :user, :date, :timezone, :req_method, :req_url, :req_http_version, :status, :size, :url, :browser),Tuple{SubString{String},Union{Missing, SubString{String}},Union{Missing, SubString{String}},Dates.DateTime,Int64,String,SubString{String},Float64,Int64,Int64,SubString{String},SubString{String}}}(("83.149.9.216", missing, missing, Dates.DateTime("2015-05-17T10:05:34"), 0, "GET", "/presentations/logstash-monitorama-2013/images/sad-medic.png", 1.1, 200, 430406, "http://semicomplete.com/presentations/logstash-monitorama-2013/", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36")), "./apache_logs")

Now let's connect `webserver_logs::RemoteChannel` to `logtable_rows::RemoteChannel` via the `parse_logline` function.

Note: this could emit any other type too!

In [17]:
function take_map!(f::Function, broker, a...; kw...)
    while true
        event = take!(broker)
        f(event, a...; kw...)
    end
end



@async take_map!(parse_logline,  # map function
                 webserver_logs, # input
                 apache_logline, # further arguments to parse_logline...
                 logtable_rows)

Task (runnable) @0x00007f3699640280
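The same take, transform, put pattern can be sketched with local `Channel`s and Base `tryparse`. This is an illustrative variant under stated assumptions: the channel names, buffer sizes, and the choice to skip unparsable events silently are not from the workshop code, and unlike `take_map!` the stage terminates once its input is closed.

```julia
raw    = Channel{String}(8)   # input topic: unparsed events
parsed = Channel{Int}(8)      # output topic: parsed events

# Transformation stage: take from `raw`, parse, put into `parsed`.
stage = @async begin
    for event in raw                      # ends when `raw` is closed and drained
        value = tryparse(Int, event)
        value === nothing || put!(parsed, value)   # skip events that fail to parse
    end
    close(parsed)
end

foreach(e -> put!(raw, e), ["1", "two", "3"])
close(raw)
wait(stage)

results = collect(parsed)
```

Closing the input channel gives the stage a clean shutdown path, whereas `take!` on a closed, empty channel inside a `while true` loop throws an exception.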

### Transformation: IP Geolocation (privacy obfuscation)
<https://www.maketecheasier.com/ip-address-geolocation-lookups-linux/>
no city level

In [18]:
`geoiplookup 162.158.91.43` |> run

GeoIP Country Edition: DE, Germany
GeoIP City Edition, Rev 0: IP Address not found


Process(`geoiplookup 162.158.91.43`, ProcessExited(0))

In [31]:
`geoiplookup 8.8.8.8` |> run

GeoIP Country Edition: US, United States
GeoIP City Edition, Rev 0: US, CO, Colorado, Broomfield, 80021, 39.893799, -105.114403


Process(`geoiplookup 8.8.8.8`, ProcessExited(0))

We parse the `geoiplookup` output (on city level, if available)

In [32]:
geoiplookup_out = Either(
    Sequence(6,
             "GeoIP Country Edition: ", inline, newline,
             "GeoIP City Edition, Rev 0: ", NegativeLookahead("IP Address not found"), !inline, newline),
    Sequence(2,
             "GeoIP Country Edition: ", !inline, newline)
)

function geoip(ip::AbstractString)
    read(`geoiplookup $ip`, String) |> geoiplookup_out
end
iplookup = Either(map(geoip, !ipv4),
                  ipv6 => "no GeoIP for IPv6")

geoip("162.158.91.43"), geoip("8.8.8.8")

("DE, Germany", "US, CO, Colorado, Broomfield, 80021, 39.893799, -105.114403")
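The selection logic (prefer the city line unless it reports "IP Address not found", otherwise fall back to the country line) can also be sketched with Base `Regex`, using the two outputs shown above as literal samples so that no `geoiplookup` binary is needed. The function name `geo_line` is hypothetical.

```julia
found = """
GeoIP Country Edition: US, United States
GeoIP City Edition, Rev 0: US, CO, Colorado, Broomfield, 80021, 39.893799, -105.114403
"""

not_found = """
GeoIP Country Edition: DE, Germany
GeoIP City Edition, Rev 0: IP Address not found
"""

# Hypothetical helper: city-level line if available, else country-level line.
function geo_line(out::AbstractString)
    # The negative lookahead rejects the "not found" city line.
    city = match(r"^GeoIP City Edition, Rev 0: (?!IP Address not found)(.*)$"m, out)
    city === nothing || return city.captures[1]
    country = match(r"^GeoIP Country Edition: (.*)$"m, out)
    country === nothing ? nothing : country.captures[1]
end

locations = (geo_line(not_found), geo_line(found))
```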

IPv6 is not supported by geoiplookup.

Using method dispatch again (sloppily assuming a `:ip` property).

In [20]:
function geoip(x::NamedTuple)
    (; geoip=iplookup(x.ip),
     ( p => getproperty(x,p)
       for p in propertynames(x)
           if p != :ip)... )
end
geoip((ip="162.158.91.43", some=1))

geoip(x::Tuple{<:NamedTuple,<:AbstractString}) =
    (geoip(x[1]),x[2])

geoip(x::Tuple{<:NamedTuple,<:AbstractString}, producer::RemoteChannel) =
    put!(producer, geoip(x))

geoip (generic function with 4 methods)
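The `NamedTuple` rebuild used here, dropping one field and adding another, can be isolated in a small sketch. The helper name `drop_field` and the sample row are assumptions for illustration, and the geolocation string is a hard-coded placeholder instead of a real lookup.

```julia
# Hypothetical helper: a copy of `x` without the field `name`.
drop_field(x::NamedTuple, name::Symbol) =
    (; (p => getproperty(x, p) for p in propertynames(x) if p != name)...)

row = (ip = "8.8.8.8", status = 200, size = 1024)

# Put the new field first, then the remaining original fields.
enriched = merge((geoip = "US, United States",), drop_field(row, :ip))
```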

In [25]:
x = take!(logtable_rows)

(NamedTuple{(:ip, :identd, :user, :date, :timezone, :req_method, :req_url, :req_http_version, :status, :size, :url, :browser),Tuple{SubString{String},Union{Missing, SubString{String}},Union{Missing, SubString{String}},Dates.DateTime,Int64,String,SubString{String},Float64,Int64,Int64,SubString{String},SubString{String}}}(("93.180.71.3", missing, missing, Dates.DateTime("2015-05-17T08:05:23"), 0, "GET", "/downloads/product_1", 1.1, 304, 0, "-", "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)")), "./nginx_logs")

In [21]:
geoip_rows = RemoteChannel()

@async take_map!(geoip,  # map function
                 logtable_rows, # input
                 geoip_rows)

Task (runnable) @0x00007f369a457a90

In [26]:
x = take!(geoip_rows)

((geoip = "RU, 48, Moscow City, Moscow, N/A, 55.752201, 37.615601", identd = missing, user = missing, date = Dates.DateTime("2015-05-17T10:05:43"), timezone = 0, req_method = "GET", req_url = "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png", req_http_version = 1.1, status = 200, size = 171717, url = "http://semicomplete.com/presentations/logstash-monitorama-2013/", browser = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"), "./apache_logs")

## Consumers and sinks:
`RemoteChannel` can be mapped with any julia function, e.g.
- store in Database with Lighthouse.jl
- log file
- dispatched to multiple consumers
- gittrie.jl with csv (published soon)
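Dispatching one stream to multiple consumers could look like the following sketch with local `Channel`s. The helper `fanout!`, the sink names, and the buffer sizes are hypothetical; a distributed version would use `RemoteChannel`s and run the loop in a task.

```julia
# Hypothetical helper: copy every event from `source` to each sink.
function fanout!(source::Channel, sinks::Channel...)
    for event in source              # ends once `source` is closed and drained
        for sink in sinks
            put!(sink, event)
        end
    end
    foreach(close, sinks)
end

source  = Channel{String}(4)
to_db   = Channel{String}(4)
to_file = Channel{String}(4)

put!(source, "GET /index.html")
put!(source, "GET /about.html")
close(source)

fanout!(source, to_db, to_file)
db_events, file_events = collect(to_db), collect(to_file)
```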

### Parallelism
Useful for an indexing use case:
a recursive wikitext parser for Wiktionary is under development at <https://github.com/gkappler/WikitextParser.jl>

## Message brokering Proof of Concept
Comparing Julia with Java:
- using function dispatch in Julia
- Java classes for Kafka transformations

Julia is slicker (runtime, devtime and concepts).

### Prospect for message brokering in julia.
#### Kafka Event streams
The workshop focuses on processing, not handling/storage.

> A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set.
> A stream is an ordered, **replayable**, and **fault-tolerant** sequence of immutable data records, where a data record is defined as a key-value pair.

<https://kafka.apache.org/11/documentation/streams/core-concepts#streams_topology>

Messages could be serialized with offset indices, e.g. with <https://docs.julialang.org/en/v1/stdlib/Mmap/>, <https://github.com/JuliaData/MemPool.jl>.
Would such a Kafka Streams feature be interesting for Julia's `RemoteChannel`?
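What "serializing with offset indices" buys is replayability: a consumer can re-read the stream from any recorded position. The sketch below is an in-memory illustration only, using the `Serialization` standard library and an `IOBuffer` in place of a memory-mapped file; the type `OffsetLog` and its functions are hypothetical names, and nothing here provides persistence or fault tolerance.

```julia
using Serialization

# Hypothetical append-only log: serialized messages plus their byte offsets.
struct OffsetLog
    io::IOBuffer
    offsets::Vector{Int}
end
OffsetLog() = OffsetLog(IOBuffer(), Int[])

# Append a message and return its 1-based index in the log.
function log_append!(log::OffsetLog, msg)
    seekend(log.io)
    push!(log.offsets, position(log.io))
    serialize(log.io, msg)
    length(log.offsets)
end

# Replay all messages starting at index `from`.
function log_read(log::OffsetLog, from::Integer)
    seek(log.io, log.offsets[from])
    [deserialize(log.io) for _ in from:length(log.offsets)]
end

log = OffsetLog()
foreach(m -> log_append!(log, m), ["a", "b", "c"])

replayed = log_read(log, 2)   # re-consume from the second message onward
```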

Current business deal-breakers:
-   no [processing guarantee](https://kafka.apache.org/11/documentation/streams/core-concepts#streams_processing_guarantee)

No discussion of Kafka events ∈ topics, or partitions.

---

*This notebook was generated using [Literate.jl](https://github.com/fredrikekre/Literate.jl).*