In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, ArrayType, StructType, \
                              StructField, LongType, DoubleType, BooleanType, FloatType
from IPython.display import display, clear_output
import time

# Step 1: Creating spark session

In [2]:
# Creating Spark session
spark = SparkSession.builder \
        .appName('kafka') \
        .getOrCreate()

In [3]:
## To always show the results of DataFrames and improve the formatting of the output
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

## To allow future conversion of Spark DataFrame into Pandas DataFrame
spark.conf.set("spark.sql.execution.arrow.enabled", True)

# Step 2: Building Spark Streaming DF

In [4]:
# Read data from the topic 
stream_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "broker:29092") \
  .option("subscribe", "loctweets") \
  .option("startingOffsets","earliest") \
  .load()

In [5]:
# Convert columns key and value to string
string_stream_df = stream_df \
    .withColumn("key", stream_df["key"].cast(StringType())) \
    .withColumn("value", stream_df["value"].cast(StringType()))

In [6]:
# Check schema to verify change of type
string_stream_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Schema of the location tweets data - keeping only fiels we  may need. 
schema_tweet = StructType([
    StructField("ID", LongType(),  True),
    StructField("USER_ID", LongType(),  True),
    StructField("PLACE_COUNTRY", StringType(), True)
    ]
)

In [8]:
# Convert values of streamed data to JSON and save to new df
json_stream_df = string_stream_df \
    .withColumn("value", F.from_json("value", schema_tweet))

In [9]:
# Print Schema of json df
json_stream_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- ID: long (nullable = true)
 |    |-- USER_ID: long (nullable = true)
 |    |-- PLACE_COUNTRY: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
# Flatten the data (id and user location) and rename the columns key, topic, timestamp to event_key, event_topic and event_timestamp
tweets_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
        "value.ID", \
        "value.PLACE_COUNTRY"
    )

In [11]:
tweets_stream_df.printSchema()

root
 |-- event_key: string (nullable = true)
 |-- event_topic: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- ID: long (nullable = true)
 |-- PLACE_COUNTRY: string (nullable = true)



# Step 3: Creating the window stream with watermark

In [12]:
# Create window stream to get the count of tweet per location over the last 120 seconds (window duration) every 60 seconds (slide duration)
window_duration = '120 seconds'
slide_duration = '60 seconds'

In [13]:
windowed_count_df = tweets_stream_df \
    .withWatermark("event_timestamp", "2 minutes") \
    .groupBy(F.window(tweets_stream_df.event_timestamp, window_duration, slide_duration), tweets_stream_df.PLACE_COUNTRY) \
    .count()

In [14]:
count_stream = windowed_count_df \
    .writeStream \
    .format("memory") \
    .outputMode("Complete") \
    .queryName("count_view") \
    .start()

In [15]:
# Showing tweets per country in a window
while True:
    clear_output(wait=True)
    display(spark.sql('SELECT * FROM count_view order by window.start DESC, count DESC, PLACE_COUNTRY').show(truncate=False))
    time.sleep(60)

+------------------------------------------+------------------+-----+
|window                                    |PLACE_COUNTRY     |count|
+------------------------------------------+------------------+-----+
|{2021-06-12 00:35:00, 2021-06-12 00:37:00}|Brasil            |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|United States     |4    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Brasil            |3    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Chile             |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Colombia          |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Dominican Republic|1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Ecuador           |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Malaysia          |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Spain             |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|United Kingdom    |1    |
|{2021-06-12 00:34:00, 2021-06-12 00:36:00}|Zimbabwe          |1    |
|{2021-06-12 00:33:0

None

KeyboardInterrupt: 

In [16]:
count_stream.stop()

In [17]:
spark.stop()