In [None]:
from pyspark import SparkContext, SparkConf, RDD
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pykafka import KafkaClient
import json

# Cluster-mode Spark setup for the raw sales-order stream: connect to the
# standalone master on server1, cap the application at 3 cores, and build a
# StreamingContext that cuts the input into BATCH_INTERVAL_SECONDS micro-batches.
SPARK_MASTER = "spark://server1:7077"
BATCH_INTERVAL_SECONDS = 30

conf = SparkConf().setAppName("Sales Order RAW Data").setMaster(SPARK_MASTER)
conf.set("spark.cores.max", 3)
# NOTE(review): this property is read by the spark-cassandra-connector; the
# visible code writes to Cassandra through the Python driver (SimpleClient),
# which uses its own contact points — confirm whether it is still needed.
conf.set("spark.cassandra.connection.host", "server4")
# Jars this job presumably needs on the executor classpath (previously tried
# via spark.executor.extraClassPath). Repeated conf.set() calls on the same key
# overwrite each other, so only the last one would take effect; use a single
# colon-separated value or --jars instead:
#   /root/spark-cassandra-connector-assembly-1.5.0-M1-SNAPSHOT.jar
#   /root/spark-streaming-kafka-assembly_2.10-1.5.0.jar
#   /root/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar
#   /root/kafka_2.10-0.8.2.1/libs/zkclient-0.3.jar

sc = SparkContext(conf=conf)

streamingContext = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

In [None]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

class SimpleClient(object):
    """Thin wrapper around a Cassandra session bound to the ``test`` keyspace.

    ``connect`` prepares two ``customer_product`` statements once; ``execute``
    prepares arbitrary CQL on demand and caches the prepared form per client,
    so the same statement text is only prepared once per session.
    """
    session = None
    createProfileStatement = None
    updateSalesOrderStatement = None

    # Not used by any visible code path (batching was abandoned); kept so
    # existing references to the attribute keep working.
    batch = None

    # Per-instance cache: CQL text -> prepared statement. Created lazily by
    # execute() and reset by connect().
    preparedCache = None

    def connect(self, nodes):
        """Open a session, switch to keyspace ``test`` and prepare statements.

        :param nodes: list of Cassandra contact-point host names.
        """
        cluster = Cluster(nodes)
        self.session = cluster.connect()
        # Statements prepared on an earlier session must not be reused.
        self.preparedCache = {}
        # the keyspace is hard-wired to "test" for now
        self.session.execute("use test")
        self.createProfileStatement = self.session.prepare("insert into customer_product(customer, product, date) values(?,?,?) if not exists")
        self.updateSalesOrderStatement = self.session.prepare("update customer_product set orders[?] = {qty: ?, unitAmt: ?} where customer = ? and product = ? and date = ?")

    def close(self):
        """Shut down the session's cluster; a no-op if never connected.

        Being safe to call after a failed ``connect`` keeps a caller's
        ``finally`` block from masking the original error.
        """
        if self.session is None:
            return
        self.session.cluster.shutdown()

    def createProfile(self, args):
        """Insert a (customer, product, date) row unless it already exists.

        :param args: ``[customer, product, date]``.
        """
        self.session.execute(self.createProfileStatement, args)

    def updateSalesOrder(self, args):
        """Set one entry of the row's ``orders`` collection.

        :param args: ``[time, qty, unitAmt, customer, product, date]``.
        """
        self.session.execute(self.updateSalesOrderStatement, args)

    def execute(self, statement, parameters=None):
        """Execute ``statement``, preparing it at most once per session.

        :param statement: CQL text with ``?`` placeholders.
        :param parameters: bind values; ``None`` means no values.
        :returns: whatever ``session.execute`` returns.
        """
        if parameters is None:
            parameters = []
        if self.preparedCache is None:
            self.preparedCache = {}
        preparedStatement = self.preparedCache.get(statement)
        if preparedStatement is None:
            # Re-preparing identical CQL on every call costs a round trip.
            preparedStatement = self.session.prepare(statement)
            self.preparedCache[statement] = preparedStatement
        return self.session.execute(preparedStatement, parameters)

