# Batch processing with Spark and Cassandra

## Import Spark Libraries

In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import StorageLevel
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

## Connect to Cassandra and HDFS

In [2]:
from cassandra.cluster import Cluster
cluster = Cluster()

ec2_host = "ec2-52-35-74-206.us-west-2.compute.amazonaws.com:9000/"
hdfs_dir = "/camus/topics/testing_hf/hourly/2016/01/29/00"

conf = SparkConf().setAppName("Smart Meter Watchdog")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.json("hdfs://" + ec2_host + hdfs_dir)

## Take some samples to make sure it works!

In [6]:
import time
time.strftime("%D", time.localtime(int("1306006763")))

'05/21/11'

In [7]:
import time

def ts2date(curTime):
    return time.strftime("%D", time.localtime(int(curTime)))

ts2date('1306006763')

'05/21/11'

In [11]:
print df.take(1)

[Row(houseId=1, readings=[Row(label=u'mains', meterId=u'1', power=u'179.75'), Row(label=u'mains', meterId=u'2', power=u'650.80')], timestamp=u'1303516800', zip=u'21771')]


In [25]:
data_rdd = df.map(lambda row: ((row.houseId, row.timestamp, row.zip), row.readings))

In [22]:
data_rdd.take(1)

[((1, u'1303516800', u'21771'),
  [Row(label=u'mains', meterId=u'1', power=u'179.75'),
   Row(label=u'mains', meterId=u'2', power=u'650.80')])]

In [9]:
df_reorder = df.map(lambda p: (p.houseId, ts2date(p.timestamp), p.zip, p.readings[0].label, p.readings[0].meterId, float(p.readings[0].power) + float(p.readings[1].power)))

In [10]:
df_reorder.take(1)

[(1, '04/23/11', u'21771', u'mains', u'1', 830.55)]

In [5]:
df.printSchema()

root
 |-- houseId: long (nullable = true)
 |-- readings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- meterId: string (nullable = true)
 |    |    |-- power: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- zip: string (nullable = true)



### Add one more column and convert timestamp to date

In [8]:
df_date = SQLContext.createDataFrame(sqlContext, df_reorder.map(lambda row: Row(**dict(row.asDict(), date=ts2date(row.timestamp)))))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-8-76c7fe9243c7>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'asDict'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-8-76c7fe9243c7>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'asDict'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [None]:
df_date.take(1)

In [None]:
print type(df_date)
print type(df)

In [None]:
df_house_power = df_date.select(df_date['houseId'], df_date['date'], df_date['zip'], df_date['power'])

In [None]:
df_house_power.take(1)

In [None]:
df_house_power_aggr = df_house_power.map(lambda x: ((x.houseId, x.date, x.zip), x.power)).reduceByKey(lambda x, y: float(x)+float(y))

In [26]:
df_house_power_aggr = df_reorder.map(lambda x: ((x[0], x[1], x[2]), x[5])).reduceByKey(lambda x, y: float(x) + float(y))

In [27]:
df_house_power_aggr.take(1)

[((3, '05/05/11', u'97759'), 15857599.789999912)]

In [None]:
df_house_power_aggr.take(10)

In [29]:
df_house_clean = df_house_power_aggr.map(lambda x: {
        "houseId": x[0][0],
        "date": x[0][1],
        "zip": x[0][2],
        "power": x[1]
    })

In [None]:
type(df_house_clean)

In [30]:
df_house_clean.count()

157

In [33]:
def aggToCassandraPart(agg):
    if agg:
        cascluster = Cluster(['52.89.47.199', '52.89.59.188', '52.88.228.95', '52.35.74.206'])
        casSession = cascluster.connect('playground')
        for rec in agg:
            casSession.execute('INSERT INTO power_aggr2 (houseId, zip, date, power) VALUES (%s, %s, %s, %s)', (str(rec['houseId']), rec['zip'], rec['date'], str(rec['power'])))
        casSession.shutdown()
        cascluster.shutdown()

In [34]:
df_house_clean.foreachPartition(aggToCassandraPart)

## Simple operation on the Cassandra database to make sure it works :-)

In [None]:
session = cluster.connect('playground')

result = session.execute("select * from email")
for x in result: print x