In [0]:
!pip install pyspark
!pip install graphframes

Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
[?25l[K     |                                | 10 kB 10.6 MB/s eta 0:00:30[K     |                                | 20 kB 4.2 MB/s eta 0:01:17[K     |                                | 30 kB 6.1 MB/s eta 0:00:53[K     |                                | 40 kB 3.7 MB/s eta 0:01:27[K     |                                | 51 kB 3.7 MB/s eta 0:01:27[K     |                                | 61 kB 4.4 MB/s eta 0:01:13[K     |                                | 71 kB 4.7 MB/s eta 0:01:09[K     |                                | 81 kB 5.3 MB/s eta 0:01:01[K     |                                | 92 kB 5.5 MB/s eta 0:00:58[K     |                                | 102 kB 4.5 MB/s eta 0:01:10[K     |                                | 112 kB 4.5 MB/s eta 0:01:10[K     |                                | 122 kB 4.5 MB/s eta 0:01:10[K     |                                | 133 kB 4.5 MB/s eta 0:01:10[K     |  

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, window, count

spark = SparkSession.builder \
    .appName("SingleThreadedStreaming") \
    .master("local[1]") \
    .config("spark.sql.shuffle.partitions", "1") \
    .getOrCreate()

In [0]:
from pyspark.sql.types import StructType, StringType, TimestampType, IntegerType

schema = StructType() \
    .add("id", IntegerType(), True) \
    .add("game_name", StringType(), True) \
    .add("sentiment", IntegerType(), True) \
    .add("timestamp", TimestampType(), True) \

tweets_df = spark.read.csv("/FileStore/tables/twitter.csv", schema=schema, header=True)
tweets_df.show(10)


+------+------------+---------+--------------------+
|    id|   game_name|sentiment|           timestamp|
+------+------------+---------+--------------------+
| 40239|      Google|        2|2024-12-29 19:05:...|
| 21709|       Dota2|        2|2024-12-29 19:05:...|
| 49100|WorldOfCraft|        2|2024-12-29 19:05:...|
| 23293|       CS-GO|        3|2024-12-29 19:05:...|
| 26841|  CallOfDuty|        2|2024-12-29 19:05:...|
| 32590|   HomeDepot|        2|2024-12-29 19:05:...|
| 36169|        FIFA|        1|2024-12-29 19:05:...|
|122210| Hearthstone|        2|2024-12-29 19:05:...|
| 86642|WorldOfCraft|        1|2024-12-29 19:05:...|
|114015|   MaddenNFL|        3|2024-12-29 19:05:...|
+------+------------+---------+--------------------+
only showing top 10 rows



## Streaming

In [0]:
tweets_dir = "/twitter"

tweets_df \
    .write \
    .mode('overwrite') \
    .csv(tweets_dir)

In [0]:
tweets_stream = spark.readStream.schema(schema) \
    .option("header", True) \
    .option('multiline', True) \
    .option('maxFilesPerTrigger', '1').csv(tweets_dir)

tweets_stream.isStreaming


Out[5]: True

## Complete processing

In [0]:
import pyspark.sql.functions as F

stream_df = tweets_stream.withColumn("processed_at", F.current_timestamp())


query = stream_df.writeStream.outputMode("append") \
    .format("memory") \
    .queryName('const10') \
    .start()

In [0]:
%sql select game_name, sentiment, count(*), processed_at from const10 group by game_name, sentiment, processed_at order by game_name

game_name,sentiment,count(1),processed_at
Amazon,1,406,2024-12-31T12:28:11.271+0000
Amazon,0,285,2024-12-31T12:28:11.271+0000
Amazon,3,493,2024-12-31T12:28:11.271+0000
Amazon,2,446,2024-12-31T12:28:11.271+0000
Amazon,1,996,2024-12-31T12:28:15.441+0000
Amazon,3,1270,2024-12-31T12:28:15.441+0000
Amazon,2,1151,2024-12-31T12:28:15.441+0000
Amazon,0,708,2024-12-31T12:28:15.441+0000
Amazon,3,1250,2024-12-31T12:28:19.324+0000
Amazon,2,1189,2024-12-31T12:28:19.324+0000


In [0]:
%sql select processed_at from const10 group by processed_at

processed_at
2024-12-31T12:28:11.271+0000
2024-12-31T12:28:15.441+0000
2024-12-31T12:28:19.324+0000
2024-12-31T12:28:22.438+0000
2024-12-31T12:28:25.590+0000
2024-12-31T12:28:28.793+0000
2024-12-31T12:28:31.684+0000
2024-12-31T12:28:34.587+0000


In [0]:
%sql select * from const10

id,game_name,sentiment,timestamp,processed_at
594765,PlayStation5(PS5),3,2024-12-29T19:24:44.463+0000,2024-12-31T12:28:11.271+0000
855247,Fortnite,3,2024-12-29T19:24:44.463+0000,2024-12-31T12:28:11.271+0000
252907,Nvidia,3,2024-12-29T19:24:44.465+0000,2024-12-31T12:28:11.271+0000
978264,CallOfDuty,1,2024-12-29T19:24:44.467+0000,2024-12-31T12:28:11.271+0000
71603,HomeDepot,1,2024-12-29T19:24:44.470+0000,2024-12-31T12:28:11.271+0000
75003,GrandTheftAuto(GTA),3,2024-12-29T19:24:44.471+0000,2024-12-31T12:28:11.271+0000
80916,Borderlands,3,2024-12-29T19:24:44.472+0000,2024-12-31T12:28:11.271+0000
633153,CallOfDutyBlackopsColdWar,3,2024-12-29T19:24:44.472+0000,2024-12-31T12:28:11.271+0000
768007,MaddenNFL,3,2024-12-29T19:24:44.474+0000,2024-12-31T12:28:11.271+0000
874778,TomClancysRainbowSix,3,2024-12-29T19:24:44.474+0000,2024-12-31T12:28:11.271+0000


## Windowed processing

In [0]:
import pyspark.sql.functions as F

stream_df = tweets_stream.withColumn("processed_at", F.current_timestamp())
stream_df = stream_df.groupBy( \
     'game_name', 'sentiment', F.window(F.col('timestamp'), '5 minute')) \
    .count()

query = stream_df.writeStream.outputMode("complete") \
    .format("memory") \
    .queryName('const11') \
    .start()

In [0]:
%sql select processed_at from const11 group by processed_at

processed_at
2024-12-31T12:30:57.590+0000
2024-12-31T12:31:00.633+0000
2024-12-31T12:31:03.921+0000
2024-12-31T12:31:07.048+0000
2024-12-31T12:31:12.633+0000
2024-12-31T12:31:09.808+0000
2024-12-31T12:30:54.007+0000
2024-12-31T12:31:15.444+0000


In [0]:
%sql select * from const11 order by game_name

game_name,sentiment,window,count
Amazon,3,"List(2024-12-29T19:25:00.000+0000, 2024-12-29T19:30:00.000+0000)",365
Amazon,0,"List(2024-12-29T19:05:00.000+0000, 2024-12-29T19:10:00.000+0000)",1152
Amazon,0,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",1426
Amazon,2,"List(2024-12-29T19:20:00.000+0000, 2024-12-29T19:25:00.000+0000)",2264
Amazon,1,"List(2024-12-29T19:15:00.000+0000, 2024-12-29T19:20:00.000+0000)",1929
Amazon,1,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",1908
Amazon,3,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",2278
Amazon,3,"List(2024-12-29T19:20:00.000+0000, 2024-12-29T19:25:00.000+0000)",2390
Amazon,0,"List(2024-12-29T19:15:00.000+0000, 2024-12-29T19:20:00.000+0000)",1274
Amazon,2,"List(2024-12-29T19:25:00.000+0000, 2024-12-29T19:30:00.000+0000)",332


In [0]:
%sql select * from const11

processed_at,game_name,sentiment,window,count
2024-12-31T12:30:57.590+0000,Hearthstone,3,"List(2024-12-29T19:05:00.000+0000, 2024-12-29T19:10:00.000+0000)",1271
2024-12-31T12:31:00.633+0000,Google,0,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",285
2024-12-31T12:31:00.633+0000,RedDeadRedemption(RDR),1,"List(2024-12-29T19:05:00.000+0000, 2024-12-29T19:10:00.000+0000)",577
2024-12-31T12:31:00.633+0000,ApexLegends,2,"List(2024-12-29T19:05:00.000+0000, 2024-12-29T19:10:00.000+0000)",737
2024-12-31T12:30:57.590+0000,Battlefield,2,"List(2024-12-29T19:05:00.000+0000, 2024-12-29T19:10:00.000+0000)",1146
2024-12-31T12:31:03.921+0000,GrandTheftAuto(GTA),0,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",726
2024-12-31T12:31:00.633+0000,Nvidia,2,"List(2024-12-29T19:10:00.000+0000, 2024-12-29T19:15:00.000+0000)",520
2024-12-31T12:31:07.048+0000,PlayStation5(PS5),0,"List(2024-12-29T19:15:00.000+0000, 2024-12-29T19:20:00.000+0000)",430
2024-12-31T12:31:12.633+0000,ApexLegends,1,"List(2024-12-29T19:20:00.000+0000, 2024-12-29T19:25:00.000+0000)",804
2024-12-31T12:31:09.808+0000,PlayStation5(PS5),0,"List(2024-12-29T19:15:00.000+0000, 2024-12-29T19:20:00.000+0000)",761
