# Consumer Spark Streaming → Bronze Layer

Lit en temps réel les flux Kafka `hn-stories` et `hn-comments` et écrit les données brutes dans Delta Lake (couche Bronze).

## Section 1 : Configuration Spark

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

import os

# Connection settings.
# Every value can be overridden with an environment variable of the same name,
# so credentials do not have to be edited into (and committed with) this cell.
# The literals are only fallbacks that keep the previous behaviour when nothing
# is exported.
# TODO: replace the Garage credentials with your own, preferably by exporting
# GARAGE_ACCESS_KEY / GARAGE_SECRET_KEY.
KAFKA_SERVERS = os.environ.get("KAFKA_SERVERS", "kafka:9092")
GARAGE_ENDPOINT = os.environ.get("GARAGE_ENDPOINT", "http://garage:3900")
GARAGE_ACCESS_KEY = os.environ.get("GARAGE_ACCESS_KEY", "GK907b22f51dc0d0c5164474f2")
GARAGE_SECRET_KEY = os.environ.get(
    "GARAGE_SECRET_KEY",
    "6cf587853042d92d2cf6bb85b7c46a6a2400a47822e9baae32f9be0b7c5c9663",
)

# Maven coordinates resolved at session start-up: S3A filesystem support,
# Delta Lake and the Kafka source for Structured Streaming.
SPARK_PACKAGES = ",".join([
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
    "org.apache.spark:spark-hadoop-cloud_2.12:3.5.3",
    "io.delta:delta-spark_2.12:3.3.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
])

# Spark configuration targeting the standalone cluster (inspired by TP8).
conf = (
    SparkConf()
    .setAppName('KafkaToBronze')
    .setMaster('spark://spark:7077')
    .set("spark.jars.packages", SPARK_PACKAGES)
    .set("spark.hadoop.fs.s3a.committer.name", "filesystem")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
    .set("spark.sql.shuffle.partitions", "10")
    .set("spark.default.parallelism", "10")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

sc = SparkContext.getOrCreate(conf=conf)
sql_context = SQLContext(sc)

# S3A settings pointing the Hadoop filesystem layer at the Garage endpoint.
# They are applied on the live Hadoop configuration, in this order.
s3a_settings = {
    "fs.s3a.endpoint": GARAGE_ENDPOINT,
    "fs.s3a.access.key": GARAGE_ACCESS_KEY,
    "fs.s3a.secret.key": GARAGE_SECRET_KEY,
    "fs.s3a.endpoint.region": "garage",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.connection.ssl.enabled": "false",
}
hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_name, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_name, setting_value)

print("Spark session créée")

## Section 2 : Lecture du stream Kafka

In [None]:
def _kafka_topic_stream(topic):
    """Return a streaming DataFrame subscribed to one Kafka topic.

    The reader connects to ``KAFKA_SERVERS`` and starts from the earliest
    available offset of ``topic``.
    """
    reader = sql_context.readStream.format("kafka")
    reader = reader.option("kafka.bootstrap.servers", KAFKA_SERVERS)
    reader = reader.option("subscribe", topic)
    reader = reader.option("startingOffsets", "earliest")
    return reader.load()


# One raw Kafka stream per Hacker News entity.
stories_stream = _kafka_topic_stream("hn-stories")
comments_stream = _kafka_topic_stream("hn-comments")

print("Streams Kafka connectés")

## Section 3 : Parsing des données

In [None]:
from pyspark.sql.functions import from_json, col, from_unixtime, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, ArrayType, BooleanType

# Schema applied to the JSON payload of story messages.
story_schema = (
    StructType()
    .add("id", IntegerType())
    .add("type", StringType())
    .add("by", StringType())
    .add("time", LongType())
    .add("title", StringType())
    .add("url", StringType())
    .add("text", StringType())
    .add("score", IntegerType())
    .add("descendants", IntegerType())
    .add("kids", ArrayType(IntegerType()))
)

# Schema applied to the JSON payload of comment messages.
comment_schema = (
    StructType()
    .add("id", IntegerType())
    .add("type", StringType())
    .add("by", StringType())
    .add("time", LongType())
    .add("text", StringType())
    .add("parent", IntegerType())
    .add("kids", ArrayType(IntegerType()))
    .add("deleted", BooleanType())
    .add("dead", BooleanType())
)

