# Streaming Log Processing on GPUs

Almost since the phrase "big data" was coined, log processing has been a primary use case for analytics platforms.

Logs are *voluminous*:
A single website visit can result in tens to hundreds of log entries, each with lengthy strings of duplicated client information.

They're *complex*:
Extracting user activities often requires combining multiple records by time and unique session identifier(s).

They're *time-sensitive*:
When something goes wrong, you need to know quickly.
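The *complex* point above, combining records by time and identifier, can be sketched in plain Python. The sketch assumes each record is a `(key, timestamp)` pair, where `key` is a client or session identifier and timestamps are in seconds, and it starts a new session after 30 minutes of inactivity. The 30-minute gap and the `sessionize` function are illustrative assumptions, not part of the NASA dataset or of any library used in this notebook.

```python
from collections import defaultdict

# Hypothetical inactivity gap: a new session starts after 30 minutes without events
SESSION_GAP_SECONDS = 30 * 60


def sessionize(records, gap=SESSION_GAP_SECONDS):
    """Split (key, timestamp) records into sessions.

    Returns a list of (key, [timestamps]) tuples, one per session.
    """
    by_key = defaultdict(list)
    for key, ts in records:
        by_key[key].append(ts)

    sessions = []
    for key, times in by_key.items():
        times.sort()
        current = [times[0]]
        for prev, ts in zip(times, times[1:]):
            if ts - prev > gap:
                # Gap too large: close the current session and start a new one
                sessions.append((key, current))
                current = []
            current.append(ts)
        sessions.append((key, current))
    return sessions


records = [("a", 0), ("b", 10), ("a", 60), ("a", 4000), ("b", 20)]
print(sessionize(records))
# [('a', [0, 60]), ('a', [4000]), ('b', [10, 20])]
```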

While early big data architectures were oriented towards batch jobs, the focus has shifted to lower-latency solutions. Distributed data processing tools and APIs have made it easier for developers to write _streaming_ applications.

Below we provide an example of how to do streaming web-log processing with RAPIDS, Dask, and Streamz.

## Pre-Requisites

We assume you're running in a RAPIDS nightly or release container, and thus already have cuDF and Dask installed.

Make sure you have [streamz](https://github.com/python-streamz/streamz) installed.

In [None]:
!conda install -c conda-forge -y streamz ipywidgets

## The Data

For demonstration purposes, we'll use a [publicly available web-log dataset from NASA](http://opensource.indeedeng.io/imhotep/docs/sample-data/).

In [1]:
import os, urllib.request, gzip, io

data_dir = '/data/'

if not os.path.exists(data_dir):
    os.mkdir(data_dir)

url = 'http://indeedeng.github.io/imhotep/files/nasa_19950630.22-19950728.12.tsv.gz'
fn = 'logs_noheader.tsv'

fileStream = io.BytesIO(urllib.request.urlopen(url).read())

# Strip the header line so it is not sent as a data row in the first batch only
with gzip.open(fileStream, 'rb') as f_in, open(data_dir + fn, 'wb') as fout:
    # The file is Latin-1 (ISO-8859-1) encoded, so decode it and re-encode as UTF-8
    data = f_in.read().decode('iso-8859-1')
    header, _, body = data.partition('\n')
    names = header.split()  # column names, reused when parsing each batch
    fout.write(body.encode('utf8'))
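The header-stripping and re-encoding steps can be checked on a small in-memory sample before downloading the full file. The sample rows below are made up for illustration; only the `str.partition` split, the `gzip` handling, and the ISO-8859-1 to UTF-8 round trip mirror the cell above.

```python
import gzip
import io

# A tiny, made-up gzipped TSV with a header row and one Latin-1 character (é)
raw = "host\tbytes\ncafé.example\t100\nfoo.example\t200\n".encode("iso-8859-1")
compressed = io.BytesIO(gzip.compress(raw))

with gzip.open(compressed, "rb") as f_in:
    text = f_in.read().decode("iso-8859-1")  # decode the Latin-1 bytes
header, _, body = text.partition("\n")       # split off the first line only
names = header.split()                       # column names from the header
utf8_body = body.encode("utf8")              # re-encode the data rows as UTF-8

print(names)      # ['host', 'bytes']
print(utf8_body)
```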

## Inspect the Data

The Google SRE book recommends tracking the [four golden signals](https://landing.google.com/sre/sre-book/chapters/monitoring-distributed-systems/#xref_monitoring_golden-signals) (latency, traffic, errors, and saturation) for any important system.

In [2]:
import cudf

# The input contains quotation marks that must not be interpreted as quoted fields,
# so quoting=3 (csv.QUOTE_NONE) disables quote handling
df = cudf.read_csv(data_dir + fn, sep='\t', names=names, quoting=3)
df.head().to_pandas()

|   | host | logname | time | method | url | response | bytes | referer | useragent |
|---|------|---------|------|--------|-----|----------|-------|---------|-----------|
| 0 | 199.72.81.55 | - | 804571201 | GET | /history/apollo/ | 200 | 6245 | -1 | -1 |
| 1 | unicomp6.unicomp.net | - | 804571206 | GET | /shuttle/countdown/ | 200 | 3985 | -1 | -1 |
| 2 | 199.120.110.21 | - | 804571209 | GET | /shuttle/missions/sts-73/mission-sts-73.html | 200 | 4085 | -1 | -1 |
| 3 | burger.letters.com | - | 804571211 | GET | /shuttle/countdown/liftoff.html | 304 | 0 | -1 | -1 |
| 4 | 199.120.110.21 | - | 804571211 | GET | /shuttle/missions/sts-73/sts-73-patch-small.gif | 200 | 4179 | -1 | -1 |
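The `quoting=3` argument is the numeric value of Python's `csv.QUOTE_NONE`. With default quoting, a field that begins with `"` is treated as the start of a quoted field and can swallow tabs and newlines until the next quote; with `QUOTE_NONE` the quote is just another character. The sketch below shows this with Python's standard `csv` module on a made-up log line, as an analogy for the cuDF reader rather than a test of it.

```python
import csv
import io

# quoting=3 in the read_csv call above is the numeric value of csv.QUOTE_NONE
print(csv.QUOTE_NONE)  # 3

# Made-up log line whose second field begins with a stray double quote
line = 'host1\t"GET /index.html HTTP/1.0\t200\n'

# With QUOTE_NONE, the quote character is kept as ordinary data
fields = next(csv.reader(io.StringIO(line), delimiter="\t", quoting=csv.QUOTE_NONE))
print(fields)  # ['host1', '"GET /index.html HTTP/1.0', '200']
```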


The data above doesn't tell us anything about request latency, but we can aggregate it to get a view into traffic, errors, and saturation.
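For readers without a GPU at hand, the same three aggregations can be checked on a few rows in plain Python. This is a CPU sketch using `collections.Counter`, not how cuDF computes them; the rows are the first five records from the table above, reduced to `(host, response, bytes)`.

```python
from collections import Counter, defaultdict

# (host, response, bytes) tuples from the first five rows shown above
rows = [
    ("199.72.81.55", 200, 6245),
    ("unicomp6.unicomp.net", 200, 3985),
    ("199.120.110.21", 200, 4085),
    ("burger.letters.com", 304, 0),
    ("199.120.110.21", 200, 4179),
]

# Traffic: number of requests per host
traffic = Counter(host for host, _, _ in rows)

# Errors: 5xx responses per (host, response) pair
errors = Counter((host, resp) for host, resp, _ in rows if resp >= 500)

# Saturation proxy: megabytes sent to each host
mb_sent = defaultdict(float)
for host, _, nbytes in rows:
    mb_sent[host] += nbytes / 1_000_000

print(traffic.most_common(1))             # [('199.120.110.21', 2)]
print(dict(errors))                       # {}
print(round(mb_sent["199.120.110.21"], 6))  # 0.008264
```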

In [3]:
# calculate total requests served per host system
traffic = df.groupby(['host']).host.count()
traffic[traffic > 5].head().to_pandas()

***.novo.dk                        16
007.thegap.com                     45
01-dynamic-c.wokingham.luna.net    28
02-dynamic-c.wokingham.luna.net    13
03-dynamic-c.wokingham.luna.net    15
Name: host, dtype: int32

In [4]:
# count HTTP 5xx (server error) responses per host and status code
errors = df[df['response'] >= 500].groupby(['host', 'response']).host.count()
errors.to_pandas()

host                     response
129.130.115.19           501          1
134.57.9.77              501          6
163.205.1.45             500         53
163.205.16.23            501          1
192.83.171.94            501          1
cc.newcastle.edu.au      501          1
ix-tf1-18.ix.netcom.com  501          1
n1032036.ksc.nasa.gov    501          1
newcastle03.nbnet.nb.ca  501          2
reddragon.ksc.nasa.gov   500          1
titan02                  500          4
titan02f                 500          4
Name: host, dtype: int32

In [5]:
# total megabytes sent to each host, a rough indicator of possible network saturation
mb_sent = df.groupby(['host']).bytes.sum()/1000000
mb_sent[mb_sent > 100].head().to_pandas()

163.206.137.21        138.230477
163.206.89.4          104.978019
198.133.29.18         104.475133
alyssa.prodigy.com    209.657138
e659229.boeing.com    123.248257
Name: bytes, dtype: float64

From the output above, we can see that there are relatively few server errors, which is great. We also get a view of the number of hits per host and the total megabytes sent to each host.

### Single GPU Streaming with RAPIDS and Streamz

A single GPU can process a lot of data quickly, and the Streamz API makes it easy to do so in a streaming fashion.

Many streaming systems emit events of interest for operations teams to investigate, and that is what we do here.
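Turning per-host metrics into events usually comes down to a threshold per metric. The sketch below is a hypothetical, GPU-free helper that applies the same kind of filtering that `process_on_gpu` performs with cuDF (traffic above 200 requests, more than 120 MB sent, all 5xx errors kept); the `THRESHOLDS` values are copied from that function, while the plain-dict input format is an assumption for illustration.

```python
# Hypothetical thresholds mirroring the filters in process_on_gpu
THRESHOLDS = {"traffic": 200, "mb_sent": 120}


def events_of_interest(metrics, thresholds=THRESHOLDS):
    """Reduce per-host metrics to the entries worth alerting on.

    Metrics with a threshold keep only values above it; metrics without one
    (such as 5xx error counts) are reported in full.
    """
    events = {}
    for name, per_host in metrics.items():
        limit = thresholds.get(name)
        if limit is None:
            events[name] = dict(per_host)
        else:
            events[name] = {host: value for host, value in per_host.items() if value > limit}
    return events


metrics = {
    "traffic": {"hostA": 250, "hostB": 12},
    "errors": {("hostA", 500): 3},
    "mb_sent": {"hostA": 150.0, "hostB": 0.5},
}
print(events_of_interest(metrics))
```

Keeping the threshold logic in a plain function like this makes it easy to unit-test independently of the GPU code.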

In [6]:
from io import StringIO

# calculate traffic, errors, and saturation per batch
def process_on_gpu(messages):
    message_stream = StringIO(
            '\n'.join(msg.decode('utf-8') if isinstance(msg,bytes) else msg for msg in messages)
    )
    df = cudf.read_csv(message_stream, sep='\t', names=names, quoting=3)
    traffic = df.groupby(['host']).host.count()
    errors = df[df['response'] >= 500].groupby(['host', 'response']).host.count()
    mb_sent = df.groupby(['host']).bytes.sum()/1000000

    # Return string representations of each metric: only high-traffic and
    # high-volume hosts are kept, while every 5xx error is reported
    return {
        'traffic': traffic[traffic > 200].to_string(),
        'errors': errors.to_string(),
        'mb_sent': mb_sent[mb_sent > 120].to_string(),
    }

In [7]:
import datetime

# save each metric type to its own file, instead of dumping lots of output to Jupyter
def save_to_file(events):
    dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    for metric in ('traffic', 'errors', 'mb_sent'):
        # mode 'w' overwrites, so each file holds only the most recent batch
        with open(data_dir + metric + '.txt', 'w') as fp:
            fp.write(dt + ':' + events[metric] + '\n')
    print(dt + ': metrics batch written..')


Note that the function above opens each file in overwrite mode for simplicity, so each file only holds the metrics from the most recent batch. A production workflow would typically append to the existing file, write to a separate file with a timestamp-based filename, or produce to another Kafka topic.
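The append-based alternative is a small change. The sketch below is a hypothetical `append_metrics` helper (not part of the notebook) that opens each metric file in `'a'` mode and prefixes every record with its batch timestamp; it writes to a temporary directory so it can run anywhere.

```python
import datetime
import os
import tempfile


def append_metrics(out_dir, events, now=None):
    """Append one timestamped record per metric rather than overwriting the file."""
    if now is None:
        now = datetime.datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    for name, text in events.items():
        # Mode 'a' keeps earlier batches; each record is prefixed with its batch timestamp
        with open(os.path.join(out_dir, name + ".txt"), "a") as fp:
            fp.write(stamp + ":" + text + "\n")


out_dir = tempfile.mkdtemp()  # scratch directory for the demo
append_metrics(out_dir, {"errors": "batch 1"}, datetime.datetime(2019, 7, 29, 18, 14, 30))
append_metrics(out_dir, {"errors": "batch 2"}, datetime.datetime(2019, 7, 29, 18, 14, 33))
with open(os.path.join(out_dir, "errors.txt")) as fp:
    lines = fp.read().splitlines()
print(lines)  # ['2019-07-29 18:14:30:batch 1', '2019-07-29 18:14:33:batch 2']
```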

In [8]:
from streamz import Stream

# setup the stream
# Streamz allows streaming directly from a text file
source = Stream.from_textfile(data_dir + fn)
# process 250k lines per batch
out = source.partition(250000).map(process_on_gpu).sink(save_to_file)
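Batching with `partition(250000)` groups consecutive lines into fixed-size lists before `map` sees them. The generator below is a plain-Python sketch of count-based batching, not Streamz's implementation; like any pure count-based batcher it only yields full batches, so a trailing group with fewer than `n` items is not emitted by this sketch.

```python
from itertools import islice


def partition(iterable, n):
    """Yield successive lists of exactly n items; a trailing partial group is not emitted."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if len(batch) < n:
            return  # not enough items left for a full batch
        yield batch


batches = list(partition(range(7), 3))
print(batches)  # [[0, 1, 2], [3, 4, 5]]
```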

In [9]:
source.start()

2019-07-29 18:14:30: metrics batch written..
2019-07-29 18:14:33: metrics batch written..
2019-07-29 18:14:37: metrics batch written..
2019-07-29 18:14:40: metrics batch written..
2019-07-29 18:14:43: metrics batch written..
2019-07-29 18:14:47: metrics batch written..
2019-07-29 18:14:50: metrics batch written..


In [10]:
!echo "Error Log:"
!head -n5 {data_dir}errors.txt
!echo "\nTraffic Log:"
!head -n5 {data_dir}traffic.txt
!echo "\nMB Sent Log:"
!head -n5 {data_dir}mb_sent.txt

Error Log:
2019-07-29 18:14:50:('134.57.9.77', 501)    5
Name: host, dtype: int32

Traffic Log:
2019-07-29 18:14:50:128.159.105.240     212
163.205.1.45     426
163.205.156.16     308
163.205.180.17     317
163.206.137.21     308

MB Sent Log:
2019-07-29 18:14:50:<empty Series of dtype=float64>


### Scaling Streamz to multiple GPUs with Dask & Kafka

Instead of streaming from files, a very common pattern is to read from a distributed log system such as Apache Kafka.

The example below assumes you have a running Kafka instance or cluster.

For help setting up your own, follow the [Kafka Quickstart guide](http://kafka.apache.org/quickstart).
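Consuming from Kafka in batches amounts to periodically collecting whatever messages have arrived since the last poll and handing them downstream as one batch. The sketch below illustrates that idea with an in-memory `queue.Queue` standing in for a topic partition; `poll_batch` and `max_batch_size` are illustrative names, and a real Kafka consumer tracks offsets rather than draining a local queue.

```python
import queue


def poll_batch(q, max_batch_size):
    """Drain up to max_batch_size messages that are already waiting, without blocking."""
    batch = []
    while len(batch) < max_batch_size:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break  # nothing more has arrived since the last poll
    return batch


# An in-memory queue stands in for a Kafka topic partition
q = queue.Queue()
for i in range(5):
    q.put(f"msg-{i}")

polls = [poll_batch(q, max_batch_size=2) for _ in range(4)]
print(polls)  # [['msg-0', 'msg-1'], ['msg-2', 'msg-3'], ['msg-4'], []]
```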

In [11]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# create a Dask cluster with 1 worker per GPU
cluster = LocalCUDACluster()
client = Client(cluster)
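Conceptually, scaling out means fanning batches out to a pool of workers and gathering the results back in order. The sketch below uses Python's `concurrent.futures.ThreadPoolExecutor` purely as a CPU stand-in for the Dask workers created above; `count_lines` is a hypothetical placeholder for per-batch work such as `process_on_gpu`, and none of this is the Dask API.

```python
from concurrent.futures import ThreadPoolExecutor


def count_lines(batch):
    """Hypothetical placeholder for per-batch work such as process_on_gpu."""
    return len(batch)


batches = [["a", "b"], ["c"], ["d", "e", "f"]]

# Two workers stand in for two GPUs; map() returns results in submission order
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(count_lines, batches))

print(results)  # [2, 1, 3]
```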

Streamz uses the [python-confluent-kafka](https://github.com/confluentinc/confluent-kafka-python) library to interact with Kafka, so make sure this library is installed.

In [None]:
!conda install -c conda-forge -y python-confluent-kafka 

In [12]:
from streamz import Stream
import confluent_kafka

# Kafka specific configurations
topic = "haproxy-topic"
bootstrap_servers = 'localhost:9092'
consumer_conf = {'bootstrap.servers': bootstrap_servers, 'group.id': 'custreamz', 'session.timeout.ms': 60000}

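# npartitions is the number of partitions in the Kafka topic.
# dask=False processes each batch in this Python process; to spread batches across
# the Dask workers started above, pass dask=True and call .gather() before .sink().
stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='1s', npartitions=1, asynchronous=True, dask=False)
final_output = stream.map(process_on_gpu).sink(save_to_file)
stream.start()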

Typically, upstream applications produce messages to Kafka. For the sake of a self-contained example you can experiment with, we use the Confluent Kafka library to produce messages that our Streamz app consumes immediately.

In [13]:
producer = confluent_kafka.Producer({'bootstrap.servers': bootstrap_servers})

with open(data_dir + fn, 'rb') as fp:
    for line in fp:
        while True:
            try:
                producer.produce(topic, line)
                break
            except BufferError:
                # The local producer queue is full: wait up to 0.2 s for deliveries, then retry this line
                producer.flush(0.2)

producer.flush()
print("Producer queue is now empty!")

2019-07-29 18:15:11: metrics batch written..
Producer queue is now empty!
2019-07-29 18:15:17: metrics batch written..
2019-07-29 18:15:32: metrics batch written..
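When a producer's local queue is full, the message that raised `BufferError` has not been queued, so it has to be produced again after making room; otherwise it is silently dropped. The sketch below shows that retry pattern with a bounded `queue.Queue` standing in for the producer's buffer and a hypothetical `drain` callback standing in for `producer.flush(timeout)`.

```python
import queue


def produce_all(lines, q, drain):
    """Put every line on a bounded queue, draining it whenever it is full.

    `drain` is a hypothetical stand-in for Producer.flush(timeout): it must free space in q.
    """
    for line in lines:
        while True:
            try:
                q.put_nowait(line)
                break  # queued successfully; move on to the next line
            except queue.Full:
                drain(q)  # make room, then retry the same line


delivered = []


def drain(q):
    # "Deliver" everything currently buffered
    while not q.empty():
        delivered.append(q.get_nowait())


buf = queue.Queue(maxsize=2)
produce_all(range(5), buf, drain)
drain(buf)  # final flush
print(delivered)  # [0, 1, 2, 3, 4]
```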


In [14]:
!echo "Error Log:"
!head -n5 {data_dir}errors.txt
!echo "\nTraffic Log:"
!head -n5 {data_dir}traffic.txt
!echo "\nMB Sent Log:"
!head -n5 {data_dir}mb_sent.txt

Error Log:
2019-07-29 18:15:32:('129.130.115.19', 501)    1
('134.57.9.77', 501)    5
('163.205.16.23', 501)    1
('192.83.171.94', 501)    1
('cc.newcastle.edu.au', 501)    1

Traffic Log:
2019-07-29 18:15:32:128.158.26.178      235
128.158.42.150      281
128.158.54.14      206
128.159.105.240      589
128.159.111.141      376

MB Sent Log:
2019-07-29 18:15:32:piweba1y.prodigy.com    148.073398
piweba3y.prodigy.com     229.52305
piweba4y.prodigy.com    213.940041
Name: bytes, dtype: float64
