In [19]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for the pipeline.
# "spark.jars.packages" takes a comma-separated list of Maven coordinates:
# the Kafka source for Structured Streaming with its supporting artifacts,
# plus the PostgreSQL JDBC driver used when writing results.
spark = (
    SparkSession.builder
    .appName("GitHubEventsPipeline")
    .config(
        "spark.jars.packages",
        ",".join((
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
            "org.apache.kafka:kafka-clients:3.5.1",
            "org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.5.0",
            "org.apache.commons:commons-pool2:2.11.1",
            "org.postgresql:postgresql:42.6.0",
        )),
    )
    .getOrCreate()
)


In [20]:
# Streaming source: subscribe to the "git_logs" topic on the Kafka broker at
# kafka:9092. With startingOffsets="latest" a newly started query only sees
# records that arrive after it starts.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "git_logs")
    .option("startingOffsets", "latest")
    .load()
)
      

In [None]:

from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import *

# 3. Schema of the event JSON carried in each Kafka message value.
#    Only the fields listed here are extracted; all of them are nullable strings.
schema = (
    StructType()
    .add("id", StringType())
    .add("type", StringType())
    .add("repo", StructType().add("name", StringType()))
    .add("actor", StructType().add("login", StringType()))
    # Kept as text here; it is converted to a timestamp when the stream is parsed.
    .add("created_at", StringType())
)

# 4. Decode the Kafka value as JSON and flatten it into the column names used
#    by the Postgres table: id, type, repo_name, actor_login, created_at.
# NOTE(review): records that fail to parse presumably come through as rows of
# NULLs rather than raising -- confirm whether they should be filtered out.
parsed = (
    df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select(
        col("data.id").alias("id"),
        col("data.type").alias("type"),
        col("data.repo.name").alias("repo_name"),
        col("data.actor.login").alias("actor_login"),
        # The event time arrives as a string; expose it as a proper timestamp.
        to_timestamp(col("data.created_at")).alias("created_at"),
    )
)

# 5. Function to write each microbatch to Postgres
def write_to_postgres(batch_df, batch_id):
    """Append one streaming micro-batch to the ``github_events`` Postgres table.

    Used as the callback passed to ``foreachBatch``.

    Connection settings are read from the environment so that credentials do
    not have to be committed with the notebook. Each setting falls back to the
    notebook's original development value when its variable is unset:

    * ``POSTGRES_JDBC_URL`` -- full JDBC URL of the target database
    * ``POSTGRES_USER``     -- database user
    * ``POSTGRES_PASSWORD`` -- database password

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch identifier supplied by Spark; not used here.

    NOTE(review): rows are written in "append" mode and ``batch_id`` is not
    used for de-duplication, so a re-executed batch would presumably insert
    its rows again -- confirm whether the table needs an upsert/unique key.
    """
    import os  # local import keeps this callback self-contained

    jdbc_url = os.environ.get(
        "POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/reddit_db"
    )
    db_user = os.environ.get("POSTGRES_USER", "reddit_user")
    db_password = os.environ.get("POSTGRES_PASSWORD", "reddit_pass")

    (
        batch_df.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", "github_events")
        .option("user", db_user)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

# 6. Start the streaming query: every micro-batch of parsed events is handed
#    to write_to_postgres, which appends it to the database.
# NOTE(review): no checkpointLocation is configured here -- confirm whether
# progress needs to survive a restart of this notebook.
query = (
    parsed.writeStream
    .outputMode("append")
    .foreachBatch(write_to_postgres)
    .start()
)

# Block this cell until the query stops or fails.
query.awaitTermination()