In [None]:
def _decode_kafka_json(raw_stream, schema):
    """Turn a raw Kafka stream into typed columns.

    The Kafka ``value`` column is cast to a string and parsed as JSON with
    ``schema``; the parsed struct is flattened into top-level columns. Two
    columns are then added: ``timestamp`` (``from_unixtime`` applied to the
    ``time`` field) and ``_ingested_at`` (``current_timestamp()``).
    """
    json_payload = col("value").cast("string")
    decoded = raw_stream.select(from_json(json_payload, schema).alias("data"))
    flattened = decoded.select("data.*")
    with_event_time = flattened.withColumn("timestamp", from_unixtime(col("time")))
    return with_event_time.withColumn("_ingested_at", current_timestamp())


# Parsed views of the two raw streams.
stories_parsed = _decode_kafka_json(stories_stream, story_schema)
comments_parsed = _decode_kafka_json(comments_stream, comment_schema)

print("Données parsées")

## Section 4 : Écriture vers Bronze (Append Only)

In [None]:
# Both the Bronze tables and the streaming checkpoints live in the same bucket.
_BRONZE_BUCKET = "s3a://bronze"
BRONZE_PATH = f"{_BRONZE_BUCKET}/hackernews"
CHECKPOINT_PATH = f"{_BRONZE_BUCKET}/checkpoints"

def append_to_bronze(batch_df, batch_id, table_path):
    """Append one micro-batch to the Delta table stored at ``table_path``.

    The batch is written as-is in ``append`` mode, without deduplication.
    An empty batch is skipped and nothing is written.

    Args:
        batch_df: DataFrame holding the rows of the micro-batch.
        batch_id: Identifier of the micro-batch; accepted but not used here.
        table_path: Destination path of the Delta table.
    """
    # NOTE(review): because batch_id is ignored, a replayed micro-batch would
    # presumably be appended twice -- confirm whether Delta's idempotent-write
    # options (txnAppId / txnVersion) are wanted here.
    if not batch_df.isEmpty():
        delta_writer = batch_df.write.format("delta")
        delta_writer.mode("append").save(table_path)

In [None]:
def _stories_batch_to_bronze(batch_df, batch_id):
    """foreachBatch callback: append a stories micro-batch to the Bronze table."""
    return append_to_bronze(batch_df, batch_id, f"{BRONZE_PATH}/stories")


# Stories stream -> Bronze (append), with its own checkpoint directory.
stories_writer = stories_parsed.writeStream.foreachBatch(_stories_batch_to_bronze)
stories_writer = stories_writer.option("checkpointLocation", f"{CHECKPOINT_PATH}/stories")
stories_query = stories_writer.start()

In [None]:
def _comments_batch_to_bronze(batch_df, batch_id):
    """foreachBatch callback: append a comments micro-batch to the Bronze table."""
    return append_to_bronze(batch_df, batch_id, f"{BRONZE_PATH}/comments")


# Comments stream -> Bronze (append), with its own checkpoint directory.
comments_writer = comments_parsed.writeStream.foreachBatch(_comments_batch_to_bronze)
comments_writer = comments_writer.option("checkpointLocation", f"{CHECKPOINT_PATH}/comments")
comments_query = comments_writer.start()

## Section 5 : Monitoring du stream

In [None]:
import time

# Print the status of both streaming queries every 10 seconds.
# The loop ends when neither query is active any more (instead of printing
# forever) or when the cell is interrupted.
try:
    while stories_query.isActive or comments_query.isActive:
        print("\n--- Stats ---")
        print(f"Stories: {stories_query.status}")
        print(f"Comments: {comments_query.status}")
        time.sleep(10)

    # Both queries terminated on their own: report the failure, if there is one.
    for label, query in (("Stories", stories_query), ("Comments", comments_query)):
        failure = query.exception()
        if failure is not None:
            print(f"{label}: stream terminé en erreur: {failure}")
except KeyboardInterrupt:
    print("\nArrêt des streams...")
finally:
    # Always stop the queries, whatever ended the loop, so that no stream is
    # left running in the background after an unexpected error.
    stories_query.stop()
    comments_query.stop()

## Section 6 : Vérification des données

In [None]:
# Preview the first 5 rows of the Bronze stories table, without truncating values.
stories_bronze = sql_context.read.format("delta").load(f"{BRONZE_PATH}/stories")
stories_bronze.show(5, truncate=False)

In [None]:
# Preview the first 5 rows of the Bronze comments table, without truncating values.
comments_bronze = sql_context.read.format("delta").load(f"{BRONZE_PATH}/comments")
comments_bronze.show(5, truncate=False)

In [None]:
# Stop the SparkContext used by this notebook.
sc.stop()