In [1]:
import json
import os
import time

import praw
from kafka import KafkaProducer, KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, schema_of_json, concat_ws, udf
)
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from textblob import TextBlob
from dotenv import load_dotenv

# Copy any KEY=value pairs from a local .env file into the process environment.
load_dotenv()

# Connection settings. Each one is None when the variable is not set.
# The REDDIT_* values configure the praw client; KAFKA_BROKER and TOPIC_NAME
# are shared by the Kafka producer and the Spark streaming reader.
REDDIT_CLIENT_ID = os.environ.get("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = os.environ.get("REDDIT_CLIENT_SECRET")
REDDIT_USER_AGENT = os.environ.get("REDDIT_USER_AGENT")
KAFKA_BROKER = os.environ.get("KAFKA_BROKER")
TOPIC_NAME = os.environ.get("TOPIC_NAME")


In [2]:
def create_reddit_instance():
    """Build a praw Reddit client from the module-level REDDIT_* settings.

    Returns:
        praw.Reddit: client configured with REDDIT_CLIENT_ID,
        REDDIT_CLIENT_SECRET and REDDIT_USER_AGENT.
    """
    credentials = dict(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    return praw.Reddit(**credentials)

def create_kafka_producer(broker):
    """Return a KafkaProducer that sends message values as UTF-8 JSON.

    Args:
        broker: a single bootstrap server address; it is wrapped in a
            one-element list for ``bootstrap_servers``.

    Returns:
        KafkaProducer: producer whose ``value_serializer`` JSON-encodes
        each value and then encodes the text as UTF-8 bytes.
    """
    def serialize(value):
        return json.dumps(value).encode('utf-8')

    return KafkaProducer(bootstrap_servers=[broker], value_serializer=serialize)

# Ingest: publish the newest posts from each subreddit to Kafka as JSON.
POSTS_PER_SUBREDDIT = 3     # newest posts fetched from each subreddit
SEND_TIMEOUT_SECONDS = 10   # max wait per message when confirming delivery

reddit = create_reddit_instance()
producer = create_kafka_producer(KAFKA_BROKER)

subreddits = ["technology", "programming"]
all_posts = []
send_futures = []

try:
    for subreddit in subreddits:
        print(f"Fetching r/{subreddit}...")
        for submission in reddit.subreddit(subreddit).new(limit=POSTS_PER_SUBREDDIT):
            data = {
                "id": submission.id,
                "subreddit": subreddit,
                "title": submission.title,
                "selftext": submission.selftext,
                "created_utc": submission.created_utc,
            }
            all_posts.append(data)
            print(f"Sending: {submission.title[:50]}... to Kafka")
            # send() is asynchronous; keep the returned future so a failed
            # delivery can be detected below instead of being lost silently.
            send_futures.append(producer.send(TOPIC_NAME, value=data))

    producer.flush()
    # Resolve every future: .get() raises the Kafka error if a send failed.
    for future in send_futures:
        future.get(timeout=SEND_TIMEOUT_SECONDS)
finally:
    # Release the producer's connections even if fetching or sending fails.
    producer.close()

print(f"Sent {len(all_posts)} posts total to topic='{TOPIC_NAME}'")


Fetching r/technology...
Sending: Dumping open source for proprietary rarely pays of... to Kafka
Sending: Researchers link DeepSeek's chatbot to Chinese Mob... to Kafka
Sending: iOS App Store apps with screenshot-reading malware... to Kafka
Fetching r/programming...
Sending: Automating web tasks on authenticated sites sucks,... to Kafka
Sending: the sudoku affair... to Kafka
Sending: Using DeepSeek To Make A Game... to Kafka
Sent 6 posts total to topic='reddit-posts'


In [3]:
import pandas as pd
# Tabulate the posts that were just published, for a quick visual check.
# Columns are named explicitly so the frame keeps its shape even when no
# posts were fetched (an empty list of dicts would otherwise give no columns).
INGEST_COLUMNS = ["id", "subreddit", "title", "selftext", "created_utc"]
df_ingest = pd.DataFrame(all_posts, columns=INGEST_COLUMNS)
df_ingest


Unnamed: 0,id,subreddit,title,selftext,created_utc
0,1iilhpd,technology,Dumping open source for proprietary rarely pay...,,1738791000.0
1,1iilhkv,technology,Researchers link DeepSeek's chatbot to Chinese...,,1738791000.0
2,1iilgyn,technology,iOS App Store apps with screenshot-reading mal...,,1738791000.0
3,1iikd90,programming,Automating web tasks on authenticated sites su...,,1738788000.0
4,1iihdm9,programming,the sudoku affair,,1738781000.0
5,1iih5lg,programming,Using DeepSeek To Make A Game,,1738781000.0


In [4]:
# Create the Spark session
from pyspark.sql import SparkSession

# The Kafka source ships separately from PySpark; Spark resolves this package
# (and its dependencies, via Ivy) when the session is created.
# NOTE(review): the version presumably has to match the installed PySpark — confirm.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1"

spark = (
    SparkSession.builder
    .appName("KafkaTest")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

# Explicit schema for the JSON messages written by the ingest cell, in the
# same field order as the producer's dict. Declaring it avoids running a Spark
# job just to infer a schema. It also types created_utc as a double: praw
# gives a float (e.g. 1738791000.0), while inferring from an integer sample
# produced a long column that does not match the values actually sent.
json_schema = StructType([
    StructField("id", StringType()),
    StructField("subreddit", StringType()),
    StructField("title", StringType()),
    StructField("selftext", StringType()),
    StructField("created_utc", DoubleType()),
])

def analyze_sentiment(text):
    """Return TextBlob's polarity score for ``text`` as a float.

    Empty or missing text (``""`` / ``None``) scores 0.0.
    """
    if not text:
        return 0.0
    return TextBlob(text).sentiment.polarity

# The function returns a float, so declare the UDF as DoubleType. With
# StringType the score would end up as a string column and could not be
# compared, sorted or aggregated numerically.
sentiment_udf = udf(analyze_sentiment, DoubleType())

# Read the topic as an unbounded stream. "earliest" is required so the posts
# published by the ingest cell are included: that cell runs before this stream
# starts, and with "latest" only messages produced after start() are read.
df_kafka_stream = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", KAFKA_BROKER)
         .option("subscribe", TOPIC_NAME)
         .option("startingOffsets", "earliest")
         .load()
)

# Decode each Kafka record's value bytes as a JSON string, parse it with
# json_schema, and flatten the resulting struct into top-level columns.
json_payload = col("value").cast("string")
parsed_df = (
    df_kafka_stream
    .select(from_json(json_payload, json_schema).alias("data"))
    .select("data.*")
)

# Score the title and body together, joined by a single space.
combined_text = concat_ws(" ", col("title"), col("selftext"))
result_df = parsed_df.withColumn("sentiment", sentiment_udf(combined_text))

# Print each micro-batch with the console sink for a fixed window, then stop.
# NOTE(review): the console sink prints on the Spark driver's stdout, which may
# be the terminal that launched Jupyter rather than this cell's output.
STREAM_RUN_SECONDS = 30

query = (
    result_df
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", False)
    .start()
)

try:
    # Unlike time.sleep, awaitTermination re-raises if the query fails while
    # waiting, so a broken stream is reported instead of going unnoticed.
    query.awaitTermination(STREAM_RUN_SECONDS)
finally:
    # Always stop the query so it does not keep consuming in the background.
    query.stop()

print("Stream stopped.")


:: loading settings :: url = jar:file:/Users/tobi/Documents/projects/reddit-stream/redenv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/tobi/.ivy2/cache
The jars for the packages stored in: /Users/tobi/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0ac253a9-08f8-403d-9958-cc0ac78dda44;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spar

Stream stopped.