In [None]:
# Receiver-based Kafka input. Arguments: streaming context, ZooKeeper quorum,
# consumer group id, and {topic: number of consumer threads}.
# TODO(review): decide whether the ZooKeeper quorum should point at server7 or
# at the ZooKeeper on server1 (the address used below).
stream = KafkaUtils.createStream(
    streamingContext,
    'server1.bigdata.ibm.com:2181',
    "raw-data-to-cassandra",
    {'event': 1},
)

def processEvent(client, producer, event):
    """Persist one sales-order event and notify the profile-update service.

    Steps:
      1. insert the (customer, product, date) row if it does not exist;
      2. store the order (qty, unitAmt) under its time-of-day key;
      3. publish ``{customer, product, date}`` as JSON through ``producer``.

    :param client: a connected ``SimpleClient``.
    :param producer: Kafka producer; ``produce`` receives a one-element list
        holding the JSON string.
    :param event: decoded event dict with keys ``Customer``, ``Product``,
        ``XactionDate``, ``Qty`` and ``UnitAmt``.
    :returns: ``event`` unchanged.
    """
    customer = event['Customer']
    product = event['Product']
    # NOTE(review): the fixed slices assume XactionDate looks like
    # "YYYY-MM-DD?HH:MM:SS..." (e.g. ISO-8601) — confirm with the producer.
    stamp = event['XactionDate']
    date = stamp[0:10]
    time = stamp[11:19]
    qty = event['Qty']
    unitAmt = event['UnitAmt']

    # Use the statements SimpleClient.connect already prepared (same CQL)
    # rather than re-preparing the raw CQL for every single event.
    client.createProfile([customer, product, date])
    client.updateSalesOrder([time, qty, unitAmt, customer, product, date])

    profile = {'customer': customer, 'product': product, 'date': date}
    producer.produce([json.dumps(profile)])
    return event

def processPartition(iter):
    """Persist every record of one RDD partition.

    Opens a Cassandra client and a Kafka producer for the ``profile`` topic,
    runs ``processEvent`` on each record's JSON-decoded ``record[1]``, then
    releases both connections and prints a one-line summary.

    :param iter: iterator over the partition's records. Each record is indexed
        with ``[1]`` to get its JSON payload — presumably the (key, message)
        pairs produced by KafkaUtils.createStream; confirm.
    """
    client = None
    producer = None
    count = 0
    # Stays None for an empty partition so the summary line below still works
    # when a micro-batch delivers no records to this partition.
    lastEvent = None
    try:
        # cassandra connection
        client = SimpleClient()
        client.connect(['server4.bigdata.ibm.com', 'server5.bigdata.ibm.com', 'server6.bigdata.ibm.com'])
        # kafka connection
        kafkaClient = KafkaClient(hosts="server7.bigdata.ibm.com:9092,server8.bigdata.ibm.com:9092,server9.bigdata.ibm.com:9092")
        producer = kafkaClient.topics['profile'].get_producer()

        for record in iter:
            lastEvent = processEvent(client, producer, json.loads(record[1]))
            count += 1
    finally:
        try:
            # NOTE(review): newer pykafka producers need stop() to flush queued
            # messages; guarded in case the installed release lacks it.
            stop = getattr(producer, 'stop', None)
            if stop is not None:
                stop()
        finally:
            # Only close if connect() opened a session; otherwise close() on an
            # unconnected client would mask the original exception.
            if client is not None and client.session is not None:
                client.close()
        print('######## persist ' + str(count) + ' events ,last event is ' + str(lastEvent))

def processRDD(rdd):
    """Hand each partition of one micro-batch RDD to processPartition."""
    partitionHandler = processPartition
    rdd.foreachPartition(partitionHandler)


# Invoke processRDD for every RDD (micro-batch) the Kafka stream produces.
stream.foreachRDD(processRDD)

In [None]:
# Start consuming; from here on the processRDD handler registered on `stream` runs for each batch.
streamingContext.start()             # Start the computation
streamingContext.awaitTermination()  # Block this cell until the streaming computation stops