In [1]:
%pip install -q pyspark==3.5.0 pandas pyarrow sqlalchemy psycopg2-binary

import pyspark, sys
print("PySpark:", pyspark.__version__)
print(sys.executable)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MastodonStreamDemo")
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)
spark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
PySpark: 3.5.0
/Users/leosohrabi/venv-jupyter/bin/python
:: loading settings :: url = jar:file:/Users/leosohrabi/venv-jupyter/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/leosohrabi/.ivy2/cache
The jars for the packages stored in: /Users/leosohrabi/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a0c47ddc-93ba-4a09-b21e-fe07509d0728;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	fou

In [2]:
import psycopg2, textwrap

# Schema for the three sink tables. `IF NOT EXISTS` keeps this cell idempotent:
# re-running it never drops or alters an existing table (so the BIGINT batch_id
# columns only apply to tables created fresh by this cell).
DDL = textwrap.dedent("""
CREATE TABLE IF NOT EXISTS mastodon_posts(
  id SERIAL PRIMARY KEY,
  username TEXT,
  content  TEXT,
  ts TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS streamed_toot_counts(
  batch_id BIGINT,
  window_start TIMESTAMPTZ,
  window_end   TIMESTAMPTZ,
  cnt BIGINT
);
CREATE TABLE IF NOT EXISTS avg_toot_length_by_user(
  batch_id BIGINT,
  username TEXT,
  avg_length DOUBLE PRECISION
);
""")

PG_DSN = "dbname=mastodon user=mastodon password=mastodon host=localhost port=5433"

# In psycopg2, `with conn:` only wraps a transaction (commit on success,
# rollback on exception); it does NOT close the connection. Close it explicitly
# so re-running the cell doesn't leak connections.
conn = psycopg2.connect(PG_DSN)
try:
    with conn, conn.cursor() as cur:
        cur.execute(DDL)
finally:
    conn.close()
print("Tables OK")

Tables OK


25/10/07 13:59:00 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
from pyspark.sql.functions import col, from_json, when, window, length, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

KAFKA_BOOTSTRAP = "localhost:29092"
TOPIC = "mastodon_stream"

JDBC_URL = "jdbc:postgresql://localhost:5433/mastodon"
JDBC_PROPS = {"user": "mastodon", "password": "mastodon", "driver": "org.postgresql.Driver"}

# Shape of the JSON carried in each Kafka message value: three optional strings.
# The toot body may arrive under either `text` or `content`.
schema = StructType([StructField(name, StringType()) for name in ("username", "text", "content")])

# Unbounded Kafka source that starts from the latest offsets of the topic.
kdf = (
    spark.readStream.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP,
        "subscribe": TOPIC,
        "startingOffsets": "latest",
    })
    .load()
)

# Decode each record into (username, content): take `text` when present,
# otherwise `content`, then discard records where either field is null.
base = (
    kdf
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), schema).alias("j"))
    .select(
        col("j.username").alias("username"),
        when(col("j.text").isNotNull(), col("j.text"))
        .otherwise(col("j.content"))
        .alias("content"),
    )
    .na.drop("any", subset=["username", "content"])
)

def _append_jdbc(df, table: str) -> None:
    """Append `df` to the Postgres table `table` using the notebook's JDBC settings."""
    df.write.mode("append").jdbc(JDBC_URL, table, properties=JDBC_PROPS)


def sink(batch_df, batch_id: int):
    """foreachBatch callback: write one micro-batch of parsed toots to Postgres.

    Three tables are appended per batch:
      * mastodon_posts          -- the raw (username, content, ts) rows;
      * streamed_toot_counts    -- row count per 1-minute window of `ts`;
      * avg_toot_length_by_user -- mean `content` length per username.

    Args:
        batch_df: micro-batch DataFrame with `username` and `content` columns.
        batch_id: Spark's micro-batch id, stored alongside both aggregates.
    """
    # `batch_df` feeds three separate writes; without caching, every write would
    # re-execute the whole micro-batch (including the Kafka read). Stamping `ts`
    # before caching also gives the raw rows and the window rows the same time.
    stamped = batch_df.withColumn("ts", current_timestamp()).persist()
    try:
        _append_jdbc(stamped.select("username", "content", "ts"), "mastodon_posts")

        # Windows are over processing time (`ts`), not event time. Every row of a
        # batch carries the same `ts`, so a non-empty batch yields one window row.
        toots_per_minute = (
            stamped.groupBy(window(col("ts"), "1 minute"))
            .count()
            .select(
                lit(batch_id).alias("batch_id"),
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                col("count").alias("cnt"),
            )
        )
        _append_jdbc(toots_per_minute, "streamed_toot_counts")

        avg_per_user = (
            stamped.withColumn("length", length(col("content")))
            .groupBy("username")
            .avg("length")
            .select(
                lit(batch_id).alias("batch_id"),
                col("username"),
                col("avg(length)").alias("avg_length"),
            )
        )
        _append_jdbc(avg_per_user, "avg_toot_length_by_user")
    finally:
        # Release the cached micro-batch even if one of the writes fails.
        stamped.unpersist()

# The checkpoint directory stores the query's progress (e.g. consumed offsets).
checkpoint_dir = "/tmp/chkpt_masto_nb_v1"

