In [39]:
import time
from pyspark.sql.types import StructType,StringType,IntegerType
from pyspark.sql.functions import from_json,col

In [40]:
# ----Creating necessary variables----
kafka_topic="your_kafka_topic"
kafka_bootstrap_server="localhost:9092"
customer_data_path="your_hdfs_path"

mysql_host="localhost"
mysql_port="3306"
mysql_table="your_table_name"
mysql_url="jdbc:mysql://localhost:3306/your_database"
mysql_driver="com.mysql.jdbc.Driver"
mysql_user="your_username"
mysql_pwd="your_password"

cassandra_keyspace="your_cassandra_keyspace"
cassandra_table="your_cassandra_table"

In [41]:
#----function for inserting aggregated data into mysql----
def write_tomysql(df,epoch_id):
    df.write \
    .format('jdbc') \
    .options(url=mysql_url,
            driver=mysql_driver,
            dbtable=mysql_table,
            user=mysql_user,
            password=mysql_pwd,
            ) \
    .mode('append') \
    .save()

In [42]:
sc.setLogLevel('ERROR')

In [43]:
#----Reading date from kafka topic----
orders_df=spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers',kafka_bootstrap_server)  \
.option('subscribe',kafka_topic) \
.load()

In [44]:
orders_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [46]:
orders_df1=orders_df.selectExpr("CAST(value AS STRING)")

In [59]:
#---creating schema to insert into table----
orders_schema=StructType() \
.add("order_id",StringType()) \
.add("created_at",StringType()) \
.add("customer_id",StringType()) \
.add("discount",StringType()) \
.add("product_id",StringType()) \
.add("quantity",StringType()) \
.add("subtotal",StringType()) \
.add("tax",StringType()) \
.add("total",StringType())

In [60]:
#-----Applying schema to data----
orders_df2=orders_df1 \
.select(from_json(col('value'),orders_schema).alias('order'))

In [61]:
orders_df3=orders_df2 \
.select("order.*")

In [62]:
orders_df3.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- subtotal: string (nullable = true)
 |-- tax: string (nullable = true)
 |-- total: string (nullable = true)



In [63]:
#-----Getting customer Data from HDFS
customer_df=spark.read.csv(customer_data_path,header=True,inferSchema=True)

In [64]:
for i in customer_df.columns:
    customer_df=customer_df.withColumnRenamed(i,i.lower())

In [65]:
customer_df=customer_df.withColumnRenamed('id','customer_id')

In [66]:
customer_df.show(5,False)

+-----------+-------------------+----------------------------+----------+-----------+-----------------------+-----------------------------+-----------------+------------+------------------------------------+---------+-----+-----+
|customer_id|name               |address                     |birth date|city       |created at             |email                        |latitude         |longitude   |password                            |source   |state|zip  |
+-----------+-------------------+----------------------------+----------+-----------+-----------------------+-----------------------------+-----------------+------------+------------------------------------+---------+-----+-----+
|1          |Hudson Borer       |9611-9809 West Rosedale Road|1986-12-12|Wood River |2017-10-07 01:34:35.462|borer-hudson@yahoo.com       |40.71314890000001|-98.5259864 |ccca881f-3e4b-4e5c-8336-354103604af6|Twitter  |NE   |68883|
|2          |Domenica Williamson|101 4th Street              |1967-06-10|Searsbo

In [67]:
#----Joining Orders and Customers table by customer_id----- 
orders_df4=orders_df3.join(customer_df,orders_df3.customer_id==customer_df.customer_id,how='inner')
orders_df4.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- subtotal: string (nullable = true)
 |-- tax: string (nullable = true)
 |-- total: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birth date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- created at: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- password: string (nullable = true)
 |-- source: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)



In [68]:
#-----Simple Aggregation--------
orders_df5=orders_df4.groupBy('source','state') \
.agg({'total':'sum'}).select('source','state',col('sum(total)').alias('total_sum_amount'))

In [69]:
orders_df5.printSchema()

root
 |-- source: string (nullable = true)
 |-- state: string (nullable = true)
 |-- total_sum_amount: double (nullable = true)



In [70]:
#------Writing data in Cassandra----
cassandra_stream=orders_df3.writeStream \
.format('org.apache.spark.sql.cassandra')  \
.trigger(processingTime='5 seconds') \
.outputMode("append") \
.options(table=cassandra_table,keyspace=cassandra_keyspace) \
.option('checkpointLocation','file:///home/sanjay/Spark/checkpoint1') \
.start() \
.awaitTermination()

KeyboardInterrupt: 

In [71]:
#------Writing aggrgated data in MySQL for Analysis-------
mysql_stream=orders_df5.writeStream \
.trigger(processingTime='10 seconds') \
.outputMode('update') \
.foreachBatch(write_tomysql) \
.start()\
.awaitTermination()

KeyboardInterrupt: 