# Persisting Layer for Batch Job

**Importing Requisites**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext

import time

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

In [None]:
# Creating Spark Session and SQL Context
spark = SparkSession.builder.appName('lr_example').getOrCreate()
# SQLContext's first positional parameter is a SparkContext, not a
# SparkSession. Pass the session's underlying context, and hand the session
# in explicitly so the SQLContext wraps this exact session.
sqlContext = SQLContext(spark.sparkContext, spark)

**Reading Data from HDFS**

In [None]:
# Load the CSV data at this HDFS path into a DataFrame. header=True takes the
# column names from the first line; inferSchema=True asks Spark to infer
# column types from the data.
data = spark.read.csv("hdfs://localhost:9000/kafka_spark", inferSchema=True, header=True)

In [None]:
# Print the column names and types of the loaded DataFrame.
data.printSchema()

In [None]:
# Display the DataFrame's column names (shown as the cell's output value).
data.columns

In [None]:
# Preview the DataFrame using show()'s default row limit and truncation.
data.show()

In [None]:
# Preview 7 rows with truncation disabled so full cell values are visible.
data.show(7, truncate=False)

In [None]:
def timestamp_conversion(timestamp):
    """Format an epoch-milliseconds value as a UTC ``YYYY-MM-DD`` string.

    Args:
        timestamp: Milliseconds since the Unix epoch, or ``None``.

    Returns:
        The UTC calendar date as ``'YYYY-MM-DD'``, or the placeholder
        string ``"null null"`` when ``timestamp`` is ``None``.
    """
    # Missing values map to a fixed placeholder rather than raising.
    if timestamp is None:
        return "null null"

    # time.gmtime expects seconds, so scale the millisecond input down.
    epoch_seconds = timestamp / 1000
    utc_fields = time.gmtime(epoch_seconds)
    return time.strftime('%Y-%m-%d', utc_fields)

In [None]:
# Wrap timestamp_conversion as a Spark UDF with a string return type so it
# can be applied to DataFrame columns.
time_udf = udf(timestamp_conversion,StringType())

In [None]:
# Replace the timestamp_hour column with the UDF's formatted string value
# (withColumn overwrites a column of the same name).
inputDF = data.withColumn("timestamp_hour",time_udf(data["timestamp_hour"]))
# Register the result as a temp view so the SQL queries can reference it by
# name.
inputDF.createOrReplaceTempView("activity")

**Batch Analytics**

In [None]:
# Count distinct visitors for each (product, timestamp_hour) pair.
# The query text is split across adjacent literals purely for readability;
# Python joins them into the same single string at compile time.
visitorsByProduct = sqlContext.sql(
    "SELECT product, timestamp_hour, "
    "COUNT(DISTINCT visitor) as unique_visitors "
    "FROM Activity "
    "GROUP BY product, timestamp_hour"
)

In [None]:
# Preview the first 5 rows of the unique-visitor aggregation.
visitorsByProduct.show(5)

In [None]:
# Per (product, timestamp_hour), count how many rows have each action type
# by summing a 1/0 indicator per action. The result is cached because it is
# used more than once below (previewed, then written out).
# The query text is split across adjacent literals purely for readability;
# Python joins them into the same single string at compile time.
activityByProduct = sqlContext.sql(
    "SELECT product, timestamp_hour, "
    "sum(case when action = 'purchase' then 1 else 0 end) as purchase_count, "
    "sum(case when action = 'add_to_cart' then 1 else 0 end) as add_to_cart_count, "
    "sum(case when action = 'page_view' then 1 else 0 end) as page_view_count "
    "from Activity group by product, timestamp_hour"
).cache()

In [None]:
# Preview the first 5 rows of the per-action count aggregation.
activityByProduct.show(5)

**Creating Cassandra cluster object and connecting to default cluster**

In [None]:
# Build a Cluster with the driver's default settings and open a session.
# Passing the keyspace to connect() makes 'lambda' the session's default
# keyspace, so all queries by default refer to it.
cluster = Cluster()
session = cluster.connect('lambda')

**Inserting into the batch_visitors_by_product table in Cassandra**

In [None]:
# Prepare the INSERT once, then execute it for every aggregated row.
# The rows are collected to the driver first, so this assumes the
# aggregation result is small enough to fit in driver memory.
query = session.prepare(
    "INSERT INTO batch_visitors_by_product "
    "(product, timestamp_hour, unique_visitors) "
    "VALUES (?,?,?)"
)
for row in visitorsByProduct.collect():
    # Values are bound positionally, in the same order as the column list.
    session.execute(
        query,
        (row['product'], row['timestamp_hour'], row['unique_visitors']),
    )

**Inserting into the batch_activity_by_product table in Cassandra**

In [None]:
# Prepare the INSERT once, then execute it for every aggregated row.
# The rows are collected to the driver first, so this assumes the
# aggregation result is small enough to fit in driver memory.
query = session.prepare("INSERT INTO batch_activity_by_product (product, timestamp_hour, add_to_cart_count, page_view_count, purchase_count) VALUES (?, ?, ?, ?, ?)")
for row in activityByProduct.rdd.collect():
    # Positional parameters must follow the column order of the INSERT:
    # add_to_cart_count, page_view_count, purchase_count. Binding them in
    # any other order silently stores each count under the wrong column.
    session.execute(query, [row['product'], row['timestamp_hour'], row['add_to_cart_count'], row['page_view_count'], row['purchase_count']])

**D O N E ! . . . . .**