In [1]:
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Configure the root logger at INFO, then create this notebook's own
# module-level logger.
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)

In [None]:
def _escape_jaas_value(value: str) -> str:
    """Escape backslashes and double quotes for use inside a quoted JAAS option value."""
    # Backslashes must be doubled first so the quote escapes added next are not re-escaped.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def create_kafka_stream(spark: SparkSession, kafka_servers: str, kafka_topic: str,
                       sasl_username: str = None, sasl_password: str = None,
                       security_protocol: str = "SASL_PLAINTEXT",
                       starting_offsets: str = "latest"):
    """Create a Spark Structured Streaming DataFrame from Kafka.

    The Kafka message ``value`` is cast to a string, parsed as JSON against a
    fixed schema, and the parsed fields are returned as top-level columns.

    Args:
        spark: Active SparkSession used to build the streaming reader.
        kafka_servers: Value for the ``kafka.bootstrap.servers`` option.
        kafka_topic: Topic passed to the ``subscribe`` option.
        sasl_username: SASL/PLAIN username. Authentication options are only
            added when both username and password are non-empty.
        sasl_password: SASL/PLAIN password (see ``sasl_username``).
        security_protocol: Value for ``kafka.security.protocol`` when SASL is
            enabled. Defaults to ``"SASL_PLAINTEXT"``; pass ``"SASL_SSL"`` to
            use TLS instead.
        starting_offsets: Value for the ``startingOffsets`` option
            (defaults to ``"latest"``).

    Returns:
        A streaming DataFrame with the columns ``did``, ``time_us``, ``kind``
        and ``commit`` (a struct holding ``operation``, ``collection`` and
        ``record``).
    """

    # Expected shape of each JSON message; every field is nullable.
    record_schema = StructType([
        StructField("text", StringType(), True),
        StructField("langs", ArrayType(StringType()), True),
        StructField("createdAt", StringType(), True)
    ])
    commit_schema = StructType([
        StructField("operation", StringType(), True),
        StructField("collection", StringType(), True),
        StructField("record", record_schema, True)
    ])
    schema = StructType([
        StructField("did", StringType(), True),
        StructField("time_us", LongType(), True),
        StructField("kind", StringType(), True),
        StructField("commit", commit_schema, True)
    ])

    # Base Kafka options
    kafka_options = {
        "kafka.bootstrap.servers": kafka_servers,
        "subscribe": kafka_topic,
        "startingOffsets": starting_offsets
    }

    # Add SASL authentication if credentials provided
    if sasl_username and sasl_password:
        # Credentials are escaped so a quote or backslash in them cannot
        # break out of the quoted JAAS option values.
        jaas_config = (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{_escape_jaas_value(sasl_username)}" '
            f'password="{_escape_jaas_value(sasl_password)}";'
        )
        kafka_options.update({
            "kafka.security.protocol": security_protocol,
            "kafka.sasl.mechanism": "PLAIN",
            "kafka.sasl.jaas.config": jaas_config
        })
    elif sasl_username or sasl_password:
        # Only one half of the credential pair was supplied (e.g. an empty
        # password). The stream is still created, but without SASL options,
        # so make that visible instead of skipping authentication silently.
        logging.getLogger(__name__).warning(
            "SASL username/password only partially provided; "
            "creating the Kafka stream WITHOUT authentication options."
        )

    # Read from Kafka
    reader = spark.readStream.format("kafka")
    for key, value in kafka_options.items():
        reader = reader.option(key, value)
    kafka_df = reader.load()

    # Parse JSON
    parsed_df = kafka_df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")

    return parsed_df


