Receive and process a stream of user clicks
---

In [None]:
import os
# spark-submit arguments for the notebook's PySpark driver: fix the Spark UI port,
# pull in the Kafka streaming connector, and end with `pyspark-shell`, which must
# be the last token. The connector coordinate is a single Maven coordinate, with
# no trailing comma. NOTE(review): the artifact's Scala (2.11) and Spark (1.6.1)
# versions must match the installed Spark — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--conf spark.ui.port=4040',
    '--packages org.apache.spark:spark-streaming-kafka_2.11:1.6.1',
    'pyspark-shell',
])
import time
import io

# pyspark and kafka
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# avro
import avro
import avro.schema
from avro.io import DatumWriter, DatumReader
from avro.datafile import DataFileReader

Read the schema and create the Avro reader.

```json
{"namespace": "trivago.avro",
 "type": "record",
 "name": "ClickLog",
 "fields": [
     {"name": "user_id", "type": ["int", "null"]},
     {"name": "time", "type": ["int", "null"]},
     {"name": "action", "type": ["int", "null"]},
     {"name": "destination", "type": ["int", "null"]},
     {"name": "hotel", "type": ["int", "null"]}
 ]
}
```

* `action` can be one of:
   * 1 = search
   * 2 = filter
   * 3 = click

In [None]:
# Codes stored in the `action` field of a ClickLog record.
ACTION_SEARCH, ACTION_FILTER, ACTION_CLICK = 1, 2, 3

# Spark Streaming settings.
batchDuration = 5                            # batch interval passed to StreamingContext (seconds)
checkpointDirectory = "data/ssc_checkpoint"  # directory handed to ssc.checkpoint(...)

In [None]:
# Read the ClickLog Avro schema and build a DatumReader for it.
# The `with` block closes the schema file instead of leaking the handle.
schema_path = "data/click_log.avsc"
with open(schema_path) as schema_file:
    schema = avro.schema.parse(schema_file.read())
reader = avro.io.DatumReader(schema)

def decoder(item):
    """Turn one Avro-encoded Kafka message value into a pyspark Row.

    The bytes in `item` are decoded with the module-level `reader`. The
    resulting Row carries the fields user_id, time, action, destination
    and hotel.

    NOTE(review): a message with a `None` value is not handled and will
    fail inside the Avro reader — confirm the topic never carries nulls.
    """
    binary_decoder = avro.io.BinaryDecoder(io.BytesIO(item))
    record = reader.read(binary_decoder)

    field_names = ('user_id', 'time', 'action', 'destination', 'hotel')
    return Row(**{name: record[name] for name in field_names})

Connect to the _clicklog_ Kafka topic.

Create a streaming context with a checkpoint directory. (Recovering from a checkpoint left by a previous failure, via `StreamingContext.getOrCreate`, is currently disabled.)

In [None]:
def createStreamingContext():
    """Create a StreamingContext for the click stream.

    The context uses a `local[2]` master, presumably so that the Kafka
    receiver and batch processing each get a thread. It has a
    `batchDuration`-second batch interval and checkpoints to
    `checkpointDirectory`.

    `SparkContext.getOrCreate` reuses an already-running SparkContext, so
    re-running this cell in the same kernel does not fail with
    "Cannot run multiple SparkContexts at once". Note that if one exists,
    its configuration wins over `conf`.

    Returns:
        StreamingContext: a new streaming context that has not been started.
    """
    conf = (SparkConf()
            .setAppName("Streaming Clicks")
            .setMaster("local[2]"))
    sc = SparkContext.getOrCreate(conf=conf)
    ssc = StreamingContext(sc, batchDuration)
    ssc.checkpoint(checkpointDirectory)  # required by windowed ops with an inverse function
    return ssc

In [None]:
# Checkpoint recovery via
# `StreamingContext.getOrCreate(checkpointDirectory, createStreamingContext)`
# is disabled; a fresh context is always built. NOTE(review): for recovery to
# work, the DStream pipeline must be defined inside the factory function.
ssc = createStreamingContext()

# Receiver-based Kafka stream: ZooKeeper quorum, consumer group id, and a
# {topic: number of consumer threads} map. Message values go through `decoder`.
zookeeper_quorum = "127.0.0.1:2181"
consumer_group = "spark-streaming-consumer"
topic_threads = {'clicklog': 1}
kvs = KafkaUtils.createStream(ssc, zookeeper_quorum, consumer_group,
                              topic_threads, valueDecoder=decoder)

In [None]:
def print_top10(rdd):
    """Print the first 10 elements of `rdd` as a single list.

    Used as a `foreachRDD` callback. On an RDD sorted in descending order,
    this prints the top 10 entries. Keep exactly one positional parameter:
    PySpark's `foreachRDD` inspects the argument count to decide whether to
    pass `(rdd)` or `(time, rdd)`.

    Args:
        rdd: an object with a `take(n)` method, typically a pyspark RDD.
    """
    # print(...) with a single argument prints the same thing on Python 2 and 3.
    print(rdd.take(10))

In [None]:
# Count searches per destination over a sliding window and print the most
# searched destinations after every slide.
WINDOW_DURATION = 20  # seconds of history covered by each count
SLIDE_DURATION = 5    # seconds between results; must be a multiple of the batch interval

click_rdd = kvs.map(lambda kv: kv[1])  # drop the Kafka key, keep the decoded Row

(click_rdd
    .filter(lambda click: click.action == ACTION_SEARCH)
    .map(lambda click: (click.destination, 1))
    # Incremental window: add counts entering the window, subtract counts
    # leaving it. filterFunc drops destinations whose count has fallen to 0;
    # otherwise they would stay in the window state forever.
    .reduceByKeyAndWindow(lambda x, y: x + y,
                          lambda x, y: x - y,
                          WINDOW_DURATION, SLIDE_DURATION,
                          filterFunc=lambda kv: kv[1] > 0)
    # Sort (destination, count) pairs by count, highest first. Single-argument
    # lambdas replace Python-2-only tuple-parameter lambdas.
    .transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
    .map(lambda kv: {"destination": kv[0], "search_count": kv[1]})
    .foreachRDD(print_top10))

In [None]:
# Start the streaming computation: the Kafka receiver and the windowed search counts.
# NOTE(review): no `ssc.awaitTermination()` follows, so presumably the cell
# returns while the job keeps running; stop it with `ssc.stop()` when done.
# Confirm this is the intended notebook workflow.
ssc.start()