In [1]:
// Imports required by the cells below
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window._
import org.apache.spark.sql.types._



In [2]:
// Create (or reuse) the SparkSession used by every cell in this notebook.
// Runs locally with 2 threads and 3 shuffle partitions.
// Spark configuration keys are case-sensitive and unknown keys are accepted silently,
// so they must be spelt exactly:
//   spark.streaming.stopGracefullyOnShutdown - asks Spark to stop streaming work gracefully on JVM shutdown
//   spark.sql.streaming.schemaInference      - allows schema inference for streaming sources
//                                              (NOTE(review): appears unused here, since the socket stream
//                                              is parsed with an explicit schema - confirm before removing)
val spark=SparkSession.builder().master("local[2]").appName("Streaming Word Count")
.config("spark.streaming.stopGracefullyOnShutdown",true)
.config("spark.sql.shuffle.partitions",3)
.config("spark.sql.streaming.schemaInference",true)
.getOrCreate()

spark = org.apache.spark.sql.SparkSession@3886c5ee


org.apache.spark.sql.SparkSession@3886c5ee

In [3]:
// Open a streaming reader on the TCP socket at localhost:1354.
// As the cell output shows, the resulting DataFrame has a single string column named `value`.
val socketOptions=Map("host"->"localhost","port"->"1354")
val transaction_df=spark.readStream
.format("socket")
.options(socketOptions)
.load()

transaction_df = [value: string]


[value: string]

In [5]:
// Schema of one incoming transaction record: four long fields plus a timestamp.
// Every field is nullable (the default). Field names are runtime strings that from_json
// uses to pick values out of the incoming JSON, so they are kept exactly as written.
val transaction_schema=new StructType()
.add("card_id",LongType)
.add("amount",LongType)
.add("postcode",LongType)
.add("pos_id",LongType)
.add("transection_date",TimestampType)

transaction_schema = StructType(StructField(card_id,LongType,true), StructField(amount,LongType,true), StructField(postcode,LongType,true), StructField(pos_id,LongType,true), StructField(transection_date,TimestampType,true))


StructType(StructField(card_id,LongType,true), StructField(amount,LongType,true), StructField(postcode,LongType,true), StructField(pos_id,LongType,true), StructField(transection_date,TimestampType,true))

In [6]:
// Parse the raw `value` string of each streamed row as JSON using transaction_schema.
// The parsed struct is kept under the same column name, "value".
val transofnew=transaction_df.select(
from_json(transaction_df.col("value"),transaction_schema).as("value")
)
// Flatten the struct so each of its fields becomes a top-level column.
val refined_df=transofnew.select(col("value.*"))
// Print the flattened schema to confirm the column names and types.
refined_df.printSchema()


root
 |-- card_id: long (nullable = true)
 |-- amount: long (nullable = true)
 |-- postcode: long (nullable = true)
 |-- pos_id: long (nullable = true)
 |-- transection_date: timestamp (nullable = true)



transofnew = [value: struct<card_id: bigint, amount: bigint ... 3 more fields>]
refined_df = [card_id: bigint, amount: bigint ... 3 more fields]


[card_id: bigint, amount: bigint ... 3 more fields]

In [7]:
// Static (batch) lookup data, read from the "trans" path.
// The CSV was copied from the local file system to HDFS beforehand with:
//hadoop fs  -put "/home/itv003334/transection_id.csv" "trans";
// The first line is treated as a header and column types are inferred from the data.
val staticCsvReader=spark.read.format("csv")
.option("header",true)
.option("inferSchema",true)
val static_df=staticCsvReader.load("trans")
static_df.printSchema()

root
 |-- card_id: long (nullable = true)
 |-- member_id: long (nullable = true)
 |-- card_issue_date: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)



static_df = [card_id: bigint, member_id: bigint ... 3 more fields]


[card_id: bigint, member_id: bigint ... 3 more fields]

In [8]:
// Join key: a streamed transaction matches a static row when their card_id values are equal.
// Each column is taken from its own DataFrame so the two card_id columns stay unambiguous.
val joinCondition=refined_df("card_id").equalTo(static_df("card_id"))
// Inner join: only rows with a match on both sides are kept.
val joinType="inner"

joinCondition = (card_id = card_id)
joinType = inner


inner

In [12]:
// Join the static lookup data with the streamed transactions, using the condition and
// join type defined earlier. Both inputs contribute a card_id column, so the result
// carries two columns with that name (see the printed schema).
val joined_data=static_df.join(
refined_df,
joinCondition,
joinType
)
joined_data.printSchema()

root
 |-- card_id: long (nullable = true)
 |-- member_id: long (nullable = true)
 |-- card_issue_date: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- card_id: long (nullable = true)
 |-- amount: long (nullable = true)
 |-- postcode: long (nullable = true)
 |-- pos_id: long (nullable = true)
 |-- transection_date: timestamp (nullable = true)



joined_data = [card_id: bigint, member_id: bigint ... 8 more fields]


[card_id: bigint, member_id: bigint ... 8 more fields]

In [None]:
// Configure a console sink for the joined stream:
//   - "update" output mode
//   - progress checkpointed under "order_1"
//   - a new micro-batch is triggered every 15 seconds of processing time
val consoleWriter=joined_data.writeStream
.format("console")
.outputMode("update")
.option("checkpointLocation","order_1")
.trigger(Trigger.ProcessingTime("15 Seconds"))
// Start the streaming query, then wait on it so the cell keeps running with the query.
val outputdf=consoleWriter.start()
outputdf.awaitTermination()

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---------+---------------+-------+-----+-------+------+--------+------+----------------+
|card_id|member_id|card_issue_date|country|state|card_id|amount|postcode|pos_id|transection_date|
+-------+---------+---------------+-------+-----+-------+------+--------+------+----------------+
+-------+---------+---------------+-------+-----+-------+------+--------+------+----------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------------+---------------+-------------------+-------------+---------+----------------+------+--------+---------------+----------------+
|         card_id|      member_id|    card_issue_date|      country|    state|         card_id|amount|postcode|         pos_id|transection_date|
+----------------+---------------+-------------------+-------------+---------+----------------+------+--------+------------

In [None]:
{"order_id":5852,"order_date":"2020-03-02 11:48:00","order_customer_id":9344,"order_status":"COMPLETE", "amount": 400}

{"order_id":5852,"order_date":"2020-03-02 11:14:00","order_customer_id":9344,"order_status":"COMPLETE", "amount": 400}

{"order_id":5852,"order_date":"2020-03-02 11:16:00","order_customer_id":9344,"order_status":"COMPLETE", "amount": 600}