**This is a cuStreamz job for the classic example of Streaming Word Count.**

This example demonstrates how to stream from Kafka. The same streaming word count can also be run on a text file that is being continuously appended to.

You can refer to https://kafka.apache.org/quickstart to start a local Kafka cluster.

In [1]:
#Streamz and cudf imports
import cudf
from streamz import Stream
from streamz.dataframe import DataFrame
import numpy as np

Let's assume that each record/message arriving on the Kafka topic is a line of the form "this is line x", where x is an incrementing counter.
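
To try the notebook locally, the topic needs some data in this shape. The sketch below is one possible way to generate it and is not part of the original job: `make_messages` and `produce_lines` are hypothetical helper names, and the producer assumes the `confluent_kafka` package is installed and a broker is reachable. The import sits inside the function so the message builder can be used on its own.

```python
def make_messages(start, count):
    """Return `count` UTF-8 encoded lines of the form 'this is line x'."""
    return [f"this is line {x}".encode("utf-8") for x in range(start, start + count)]


def produce_lines(topic, bootstrap_servers, start=0, count=10):
    """Send the generated lines to a Kafka topic (needs a running broker)."""
    from confluent_kafka import Producer  # assumption: confluent-kafka is installed

    producer = Producer({"bootstrap.servers": bootstrap_servers})
    for message in make_messages(start, count):
        producer.produce(topic, value=message)
    producer.flush()  # block until every queued message has been delivered


# The builder alone needs no broker:
sample = make_messages(0, 3)
print(sample)
```

With a local cluster running, something like `produce_lines("word-count", "localhost:9092", start=0, count=100)` would feed the topic used later in this notebook.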
    
Now, we write a function to parse each such message to get the list of words in each line. 

One can also make use of nvstrings (now custrings, the GPU-accelerated string manipulation library) to tokenise all the messages in the batch. Refer to process_batch_1().

In [2]:
#Helper function operating on every batch polled from Kafka for word count
def process_batch(messages):
    #Decode each message (bytes) and collect its space-separated words
    words = []
    for message in messages:
        words.extend(message.decode('utf-8').strip('\n').split(" "))
    return words

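#Alternative helper: tokenise the whole batch on the GPU with nvstrings/nvtext
import nvstrings, nvtext
def process_batch_1(messages):
    #Decode the raw Kafka messages (bytes) into Python strings
    decoded = [message.decode('utf-8') for message in messages]
    #Copy the strings to the GPU and split them into words there
    device_lines = nvstrings.to_device(decoded)
    words = nvtext.tokenize(device_lines)
    return words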
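
A quick CPU-only check of what `process_batch` returns can help before wiring it into the stream. The snippet below uses an equivalent copy of the function so that it runs on its own, and the sample messages are made up for illustration:

```python
def process_batch(messages):
    # Decode each message (bytes) and collect its space-separated words
    words = []
    for message in messages:
        words.extend(message.decode('utf-8').strip('\n').split(" "))
    return words


sample_batch = [b"this is line 0\n", b"this is line 1\n"]  # hypothetical messages
words = process_batch(sample_batch)
print(words)
```

Each message contributes four words, so a batch of two messages yields a flat list of eight words.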

Let's create a Kafka consumer. 

In [3]:
#Kafka topic to read streaming data from
topic = "word-count"

#Kafka brokers
bootstrap_servers = 'localhost:9092'

#Kafka consumer configuration
consumer_conf = {'bootstrap.servers': bootstrap_servers, 'group.id': 'custreamz', 'session.timeout.ms': 60000}

We now use Streamz to create a Stream from Kafka by polling the topic every 10s. 

In [4]:
#If you set dask=True, please ensure you have a Dask cluster up and running
source = Stream.from_kafka_batched(topic, consumer_conf, npartitions=1, poll_interval='10s', asynchronous=True, dask=False)

In [5]:
#Applying process_batch function to process word count on each batch
stream = source.map(process_batch)

*Streamz DataFrame does the trick!* 

After we get the parsed word list on our stream from Kafka, we just perform simple aggregations using the Streamz DataFrame to get the word count.

We then write the output (word count) to a list.

In [6]:
stream_df = stream.map(lambda words: cudf.DataFrame({'word': words, 'count': np.ones(len(words),dtype=np.int32)}))
sdf = DataFrame(stream_df, example=cudf.DataFrame({'word':[], 'count':[]}))
output = sdf.groupby('word').sum().stream.buffer(100000).gather().sink_to_list()
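
To make the aggregation concrete, here is a CPU-only sketch of the same idea using `collections.Counter` instead of cuDF and Streamz. It is an illustration only, assuming that the streaming groupby keeps running totals across batches as this notebook describes; `cumulative_word_counts` is a hypothetical name, not part of either library.

```python
from collections import Counter


def cumulative_word_counts(batches):
    """Yield a snapshot of the running word counts after each batch of words."""
    totals = Counter()
    for words in batches:
        totals.update(words)   # add this batch's counts to the running totals
        yield dict(totals)     # copy, so that earlier snapshots are not mutated


batches = [
    ["this", "is", "line", "0", "this", "is", "line", "1"],
    ["this", "is", "line", "2"],
]
snapshots = list(cumulative_word_counts(batches))
print(snapshots[-1])
```

Each snapshot plays the role of one dataframe in `output`: it covers all the data seen up to that batch, not just the latest batch.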

Starting the stream!

In [7]:
source.start()

Let's see what output we have:

In [8]:
output

[<cudf.DataFrame ncols=1 nrows=70 >]

We can see that a cuDF dataframe has been produced to the output. Let's see if we can print some actual word counts.

In [9]:
#Printing the values
print(output[0].loc[65:])

   count
65      1
66      1
7      1
8      1
9      1
is     67
line     67
this     67


We can! :) 
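
The row order in the printed output (`66` followed by `7`) is what one would expect if the word index is sorted as strings rather than as numbers. The plain-Python sketch below reproduces that ordering, assuming the counter x ran from 0 to 66 in this first result (an inference from the 70 rows and the count of 67, not something the notebook states):

```python
# Assumption: 67 lines were seen, numbered 0..66, so the distinct words are
# the three fixed words plus the counters as strings.
words = ["this", "is", "line"] + [str(x) for x in range(67)]

index = sorted(words)               # lexicographic order: "66" < "7" < "is"
tail = index[index.index("65"):]    # label-based slice starting at "65"
print(len(index), tail)
```

Under that assumption the slice contains the same eight labels, in the same order, as the rows printed above.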

Now, let's wait a little longer before checking the output again.

If everything is working as expected, the output should now be a list of cuDF dataframes, each holding the cumulative streaming word count of all the data seen so far, with the last dataframe being the most recent.

In [10]:
#Print the output again
output

[<cudf.DataFrame ncols=1 nrows=70 >,
 <cudf.DataFrame ncols=1 nrows=185 >,
 <cudf.DataFrame ncols=1 nrows=292 >,
 <cudf.DataFrame ncols=1 nrows=400 >,
 <cudf.DataFrame ncols=1 nrows=507 >]

In [11]:
#Printing the values again
print(output[4].loc[99:])

   count
99      1
is    504
line    504
this    504
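
As a sanity check on these numbers, assuming every line carries a distinct counter, each dataframe should hold one row per line seen so far plus three rows for `this`, `is` and `line`. The short snippet below applies that reasoning to the row counts reported above:

```python
FIXED_WORDS = 3  # "this", "is" and "line" appear in every message

row_counts = [70, 185, 292, 400, 507]   # nrows of each dataframe in output
lines_seen = [n - FIXED_WORDS for n in row_counts]
print(lines_seen)
```

The first and last values, 67 and 504, match the counts printed for `is`, `line` and `this` in the two outputs shown in this notebook.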
