In [0]:
# Raja Jain
from pathlib import Path
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, TimestampType
import pyspark.sql.functions as f
udf = f.udf
from pyspark.ml.feature import Binarizer, StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

sc = spark.sparkContext

In [0]:
# CSV options
file_type = "csv"
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# helper function
def create_spark_df(path:str, schema, target_columns:list=None):
    df = (spark.read.format(file_type)
          .schema(schema)
                .option("header", first_row_is_header)
                .option("sep", delimiter)
                .option("mode", "dropMalformed")
                .option("ignoreLeadingWhiteSpace", True)
                .option("ignoreTrailingWhiteSpace", True)
                .load(path))
    if target_columns:
        df = df.select(*target_columns)
    
    return df

root_dir = Path("/FileStore/tables/fifa_tweets")

In [0]:
fifaSchema = StructType(
    [
        StructField("ID", LongType(), True),
        StructField("lang", StringType(), True),
        StructField("Date", TimestampType(), True),
        StructField("Source", StringType(), True),
        StructField("len", LongType(), True),
        StructField("Orig_Tweet", StringType(), True),
        StructField("Tweet", StringType(), True),
        StructField("Likes", LongType(), True),
        StructField("RTs", LongType(), True),
        StructField("Hashtags", StringType(), True),
        StructField("UserMentionNames", StringType(), True),
        StructField("UserMentionID", StringType(), True),
        StructField("Name", StringType(), True),
        StructField("Place", StringType(), True),
        StructField("Followers", LongType(), True),
        StructField("Friends", LongType(), True),
    ]
)

PARTITIONS = 10
fifa_data = create_spark_df(str(root_dir/"FIFA.csv"), schema = fifaSchema).repartition(PARTITIONS)
fifa_data.printSchema()

root
 |-- ID: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Source: string (nullable = true)
 |-- len: long (nullable = true)
 |-- Orig_Tweet: string (nullable = true)
 |-- Tweet: string (nullable = true)
 |-- Likes: long (nullable = true)
 |-- RTs: long (nullable = true)
 |-- Hashtags: string (nullable = true)
 |-- UserMentionNames: string (nullable = true)
 |-- UserMentionID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Place: string (nullable = true)
 |-- Followers: long (nullable = true)
 |-- Friends: long (nullable = true)



In [0]:
def process_fifa_data(df):
    df = df.select(*["ID","Date","Hashtags"])
    df = df.filter(f.col("Hashtags").isNotNull())
    df = df.withColumn("Hashtags", f.explode(f.split('Hashtags', ',')))
    return (df.select(f.col("ID"),f.col("Date"),f.col("Hashtags"))
             .groupBy(f.window("Date","60 minutes"), f.col("Hashtags"))
             .agg(f.count("Hashtags").alias("trending"))
             .filter(f.col("trending")>100))

fifa_static_data = process_fifa_data(fifa_data)
(fifa_static_data.show())

+--------------------+-----------------+--------+
|              window|         Hashtags|trending|
+--------------------+-----------------+--------+
|{2018-07-15 17:00...|    WorldCupFinal|    2148|
|{2018-07-04 15:00...|       ThreeLions|     159|
|{2018-06-30 16:00...|           URUPOR|     157|
|{2018-07-01 02:00...|    FIFAStadiumDJ|     106|
|{2018-07-15 17:00...|          croatia|    1660|
|{2018-07-10 18:00...|         Worldcup|     127|
|{2018-07-10 18:00...|           BELFRA|     158|
|{2018-07-15 23:00...|         worldcup|     213|
|{2018-07-15 17:00...|     WorldCup2018|     174|
|{2018-07-15 17:00...|WorldCupFinal2018|     128|
|{2018-07-01 23:00...|           CRODEN|     115|
|{2018-07-15 17:00...|       Russia2018|     204|
|{2018-07-11 19:00...|       ThreeLions|     514|
|{2018-07-10 15:00...|        Wimbledon|     464|
|{2018-06-30 16:00...|    FIFAStadiumDJ|     213|
|{2018-07-11 21:00...|         football|     299|
|{2018-07-02 00:00...|    FIFAStadiumDJ|     231|


# Create Streaming Files

In [0]:
def create_stream_contents(df, stream_dir_path):
    print("Removing existing...")
    dbutils.fs.rm(stream_dir_path, True)
    print("Existing removed!")
    print(f"writing files at {stream_dir_path}")
    df.write.format("csv").option("header",True).save(stream_dir_path)

fifa_stream_path = str(root_dir/"fifa_stream")
create_stream_contents(fifa_data, fifa_stream_path)

# fifa_sorted = fifa_data.orderBy(f.col("Date")).repartition(PARTITIONS).persist()
# fifa_stream_sorted_path = str(root_dir/"fifa_stream_sorted")
# create_stream_contents(fifa_sorted, fifa_stream_sorted_path)

Removing existing...
Existing removed!
writing files at /FileStore/tables/fifa_tweets/fifa_stream


# Source

In [0]:
def create_source_stream(path:str, schema, target_columns:list=None):
    source_stream = (spark.readStream.format(file_type)
          .schema(schema)
                .option("header", first_row_is_header)
                .option("sep", delimiter)
                .option("mode", "dropMalformed")
                .option("ignoreLeadingWhiteSpace", True)
                .option("ignoreTrailingWhiteSpace", True)
                .option("maxFilesPerTrigger", 1)
                .load(path))
    
    return source_stream

source_stream = create_source_stream(path=fifa_stream_path, schema=fifaSchema)

# Query

In [0]:
trending_hashtags = process_fifa_data(source_stream)

# Sink

In [0]:
def create_sink(df, query_name):
    return (df
            .writeStream.outputMode("complete")
            .format("memory")
            .queryName(query_name)
            .trigger(processingTime="10 seconds")
            .start())

fifa_sink = create_sink(trending_hashtags, "trendingHashtags")

In [0]:
current = spark.sql(
"""
SELECT * FROM trendingHashtags
""")
current.orderBy("window").show(current.count(), False)

+------------------------------------------+-----------------+--------+
|window                                    |Hashtags         |trending|
+------------------------------------------+-----------------+--------+
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|WorldCup         |1131    |
|{2018-06-30 00:00:00, 2018-06-30 01:00:00}|FIFAStadiumDJ    |535     |
|{2018-06-30 01:00:00, 2018-06-30 02:00:00}|EXO              |109     |
|{2018-06-30 01:00:00, 2018-06-30 02:00:00}|WorldCup         |935     |
|{2018-06-30 01:00:00, 2018-06-30 02:00:00}|FIFAStadiumDJ    |445     |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|ARG              |1299    |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|ARGFRA           |102     |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|Mbappe           |167     |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|FRA              |1811    |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|WorldCup         |6594    |
|{2018-06-30 16:00:00, 2018-06-30 17:00:00}|POR              |19