## Initialization

In [3]:
from notebook_client.notebook_client import NotebookClient
# Client for the job runner: later cells use it to start, poll and stop the
# Spark job (start_job / job_status / stop_job).
nc = NotebookClient()
# NOTE(review): behaviour of initialize_connections is defined in
# notebook_client; presumably it connects to the job-running backend — confirm.
nc.initialize_connections()

## Populate Kafka topic with some data

In [2]:
from kafka import KafkaProducer
from lib.serializer import value_serializer

topic_name = 'my_test_topic'


def populate_test_topic(n_messages=100, bootstrap_servers='kafka:9092', topic=None):
    """Send ``n_messages`` records ``{'x': 0}``, ``{'x': 1}``, ... to a Kafka topic.

    Values are encoded with ``lib.serializer.value_serializer``.

    Args:
        n_messages: number of records to send (default 100).
        bootstrap_servers: Kafka bootstrap address (default ``'kafka:9092'``).
        topic: topic to write to; defaults to the module-level ``topic_name``,
            looked up at call time.
    """
    if topic is None:
        topic = topic_name
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=value_serializer)
    try:
        for i in range(n_messages):
            producer.send(topic, {'x': i})
        # send() only buffers records; flush() blocks until they are delivered.
        producer.flush()
    finally:
        # Release the producer's connections even if send/flush raises.
        producer.close()
    print('done')


populate_test_topic()

done


## Create keyspace and table in Cassandra

In [2]:
from cassandra.cluster import Cluster

cluster = Cluster(['cassandra'])
try:
    session = cluster.connect()
    # IF NOT EXISTS makes this cell safe to re-run against an already
    # initialised cluster (plain CREATE fails when the object exists).
    session.execute("CREATE KEYSPACE IF NOT EXISTS my_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
    session.set_keyspace('my_keyspace')
    # id is the row key; the Spark job fills it with a random UUID string.
    session.execute("CREATE TABLE IF NOT EXISTS my_table(id text PRIMARY KEY, x int)")
finally:
    # Always close the driver's connections, even if a statement fails.
    cluster.shutdown()

## Write Spark job to file

In [1]:
%%writefile jobs/test_job.py

import json
import uuid

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

from pyspark_cassandra import streaming


ENCODING = 'utf-8'


def value_deserializer(m):
    """Turn the raw bytes of a Kafka message value into a Python object.

    The bytes are decoded as ``ENCODING`` text and then parsed as JSON.
    """
    text = m.decode(ENCODING)
    return json.loads(text)


def add_id(i):
    """Return a copy of record ``i`` with a fresh random UUID string under ``'id'``.

    The job uses ``'id'`` as the row key when saving to Cassandra, so each
    written row gets a new key. The input dict is left unmodified, so the
    transformation has no side effects on the record it receives.
    """
    return dict(i, id=str(uuid.uuid4()))


def reducer(a, b):
    """Combine two records by summing their ``'x'`` fields.

    Returns a new dict holding all of ``a``'s keys, with ``'x'`` set to
    ``a['x'] + b['x']``. Neither input is modified, and nothing is printed
    (this runs once per pair of records on the executors).
    """
    return dict(a, x=a['x'] + b['x'])


# Job settings. The topic name is needed both for the starting offsets and for
# the subscription list, so it is defined once here to keep the two in sync.
TOPIC = 'my_test_topic'
KAFKA_BROKERS = 'kafka:9092'
BATCH_INTERVAL_SECONDS = 10
CASSANDRA_KEYSPACE = 'my_keyspace'
CASSANDRA_TABLE = 'my_table'

sc = SparkContext(appName='testApp')
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

# Start reading partition 0 of the topic from offset 0.
# NOTE(review): only partition 0 is listed; if the topic has more partitions,
# confirm whether they should be added here.
topicAndPartition = TopicAndPartition(TOPIC, 0)
fromOffsets = {topicAndPartition: 0}

kvs = KafkaUtils.createDirectStream(
    ssc,
    [TOPIC],
    {'metadata.broker.list': KAFKA_BROKERS},
    valueDecoder=value_deserializer,
    fromOffsets=fromOffsets,
)

# Per batch: keep the message value (element [1] of each (key, value) pair),
# fold all values into one record with reducer, attach a random id, and write
# the result to Cassandra. saveToCassandra comes from the pyspark_cassandra
# import at the top of this file.
(
    kvs
    .map(lambda i: i[1])
    .reduce(reducer)
    .map(add_id)
    .saveToCassandra(CASSANDRA_KEYSPACE, CASSANDRA_TABLE)
)

ssc.start()
ssc.awaitTermination()

Overwriting jobs/test_job.py


## Execute job

In [4]:
# Keep the id returned by start_job so it can be passed to job_status / stop_job
# instead of a hand-typed number (the id presumably differs between runs).
job_id = nc.start_job('jobs/test_job.py')
job_id

715

In [11]:
# NOTE(review): 715 is the id start_job returned in the run that produced these
# outputs; on a fresh run it is likely different. Pass the id returned by
# nc.start_job instead of a literal.
nc.job_status(715)

'not found'

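## Wait for results to appear in Cassandra

This cell does not wait by itself. Re-run it once the job has processed at least one batch (the batch interval is 10 s). The job sums `x` over all messages in each batch. If all 100 messages arrive in the first batch, expect a single row with `x=4950` (0 + 1 + … + 99).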

In [6]:
from cassandra.cluster import Cluster

cluster = Cluster(['cassandra'])
try:
    session = cluster.connect('my_keyspace')
    # Materialise the rows while the connection is still open: a ResultSet
    # may fetch further pages lazily during iteration, which would fail
    # after shutdown.
    results = list(session.execute("SELECT * FROM my_table;"))
finally:
    cluster.shutdown()

for row in results:
    print(row)

Row(id='652200eb-39a3-4f45-b2dc-1b4459219f91', x=4950)


In [10]:
# NOTE(review): 715 is the id start_job returned in the run that produced these
# outputs; on a fresh run it is likely different. Pass the id returned by
# nc.start_job instead of a literal.
nc.stop_job(715)

'stopped'