def process_language_counts(df, window_duration: str = "2 minutes",
                            watermark_delay: str = "2 minutes",
                            use_event_time: bool = False):
    """Process and aggregate language counts from posts.

    Keeps only rows that are commits creating an ``app.bsky.feed.post``
    record, emits one row per language tag on each post, and counts rows per
    (tumbling window, language).

    Args:
        df: Streaming DataFrame with ``kind``, ``time_us`` and ``commit``
            columns, as produced by ``create_kafka_stream``.
        window_duration: Tumbling window length (defaults to ``"2 minutes"``).
        watermark_delay: Watermark delay applied to the ``timestamp`` column
            (defaults to ``"2 minutes"``).
        use_event_time: When False (default), every row is stamped with
            ``current_timestamp()``, i.e. windows reflect when Spark processed
            the row. When True, the timestamp is derived from ``time_us``
            instead.
            NOTE(review): ``time_us`` is assumed to be Unix epoch
            microseconds; confirm against the producer before enabling.

    Returns:
        A streaming DataFrame with columns ``window_start``, ``window_end``,
        ``language`` and ``post_count``, ordered by ``post_count`` descending.
        Sorting a streaming aggregate is only supported by Spark in
        ``complete`` output mode, so the result must be written with that mode.
    """

    # Filter for posts only
    is_new_post = (
        (col("kind") == "commit")
        & (col("commit.operation") == "create")
        & (col("commit.collection") == "app.bsky.feed.post")
    )

    # Timestamp used for windowing: processing time by default, or the
    # record's own time when requested.
    if use_event_time:
        window_ts = (col("time_us") / 1_000_000).cast("timestamp")
    else:
        window_ts = current_timestamp()

    # Extract and explode languages. A missing ``langs`` array is treated as
    # empty, so such posts contribute no rows.
    languages_df = (
        df.filter(is_new_post)
        .select(
            window_ts.alias("timestamp"),
            explode(coalesce(col("commit.record.langs"), array())).alias("language"),
        )
        .filter(col("language").isNotNull())
    )

    # Aggregate by language within tumbling windows
    language_counts = (
        languages_df
        .withWatermark("timestamp", watermark_delay)
        .groupBy(
            window(col("timestamp"), window_duration),
            col("language"),
        )
        .agg(count("*").alias("post_count"))
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("language"),
            col("post_count"),
        )
        .orderBy(col("post_count").desc())
    )

    return language_counts

In [None]:
# Kafka connection settings. Each value can be overridden through an
# environment variable; the defaults match the values previously hard-coded.
KAFKA_SERVERS = os.environ.get("KAFKA_SERVERS", "wilsoniumite.com:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "bluesky-firehose")
SASL_USERNAME = os.environ.get("KAFKA_SASL_USERNAME", "notebook1")
# The password is never stored in the notebook: export KAFKA_SASL_PASSWORD
# before starting the kernel (ask wilson for the value). It defaults to an
# empty string when unset.
SASL_PASSWORD = os.environ.get("KAFKA_SASL_PASSWORD", "")

In [9]:
# Build (or reuse) the session; the Kafka source connector is requested
# through spark.jars.packages.
# NOTE(review): the connector coordinate pins Scala 2.13 / 4.0.1 and
# presumably has to match the installed Spark build; confirm.
spark = (
    SparkSession.builder
    .appName("BlueskyKafkaAnalysis")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1")
    .getOrCreate()
)

# Reduce Spark's own log output to warnings and above.
spark.sparkContext.setLogLevel("WARN")
print("✓ Spark initialized")

✓ Spark initialized


In [10]:
# Build the parsed Kafka stream from the connection settings defined above.
stream_df = create_kafka_stream(
    spark=spark,
    kafka_servers=KAFKA_SERVERS,
    kafka_topic=KAFKA_TOPIC,
    sasl_username=SASL_USERNAME,
    sasl_password=SASL_PASSWORD,
)
print("✓ Kafka stream created")

✓ Kafka stream created


In [None]:
language_counts = process_language_counts(stream_df)

# Spark refuses to start a second active query with the same name, so stop a
# leftover query from a previous run of this cell to keep the cell re-runnable.
for active_query in spark.streams.active:
    if active_query.name == "language_counts":
        active_query.stop()

# Complete mode rewrites the whole aggregated result into the in-memory table
# "language_counts" on every trigger (once per minute).
query = (
    language_counts.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("language_counts")
    .trigger(processingTime="1 minute")
    .start()
)

print("✓ Query started")

✓ Query started


25/10/15 15:46:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d3a6de11-4b92-4aaf-84d8-c8405caf8c0f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/10/15 15:46:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [None]:
# Top 20 (window, language) rows by post count from the in-memory sink table.
# The table stays empty until the streaming query has completed a trigger.
top_languages = spark.sql("""
    SELECT language, post_count, window_start, window_end
    FROM language_counts 
    ORDER BY post_count DESC 
    LIMIT 20
""")
top_languages.show(truncate=False)

+--------+----------+------------+----------+
|language|post_count|window_start|window_end|
+--------+----------+------------+----------+
+--------+----------+------------+----------+



                                                                                