In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
import pyspark.sql.functions as F


In [2]:
# Kafka source: topic to subscribe to and the bootstrap server address.
topic = "Stock_data"
server = 'localhost:9092'

# Versions interpolated into the Kafka connector's Maven coordinate below.
scala_version = '2.12'
spark_version = '3.5.0'

# Maven coordinates handed to Spark through "spark.jars.packages".
packages = [
    'org.apache.spark:spark-sql-kafka-0-10_' + scala_version + ':' + spark_version,
    'org.apache.kafka:kafka-clients:3.6.1',
]

In [3]:
def create_spark_session():
    """Build (or reuse) the SparkSession used by this notebook.

    The session runs with master ``"local"``, receives the module-level
    ``packages`` list joined with commas as ``spark.jars.packages``, and has
    its SparkContext log level set to ``"ERROR"``.

    Returns:
        The session object returned by ``getOrCreate()``.

    Raises:
        Exception: any error raised while building or configuring the session
            is reported with ``print`` and then re-raised, so callers never
            receive ``None`` in place of a session.
    """
    try:
        # NOTE(review): the app name mentions Reddit while the configured topic
        # is stock data -- looks carried over from another notebook; confirm.
        spark_session = (SparkSession.builder.appName("Reddit_comments_analysis")
                         .master("local").config("spark.jars.packages", ",".join(packages))
                         .getOrCreate())
        spark_session.sparkContext.setLogLevel("ERROR")
    except Exception as e:
        # Report the cause, then propagate: returning None here would only
        # surface later as an unrelated AttributeError in the next stage.
        print(f"Couldn't create the spark session due to exception: {e}")
        raise
    print('Spark session created successfully')
    return spark_session
    


In [4]:
def create_initial_dataframe(spark):
    """Open a streaming read on the configured Kafka topic.

    Uses the module-level ``server`` as ``kafka.bootstrap.servers`` and the
    module-level ``topic`` as the subscription, starting from the ``latest``
    offsets.

    Args:
        spark: the session object whose ``readStream`` is used.

    Returns:
        The streaming DataFrame returned by ``load()``.

    Raises:
        Exception: any error raised while setting up the source is reported
            with ``print`` and then re-raised, so callers never receive
            ``None`` in place of a DataFrame.
    """
    try:
        init_df = (spark.readStream.format("kafka")
                   .option("kafka.bootstrap.servers", server)
                   .option("subscribe", topic)
                   .option("startingOffsets", "latest")
                   .load())
    except Exception as e:
        # Keep the original message, but propagate the failure instead of
        # silently handing None to the next pipeline stage.
        print(f"Initial dataframe couldn't be created due to exception: {e}")
        raise
    print("Initial dataframe created successfully")
    return init_df
    

In [9]:
def create_final_dataframe(df):
    """Parse the Kafka ``value`` payload as JSON and flatten it into columns.

    ``value`` is cast to a string and ``timestamp`` to a timestamp; the string
    is then parsed with a schema of six nullable string fields (date, open,
    high, low, close, volume).

    Args:
        df: DataFrame exposing ``value`` and ``timestamp`` columns.

    Returns:
        A DataFrame with the parsed JSON fields followed by ``timestamp``.
    """
    field_names = ('date', 'open', 'high', 'low', 'close', 'volume')
    # Every field is read as a nullable string; no numeric casting happens here.
    schema = StructType([StructField(name, StringType(), True) for name in field_names])

    casted = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
    parsed = casted.withColumn("value", F.from_json("value", schema))
    return parsed.select("value.*", "timestamp")

In [6]:
def start_streaming(df):
    """Start writing ``df`` to an in-memory sink named ``reddit_comments``.

    The query uses a 3-second processing-time trigger in ``append`` mode.

    Args:
        df: streaming DataFrame whose ``writeStream`` is used.

    Returns:
        Whatever ``awaitTermination(1)`` returns for the started query; the
        query handle itself is not returned.
    """
    print("Streaming is being started...")
    writer = (
        df.writeStream
        .trigger(processingTime='3 seconds')
        .outputMode("append")
        .format("memory")
        .queryName("reddit_comments")
    )
    running_query = writer.start()
    # Wait briefly on the freshly started query before handing control back.
    return running_query.awaitTermination(1)

In [7]:
from IPython.core.display_functions import clear_output
from time import sleep


def write_streaming_data():
    """Run the Kafka -> Spark -> notebook display loop until interrupted.

    Creates the session, builds the parsed streaming DataFrame, starts the
    ``reddit_comments`` memory-sink query, then repeatedly shows the last 10
    rows of that table, sleeping 3 seconds between refreshes.

    The loop has no exit condition of its own; it ends on ``KeyboardInterrupt``
    (which is caught and reported) or on any other exception (which
    propagates). In every case the ``reddit_comments`` query is stopped on the
    way out, so calling this function again does not collide with a query of
    the same name that is still active.

    Returns:
        None.
    """
    spark = create_spark_session()
    df = create_initial_dataframe(spark)
    df_final = create_final_dataframe(df)

    try:
        start_streaming(df_final)
        while True:
            # The whole in-memory table is pulled to pandas on every refresh;
            # only the last 10 rows are displayed.
            spark_df = spark.sql('SELECT * FROM reddit_comments')
            result = spark_df.toPandas()
            display(result.tail(10))
            sleep(3)
            # wait=True delays clearing until the next output is ready.
            clear_output(wait=True)
    except KeyboardInterrupt:
        print("Streaming stopped by user")
    finally:
        # Stop only the query this function started; the session itself is
        # left running so it can still be used after the loop ends.
        for active_query in spark.streams.active:
            if active_query.name == "reddit_comments":
                active_query.stop()

In [None]:
# Entry point: start the streaming display loop when this code runs with
# __name__ set to '__main__'. The call does not return on its own -- the
# loop inside write_streaming_data() is a `while True`.
if __name__ == '__main__':
    write_streaming_data()

Unnamed: 0,date,open,high,low,close,volume,timestamp
0,2024-01-19 09:46:00-05:00,145.39999389648438,145.4499969482422,145.3300018310547,145.4499969482422,69222.0,2024-01-21 21:44:15.330
1,2024-01-19 09:47:00-05:00,145.44000244140625,145.40499877929688,145.27000427246094,145.40499877929688,74786.0,2024-01-21 21:44:18.335
2,2024-01-19 09:48:00-05:00,145.40499877929688,145.39999389648438,145.35499572753906,145.39999389648438,75332.0,2024-01-21 21:44:21.339
3,2024-01-19 09:49:00-05:00,145.38999938964844,145.05999755859375,145.0449981689453,145.05999755859375,72738.0,2024-01-21 21:44:24.344
4,2024-01-19 09:50:00-05:00,145.0399932861328,145.08999633789062,144.97000122070312,145.08999633789062,69344.0,2024-01-21 21:44:27.347
