In [1]:
#import SparkSession
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('structured_streaming').getOrCreate()

In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [3]:
#create sample dataset
df_1=spark.createDataFrame([("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("demo",mode='append')

In [4]:
#define schema for input data
schema=StructType().add("user_id", "string").add("app", "string").add("time_in_secs", "integer").add("age", "integer")
data=spark.readStream.option("sep", ",").schema(schema).csv("demo")

In [5]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



In [6]:
app_count=data.groupBy('app').count()

In [7]:
query=(app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())

In [8]:
spark.sql("select * from count_query ").toPandas().head(5)

Unnamed: 0,app,count
0,Insta,1
1,FB,1
2,Twitter,1


In [9]:
fb_data=data.filter(data['app']=='FB')

In [10]:
fb_avg_time=fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))

In [11]:
fb_query=(fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())

In [13]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0


In [21]:
df_2=spark.createDataFrame([("XN203",'FB',100,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("demo",mode='append')

In [23]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,200.0
1,XN201,10.0


In [24]:
df_3=spark.createDataFrame([("XN203",'FB',500,30),("XN201",'Insta',30,19),("XN202",'Twitter',100,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("demo",mode='append')

In [25]:
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,200.0
1,XN201,10.0
2,XN202,2000.0


In [26]:
df_4=spark.createDataFrame([("XN203",'FB',500,30),("XN201",'Insta',30,19),("XN202",'Twitter',100,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("demo",mode='append')

In [18]:
#app wise time spent

app_df=data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)

In [19]:
app_query=(app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())

In [27]:
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,3410
1,Insta,560
2,Twitter,210


In [11]:
df_5=spark.createDataFrame([("XN203",'FB',500,30),("XN201",'Insta',30,19),("XN202",'Twitter',100,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [26]:
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,3410
1,Insta,560
2,Twitter,210


In [28]:
# app wise mean age 
age_df=data.groupBy('app').agg(F.avg('age').alias('mean_age')).orderBy('mean_age',ascending=False)


In [None]:
age_query=(age_df.writeStream.queryName('age_query').outputMode('complete').format('memory').start())

In [30]:
spark.sql("select * from age_query ").toPandas().head(5)

Unnamed: 0,app,mean_age
0,Twitter,38.5
1,FB,30.571429
2,Insta,25.5


In [15]:
df_6=spark.createDataFrame([("XN210",'FB',500,50),("XN255",'Insta',30,23),("XN222",'Twitter',100,30)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("csv_folder",mode='append')

In [32]:
spark.sql("select * from age_query ").toPandas().head(5)

Unnamed: 0,app,mean_age
0,Twitter,38.5
1,FB,30.571429
2,Insta,25.5


In [27]:
# Join static dataframe with streaming dataframe
app_df=spark.createDataFrame([('FB','FACEBOOK'),('Insta','INSTAGRAM'),('Twitter','TWITTER')],["app", "full_name"])
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



In [28]:
app_stream_df=data.join(app_df,'app')

In [29]:
join_query=(app_stream_df.writeStream.queryName('join_query').outputMode('append').format('memory').start())

In [30]:
spark.sql("select * from join_query ").toPandas().head(50)

Unnamed: 0,app,user_id,time_in_secs,age,full_name
0,FB,XN201,10,19,FACEBOOK
1,FB,XN203,500,30,FACEBOOK
2,FB,XN203,500,30,FACEBOOK
3,FB,XN203,100,30,FACEBOOK
4,FB,XN203,300,30,FACEBOOK
5,FB,XN202,2000,45,FACEBOOK
6,Insta,XN201,30,19,INSTAGRAM
7,Insta,XN201,30,19,INSTAGRAM
8,Insta,XN202,500,45,INSTAGRAM
9,Twitter,XN201,10,19,TWITTER
