In [1]:
#!pip install findspark
# do this only once at the beginning

import os
import findspark
import pyspark

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.3.1'

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

findspark.add_packages("org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}")
findspark.init()

# set these variables for executing code on Spark; FOR THIS EXAMPLE NOT POSSIBLE!
os.environ['SPARK_HOME'] = '/usr/local/spark'
os.environ['PYSPARK_PYTHON'] = 'python3.10'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3.10'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell'

In [2]:
import pandas as pd
import json
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType


In [3]:
# Spark session & context
spark = (SparkSession
         .builder
         .master("local[*]")
         .appName('tweet-data')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
         .getOrCreate())

sc = spark.sparkContext

sc

In [4]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092") # kafka server
  .option("subscribe", "tweets") # topic
  .option("startingOffsets", "earliest") # start from beginning 
  .load())

In [5]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))

In [6]:
from pyspark.sql.functions import from_json,col, from_unixtime, to_date, to_timestamp
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

# Event data schema
schema_tweets = StructType(
     [StructField("tweet_count",IntegerType(),True),
     StructField("time",LongType(),True)])

# Create dataframe setting schema for event data
df_tweets = (df1
           # Sets schema for event data
           .withColumn("value", from_json("value", schema_tweets))
          )

df_tweets_formatted = (df_tweets.select(
    to_timestamp(from_unixtime(col("value.time")/1000)).alias("timestamp"),
    col("value.tweet_count").alias("tweets")))

In [7]:
# Start query stream over stream dataframe
raw_path = "parquet_file"
checkpoint_path = "parquet_checkpoint"

queryStream =(
    df_tweets_formatted
    .writeStream
    .format("parquet")
    .queryName("tweet_ingestion")
    .option("checkpointLocation", checkpoint_path)
    .option("path", raw_path)
    .outputMode("append")
    .partitionBy("timestamp")
    .start())

In [8]:
# Read parquet files as stream to output the number of rows
df_tweets_change = (
    spark
    .readStream
    .format("parquet")
    .schema(df_tweets_formatted.schema)
    .load(raw_path)
)

In [9]:
# Output to memory to count rows
queryStreamMem = (df_tweets_change
 .writeStream
 .format("memory")
 .queryName("df_tweets_change")
 .outputMode("update")
 .start())

In [37]:
# Create dataframe grouping by window 
from pyspark.sql.functions import window, col, current_timestamp

df_count = (
    df_tweets_change
    .withWatermark("timestamp", "1 day")
    .groupBy(window(col("timestamp"), "1 day"))
    .sum('tweets'))

df_count2 = (df_count.select(
    col("window.start").alias("start"),
    col("window.end").alias("end"),
    col("sum(tweets)").alias("tweets")
))

In [38]:
display(df_count2)

DataFrame[start: timestamp, end: timestamp, tweets: bigint]

In [40]:
# Create query stream with memory sink
queryStream = (df_count2
 .writeStream
 .format("memory")
 .queryName("tweets_change6")
 .outputMode("update")
 .start())

In [46]:
        df_pandas = spark.sql(
                """
                    select
                        start
                        ,end
                        ,sum(tweets)
                    from
                        tweets_change6
                    group by
                        start
                        ,end
                    order by
                        3 desc
                """
        ).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [47]:
df_pandas

Unnamed: 0,start,end,sum(tweets)
0,2023-01-15,2023-01-16,1149
1,2023-01-17,2023-01-18,987
2,2023-01-18,2023-01-19,975
3,2023-01-19,2023-01-20,912
4,2023-01-20,2023-01-21,875
5,2023-01-16,2023-01-17,815
6,2023-01-21,2023-01-22,335
7,2023-01-14,2023-01-15,298
