In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain (or reuse) the Spark session that every later cell depends on.
spark = (
    SparkSession.builder
    .appName('structured_streaming')
    .getOrCreate()
)

In [3]:
import pyspark.sql.functions as F

In [4]:
from pyspark.sql.types import *

In [11]:
# Seed the streaming source directory with a small batch of app-usage rows.
# Keep the DataFrame itself in `df_1`: `DataFrameWriter.csv` returns None, so
# chaining `.write.csv(...)` onto the assignment would leave `df_1` as None.
df_1 = spark.createDataFrame(
    [("XN203", 'FB', 300, 30), ("XN201", 'Twitter', 10, 19), ("XN202", 'Insta', 500, 45)],
    ["user_id", "app", "time_in_secs", "age"],
)
# mode='append' writes new part files alongside any already present, so each
# re-run of this cell adds another copy of these rows for the stream to read
# (no header row is written; the reader below supplies the schema instead).
df_1.write.csv("csv_folder", mode='append')

In [10]:
# Column layout of the CSV files in csv_folder (same order as written above).
schema = (
    StructType()
    .add("user_id", "string")
    .add("app", "string")
    .add("time_in_secs", "integer")
    .add("age", "integer")
)

In [15]:
# Streaming DataFrame over every CSV file that lands in csv_folder. The schema
# is given explicitly because the files carry no header.
data = (
    spark.readStream
    .option("sep", ",")
    .schema(schema)
    .csv("csv_folder")
)

In [16]:
# Confirm the streaming DataFrame uses the explicit schema defined above
# (the printed tree shows all four columns, each nullable).
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



In [17]:
# Running aggregate: number of rows seen so far for each app.
app_count = data.groupBy('app').count()

In [21]:
# Start the aggregation. The memory sink exposes results as a table named
# 'count_query' (queried with spark.sql below); 'complete' mode re-emits the
# full aggregate on every trigger.
query = (
    app_count.writeStream
    .queryName('count_query')
    .outputMode('complete')
    .format('memory')
    .start()
)

In [20]:
# The memory table is filled asynchronously by the query started above. Block
# until every file currently in csv_folder has been processed so the read does
# not see an empty or partial table (e.g. on a fresh Restart & Run All).
query.processAllAvailable()
spark.sql("select * from count_query ").toPandas().head(5)

Unnamed: 0,app,count
0,Insta,2
1,FB,2
2,Twitter,2


In [22]:
# Join operation: a small static lookup table mapping app codes to full names.
app_df = spark.createDataFrame(
    [('FB', 'FACEBOOK'), ('Insta', 'INSTAGRAM'), ('Twitter', 'TWITTER')],
    ["app", "full_name"],
)
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



In [23]:
# Stream-static join: enrich each streamed row with the app's full name
# (inner join on the shared 'app' column, the default join type).
app_stream_df = data.join(app_df, on='app')

In [24]:
# Start the join stream. In 'append' mode each joined row is written once to
# the in-memory table 'join_query'.
join_query = (
    app_stream_df.writeStream
    .queryName('join_query')
    .outputMode('append')
    .format('memory')
    .start()
)

In [25]:
# Wait for the join stream to consume all files currently in csv_folder before
# reading its memory table; otherwise the first read can race the first batch.
join_query.processAllAvailable()
spark.sql("select * from join_query ").toPandas().head(50)

Unnamed: 0,app,user_id,time_in_secs,age,full_name
0,FB,XN203,300,30,FACEBOOK
1,FB,XN203,300,30,FACEBOOK
2,Insta,XN202,500,45,INSTAGRAM
3,Insta,XN202,500,45,INSTAGRAM
4,Twitter,XN201,10,19,TWITTER
5,Twitter,XN201,10,19,TWITTER
