In [0]:
# Sean Wendlandt
# lab 8 5/18/23

from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
import pyspark.sql.functions as f

# Explicit schema for the FIFA tweet CSV. It is passed to both the batch
# reader and the streaming reader below. Every column is declared nullable.
fifaSchema = StructType([
    StructField(column, dtype(), True)
    for column, dtype in (
        ('ID', LongType),
        ('lang', StringType),
        ('Date', TimestampType),
        ('Source', StringType),
        ('len', LongType),
        ('Orig_Tweet', StringType),
        ('Tweet', StringType),
        ('Likes', LongType),
        ('RTs', LongType),
        ('Hashtags', StringType),
        ('UserMentionNames', StringType),
        ('UserMentionID', StringType),
        ('Name', StringType),
        ('Place', StringType),
        ('Followers', LongType),
        ('Friends', LongType),
    )
])

In [0]:
# Batch-read the FIFA tweet CSV from DBFS using the explicit schema.
# The file has a header row; the reader runs in dropMalformed mode.
fifa = (
    spark.read
    .format("csv")
    .schema(fifaSchema)
    .option("header", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("mode", "dropMalformed")
    .load("dbfs:///FileStore/tables/FIFA__1_.csv")
)

In [0]:
# Register the DataFrame as the temp view "fifa" so it can be queried with SQL
fifa.createOrReplaceTempView("fifa")

# Preview the first rows of the loaded data
fifa.show()


+-------------------+----+-------------------+-------------------+---+--------------------+--------------------+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+
|                 ID|lang|               Date|             Source|len|          Orig_Tweet|               Tweet|Likes| RTs|            Hashtags|    UserMentionNames|       UserMentionID|                Name|               Place|Followers|Friends|
+-------------------+----+-------------------+-------------------+---+--------------------+--------------------+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+
|1013597056219295744|  en|2018-07-02 01:35:44|Twitter for Android|139|RT @FCBarcelona: ...|scores the winnin...|    0|1031|            WorldCup|FC Barcelona,Ivan...|FCBarcelona,ivanr...|        Febri Aditya|               Bogor|      667|    686|
|10135970474

In [0]:
# Report how many partitions the loaded DataFrame currently has
partition_count = fifa.rdd.getNumPartitions()
print(partition_count)

8


In [0]:
# Repartition into 20 time-contiguous partitions.
#
# The previous form, orderBy('Date').repartition(20), paid for a global sort
# and then immediately discarded it: repartition(n) without columns is a
# round-robin shuffle, so every partition ended up with rows from the whole
# date range. Range-partitioning on Date keeps each partition to a contiguous
# slice of time, and sortWithinPartitions orders the rows inside each slice.
fifa = (
    fifa.repartitionByRange(20, 'Date')
        .sortWithinPartitions('Date')
        .persist()
)
print(fifa.rdd.getNumPartitions())

20


In [0]:
# Save the repartitioned data to FileStore as CSV (with header).
#
# Use an absolute DBFS URI rather than a relative path: the streaming reader
# loads from dbfs:///FileStore/tables/fifaTime/, and a relative path here
# would only point at the same directory if the working directory happens
# to be the DBFS root.
fifaTimePath = "dbfs:/FileStore/tables/fifaTime/"

# Remove any output from a previous run first; save() uses the default
# write mode, which fails if the target directory already exists.
dbutils.fs.rm(fifaTimePath, True)
fifa.write.format("csv").option("header", True).save(fifaTimePath)

In [0]:
# Static (batch) windows sanity check: the same aggregation as the streaming
# query, run over the full DataFrame.

# Keep only the columns the aggregation needs, drop tweets without hashtags,
# and emit one row per individual hashtag (the column is comma-separated).
# Persist once, on the final exploded frame: persisting the intermediate
# filtered frame as well would cache data that is never reused on its own.
fifa = (
    fifa.select('ID', 'Date', 'Hashtags')
        .filter(f.col('Hashtags').isNotNull())
        .withColumn('Hashtags', f.explode(f.split('Hashtags', ',')))
        .persist()
)

# Count tweets per hashtag in 60-minute windows sliding every 30 minutes,
# keep only (window, hashtag) pairs with more than 100 tweets, and order by
# window start, then by descending count.
# The grouping key references the real column name ('Hashtags') so it also
# resolves under case-sensitive analysis; the alias keeps the output column
# header as 'HashTags'.
fifaWin = (
    fifa.groupBy(
            f.window('Date', '60 minutes', '30 minutes'),
            f.col('Hashtags').alias('HashTags'),
        )
        .agg(f.count('ID').alias('count'))
        .filter(f.col('count') > 100)
        .orderBy(f.col('window').asc(), f.col('count').desc())
)


In [0]:
# Preview the first 20 rows of the static result without truncating values
fifaWin.show(n=20, truncate=False)

+------------------------------------------+-------------+-----+
|window                                    |HashTags     |count|
+------------------------------------------+-------------+-----+
|{2018-06-29 23:00:00, 2018-06-30 00:00:00}|WorldCup     |152  |
|{2018-06-29 23:30:00, 2018-06-30 00:30:00}|WorldCup     |1270 |
|{2018-06-29 23:30:00, 2018-06-30 00:30:00}|FIFAStadiumDJ|615  |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|WorldCup     |2278 |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|FIFAStadiumDJ|1073 |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|EXO          |157  |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|worldcup     |144  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|WorldCup     |2158 |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|FIFAStadiumDJ|990  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|EXO          |156  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|worldcup     |150  |
|{2018-06-30 01:00:00, 2018-06-30 02:00:00}|WorldCup     |1884 |
|{2018-06-30 01:00:00, 20

In [0]:
# Structured Streaming source: read the CSV files under fifaTime/ as a
# stream, using the explicit schema and at most one file per trigger.
sourceStream = (
    spark.readStream
    .format("csv")
    .schema(fifaSchema)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load("dbfs:///FileStore/tables/fifaTime/")
)

In [0]:
# Streaming query: per-hashtag tweet counts in 60-minute windows that slide
# every 30 minutes, keeping only counts above 100, ordered by window and then
# by descending count. A 24-hour watermark is declared on the Date column.
fifaWin = (
    sourceStream
    .withWatermark("Date", "24 hours")
    .select('ID', 'Date', 'Hashtags')
    .where(f.col('Hashtags').isNotNull())
    # One output row per individual hashtag in the comma-separated column
    .withColumn('Hashtags', f.explode(f.split('Hashtags', ',')))
    .groupBy(f.window("Date", "60 minutes", "30 minutes"), "HashTags")
    .agg(f.count("ID").alias("count"))
    .where(f.col("count") > 100)
    .sort(f.col("window").asc(), f.col("count").desc())
)


In [0]:
# Sink: write the complete result table to an in-memory table named
# "fifWin", triggering every 10 seconds, and start the query.
sinkStream = (
    fifaWin.writeStream
    .format("memory")
    .queryName("fifWin")
    .outputMode("complete")
    .trigger(processingTime='10 seconds')
    .start()
)

In [0]:
# Query the in-memory sink table while the stream is running and print the
# first 20 rows without truncating values
spark.sql("SELECT * FROM fifWin").show(n=20, truncate=False)

+------------------------------------------+-------------+-----+
|window                                    |HashTags     |count|
+------------------------------------------+-------------+-----+
|{2018-06-29 23:00:00, 2018-06-30 00:00:00}|WorldCup     |152  |
|{2018-06-29 23:30:00, 2018-06-30 00:30:00}|WorldCup     |1270 |
|{2018-06-29 23:30:00, 2018-06-30 00:30:00}|FIFAStadiumDJ|615  |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|WorldCup     |2278 |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|FIFAStadiumDJ|1073 |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|EXO          |157  |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|worldcup     |144  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|WorldCup     |2158 |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|FIFAStadiumDJ|990  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|EXO          |156  |
|{2018-06-30 00:30:00, 2018-06-30 01:30:00}|worldcup     |150  |
|{2018-06-30 01:00:00, 2018-06-30 02:00:00}|WorldCup     |1884 |
|{2018-06-30 01:00:00, 20