# Hand every micro-batch of `base` to `sink`, then show the initial status.
query = base.writeStream.option("checkpointLocation", checkpoint_dir).foreachBatch(sink).start()
query.status

25/10/07 13:59:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

25/10/07 13:59:06 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/10/07 14:00:06 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:10

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, when, current_timestamp, length, lit, window
from pyspark.sql.types import StructType, StructField, StringType

# Connection settings
KAFKA_BOOTSTRAP = "localhost:29092"  # talk to Kafka via the host port 29092
JDBC_URL = "jdbc:postgresql://localhost:5433/mastodon"
JDBC_PROPS = {"user": "mastodon", "password": "mastodon", "driver": "org.postgresql.Driver"}

# Expected JSON schema; `content` is accepted in case a producer sends it instead of `text`.
schema = StructType([StructField(field, StringType()) for field in ("username", "text", "content")])

# Kafka source, restarted from the latest offsets. failOnDataLoss=false asks the
# source not to fail the query when some offsets can no longer be read.
kdf = (
    spark.readStream.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP,
        "subscribe": "mastodon_stream",
        "startingOffsets": "latest",
        "failOnDataLoss": "false",
    })
    .load()
)

# Sink that works *per micro-batch* (it processes batch_df, not a global DataFrame).
def sink(batch_df, batch_id):
    """foreachBatch callback that parses raw Kafka rows and appends them to Postgres.

    NOTE: this definition replaces any earlier `sink`. It reads the Kafka
    `value` column itself, so it must be attached to a raw Kafka stream such as
    `kdf`, not to an already-parsed DataFrame (which has no `value` column).

    Args:
        batch_df: micro-batch DataFrame with a Kafka `value` column holding JSON.
        batch_id: Spark's micro-batch id, stored with the per-user aggregate.
    """
    parsed = (
        batch_df.selectExpr("CAST(value AS STRING) AS json")
        .select(from_json(col("json"), schema).alias("j"))
        .select(
            col("j.username").alias("username"),
            when(col("j.text").isNotNull(), col("j.text")).otherwise(col("j.content")).alias("content"),
        )
        .na.drop(subset=["username", "content"])
    )

    # `parsed` feeds two writes; cache it so the batch is decoded only once.
    parsed.persist()
    try:
        # 1) Raw table (for display)
        posts = parsed.select("username", "content", current_timestamp().alias("ts"))
        posts.write.mode("append").jdbc(JDBC_URL, "mastodon_posts", properties=JDBC_PROPS)

        # 2) Example aggregate: average content length per user
        avg_by_user = (
            parsed.withColumn("length", length(col("content")))
            .groupBy("username")
            .avg("length")
            .select(
                lit(batch_id).alias("batch_id"),
                col("username"),
                col("avg(length)").alias("avg_length"),
            )
        )
        avg_by_user.write.mode("append").jdbc(JDBC_URL, "avg_toot_length_by_user", properties=JDBC_PROPS)
    finally:
        # Release the cached batch even if a write fails.
        parsed.unpersist()

In [7]:
# Is the streaming query still alive, and what is its current trigger doing?
query.isActive, query.status

(True,
 {'message': 'Getting offsets from KafkaV2[Subscribe[mastodon_stream]]',
  'isDataAvailable': False,
  'isTriggerActive': True})

25/10/07 14:04:57 WARN KafkaOffsetReaderAdmin: Error in attempt 2 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmi

In [8]:
# 3) Inspect what has been written to the database so far
import pandas as pd, sqlalchemy as sa
engine = sa.create_engine("postgresql+psycopg2://mastodon:mastodon@localhost:5433/mastodon")

# Latest rows of each sink table (ordering chosen so the newest data comes first).
PREVIEW_QUERIES = {
    "mastodon_posts": "SELECT * FROM mastodon_posts ORDER BY id DESC LIMIT 10",
    "streamed_toot_counts": "SELECT * FROM streamed_toot_counts ORDER BY window_end DESC LIMIT 10",
    "avg_toot_length_by_user": "SELECT * FROM avg_toot_length_by_user ORDER BY batch_id DESC, username LIMIT 10",
}

# Query each table on its own so a missing or broken table is reported
# without hiding the tables that come after it.
for table_name, preview_sql in PREVIEW_QUERIES.items():
    try:
        display(pd.read_sql(preview_sql, engine))
    except Exception as e:
        print(f"{table_name}:", e)

Unnamed: 0,id,username,content,ts
0,5,Potter,hello Sirius,2025-10-07 10:02:40.046819+00:00
1,4,demo,new toot from kafka,2025-10-07 09:59:53.613210+00:00
2,3,leo,another toot,2025-10-07 09:59:37.772464+00:00
3,2,demo,stream to postgres,2025-10-07 09:58:08.418246+00:00
4,1,leo,hello from notebook,2025-10-07 09:58:05.265591+00:00


Unnamed: 0,batch_id,window_start,window_end,cnt


Unnamed: 0,batch_id,username,avg_length


In [9]:
# 1) Is the stream running?
query.status


{'message': 'Terminated with exception: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [10]:
# 2) Has it read any rows?
query.lastProgress  # check numInputRows