# Football Twitter Streaming

## Import the needed modules

In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import lit, explode, split, col, from_json, to_json, json_tuple, window, struct, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType

## Set Up Spark Session and Define Schemas

The schemas defined here are needed to extract the content of the Twitter posts from the Kafka topic.

In [None]:
# Session used by every cell below; getOrCreate() reuses an existing one.
spark = (
    SparkSession.builder
    .appName("wordCounter")
    .getOrCreate()
)

# Outer JSON envelope of a Kafka message value: only the "payload" field is
# read, and it is kept as a raw string so it can be parsed in a second step.
tweetSchema = StructType([
    StructField("payload", StringType()),
])

# Fields read from the decoded payload string.
payloadSchema = StructType([
    StructField("Text", StringType()),
    StructField("Lang", StringType()),
])

## Utility Functions

`extractTweetPayload` is a function that extracts a dataframe with the content, key and timestamp of each Twitter post from the JSON-formatted Kafka messages.

`getLastName` is the function wrapped as a UDF to extract the last word of a string column of a dataframe.

In [None]:
def extractTweetPayload(df, tweetSchema, payloadSchema):
    """Decode the JSON tweet carried in each Kafka record.

    The record value is parsed with ``tweetSchema``; its ``payload`` field
    (itself a JSON string) is then parsed with ``payloadSchema``.

    Args:
        df: DataFrame with ``key``, ``value``, ``timestamp`` and ``offset``
            columns.
        tweetSchema: schema applied to the record value.
        payloadSchema: schema applied to the ``payload`` string.

    Returns:
        A DataFrame with every field of the decoded payload plus the
        ``key`` and ``timestamp`` columns.
    """
    records = df.selectExpr(
        "CAST(key AS STRING)",
        "CAST(value AS STRING)",
        "CAST(timestamp AS TIMESTAMP)",
        "offset",
    )
    with_envelope = records.withColumn("data", from_json("value", tweetSchema))
    with_payload = with_envelope.withColumn(
        "payload", from_json("data.payload", payloadSchema)
    )
    return with_payload.select("payload.*", "key", "timestamp")


def getLastName(full_name):
    """Return the last whitespace-separated word of ``full_name``.

    Args:
        full_name: the name to split, or ``None``.

    Returns:
        The last word; ``""`` when the string is empty or only whitespace;
        ``None`` when ``full_name`` is ``None``.
    """
    # Guard against None so a missing name does not raise AttributeError.
    if full_name is None:
        return None
    # split() without arguments ignores leading/trailing/repeated whitespace,
    # so "L. Messi " still yields "Messi" instead of an empty string.
    words = full_name.split()
    return words[-1] if words else ""

## Streaming Queries

Here I define the streaming queries; they perform a simple count over specific columns of the Kafka messages.
I decided to go with hopping windows in order to visualize moving trends in the Twitter topics more effectively.

In [None]:
def wordCountQuery(df, colName):
    """Count the words of column ``colName`` over hopping time windows.

    The column is split on single spaces and each word is counted per
    10-second window that advances every 5 seconds, with a 10-second
    watermark on ``timestamp``.

    Args:
        df: DataFrame with a ``timestamp`` column and the column ``colName``.
        colName: name of the text column to split into words.

    Returns:
        A DataFrame with ``word``, ``count`` and ``value`` columns, where
        ``value`` is the JSON encoding of ``word`` and ``count``. The window
        itself is not part of the output.
    """
    words = (
        df.withWatermark("timestamp", "10 seconds")
        .withColumn("word", explode(split(col(colName), " ")))
    )
    hopping_window = window(col("timestamp"), "10 seconds", "5 seconds")
    counts = words.groupBy(hopping_window, col("word")).count()
    json_value = to_json(struct("word", "count")).alias("value")
    return counts.select("word", "count", json_value)

In [None]:
def langCountQuery(df, colName):
    """Count the rows per value of column ``colName`` over hopping windows.

    Rows are grouped per 2-minute window that advances every minute, with a
    2-minute watermark on ``timestamp``.

    Args:
        df: DataFrame with a ``timestamp`` column and the column ``colName``.
        colName: name of the column whose values are counted.

    Returns:
        A DataFrame with ``colName``, ``count`` and ``value`` columns, where
        ``value`` is the JSON encoding of the other two. The window itself
        is not part of the output.
    """
    watermarked = df.withWatermark("timestamp", "2 minutes")
    hopping_window = window(col("timestamp"), "2 minutes", "1 minutes")
    counts = watermarked.groupBy(hopping_window, col(colName)).count()
    json_value = to_json(struct(colName, "count")).alias("value")
    return counts.select(colName, "count", json_value)


## Static Dataset Import and Setup

In order to extract the meaningful words from the Twitter posts, I decided to load the public FIFA 21 dataset, which contains data about the most popular football players and clubs.
In the following lines I load the dataset, then extract the lists of teams and players and concatenate them into a single dataframe.

In [None]:
# Load the players CSV: the first row is the header and rows that fail to
# parse are dropped.
players = (
    spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("players_21.csv")
)

lastNameUDF = udf(getLastName, StringType())

# Last word of the short name of the first 500 rows, tagged as "Player".
# Deduplicated (as is done for teams below) because players may share a last
# name, and every duplicate row would make the join on "word" emit the same
# count once more.
player_names = (
    players
    .withColumn("word", lastNameUDF("short_name"))
    .withColumn("category", lit("Player"))
    .select("word", "category")
    .limit(500)
    .dropDuplicates()
)

# Distinct club names found in the first 500 rows, tagged as "Team".
teams = (
    players
    .select("club_name")
    .withColumn("category", lit("Team"))
    .limit(500)
    .dropDuplicates()
)

# union() matches columns by position, so the club names end up in the
# "word" column of the combined dataframe.
topics = player_names.union(teams)

## Kafka Data Source Config

In [None]:
# Streaming source: subscribe to the "tweets" Kafka topic from the earliest
# available offset, without failing the query when data has been lost.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("failOnDataLoss", "false")
    .option("subscribe", "tweets")
    .option("startingOffsets", "earliest")
    .load()
)

# Decoded tweets: payload fields plus the Kafka key and timestamp.
messages = extractTweetPayload(df, tweetSchema, payloadSchema)

## Streaming Queries Startup

Run the following cells to start the streaming queries and write the final output into the respective Kafka topics.

Note that the queries will keep on running until you stop them, since they operate on a never-ending stream.

In [None]:
# Count the words of the tweet text and keep only those that match a known
# player or club, attaching the matching category.
wordCount = (
    wordCountQuery(messages, "Text")
    .join(topics, "word")
    .select(
        "word",
        "count",
        "category",
        # Re-encode the Kafka message value so that it includes the category.
        to_json(struct("word", "count", "category")).alias("value"),
    )
)

# The final output is written to the countByName Kafka topic.
query = (
    wordCount.writeStream
    .format("kafka")
    .option("checkpointLocation", "./checkpoints")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "countByName")
    .start()
)

# Blocks this cell until the query is stopped or fails.
query.awaitTermination()

In [None]:
# Count the tweets per value of the "Lang" column.
langCount = langCountQuery(messages, "Lang")

# Write the language counts (not the word counts) to the countByLang Kafka
# topic. The checkpoint directory is relative and specific to this query:
# a streaming query must not share its checkpoint location with another one.
query = (
    langCount.writeStream
    .format("kafka")
    .option("checkpointLocation", "./checkpoints_countByLang")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "countByLang")
    .start()
)

# Blocks this cell until the query is stopped or fails.
query.awaitTermination()