In [None]:
import logging
import os
import shutil
import subprocess

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, row_number, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window

In [None]:
# spark-submit arguments picked up when PySpark launches the JVM: the Maven
# packages below are downloaded and put on the driver/executor classpath.
# NOTE(review): delta-core 2.4.0 appears to target Spark 3.4 while the Kafka
# connector is 3.5.3, and nothing in this notebook uses Delta — confirm it is needed.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
        "io.delta:delta-core_2.12:2.4.0",
    ]),
    "pyspark-shell",
])

In [None]:
# Driver-side logger used to trace every micro-batch handled by the pipeline.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KafkaToHDFSUpdater")

# Create the Spark session (or reuse the one already running in this kernel).
# PYSPARK_SUBMIT_ARGS has to be set before this point so that the JVM starts
# with the extra packages on its classpath.
spark = SparkSession.builder.appName("KafkaToHDFSUpdater").getOrCreate()

# Limit Spark's own console output to warnings and errors.
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Schema of the merged CSV stored on HDFS: school identity columns, then five
# FFEL loan metrics for each of the four loan programs (same order as in the
# file), then the reporting quarter and the time the row was last updated.
csv_schema = StructType(
    [
        StructField(name, StringType(), True)
        for name in ("OPE ID", "SchoolName", "State", "Zip Code", "School Type")
    ]
    + [
        StructField(f"ffel_{program}_{metric}", metric_type(), True)
        for program in ("subsidized", "unsubsidized", "stafford", "plus")
        for metric, metric_type in (
            ("recipients", IntegerType),
            ("number_of_loans_originated", IntegerType),
            ("amount_of_loans_originated", DoubleType),
            ("number_of_disbursements", IntegerType),
            ("amount_of_disbursements", DoubleType),
        )
    ]
    + [
        # Quarter bounds are kept as raw strings; a DateType would need a known format.
        StructField("Quarter_Start", StringType(), True),
        StructField("Quarter_End", StringType(), True),
        StructField("Timestamp", TimestampType(), True),
    ]
)

# Schema of the JSON payload carried in each Kafka message value.
kafka_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("SchoolCode", StringType()),
        ("SchoolName", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("StateCode", StringType()),
        # NOTE(review): an integer type cannot preserve leading zeros (e.g. "02134");
        # confirm what the producer sends for this field.
        ("ZipCode", IntegerType()),
        ("Country", StringType()),
        # Epoch time, documented as seconds — NOTE(review): confirm the unit with the producer.
        ("timestamp", DoubleType()),
    )
])


In [None]:
# Current merged CSV on HDFS; it is read with the explicit schema above
# (no inference) and cached because every Kafka micro-batch is joined against it.
csv_path = "hdfs://localhost:9080/user/anthonycormeaux/data/combined/combined_data.csv"

csv_df = spark.read.csv(csv_path, schema=csv_schema, header=True).cache()

In [None]:
# Broker address (hostname:port) and topic carrying the school updates.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "excel_data"

# Unbounded streaming source over the topic. "latest" only matters when the query
# starts without a checkpoint; otherwise Spark resumes from the checkpointed offsets.
kafka_df = (
    spark.readStream.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": kafka_topic,
        "startingOffsets": "latest",
    })
    .load()
)


In [None]:
# Epoch values above this bound can only be milliseconds: 1e11 s is roughly the
# year 5138, whereas 1e11 ms is roughly 1973.
EPOCH_MS_THRESHOLD = 1e11

# Decode the raw Kafka value, parse the JSON payload against kafka_schema and
# flatten it into top-level columns, then derive a proper TimestampType column.
# Casting a number to TimestampType interprets it as *seconds* since the epoch,
# and the producer's unit (seconds vs milliseconds) is not pinned down. Dividing
# seconds by 1000 would put every message in 1970, making it older than every
# CSV row so that no update is ever applied. So normalise by magnitude instead.
parsed_kafka_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING) as json_str")
    .select(from_json(col("json_str"), kafka_schema).alias("data"))
    .select("data.*")
    .withColumn(
        "message_timestamp",
        when(col("timestamp") > EPOCH_MS_THRESHOLD, col("timestamp") / 1000)
        .otherwise(col("timestamp"))
        .cast(TimestampType()),
    )
)


In [None]:
def _latest_message_per_school(batch_df):
    """Collapse a Kafka micro-batch to the newest message per SchoolName.

    The merge is a left join on SchoolName, so several messages for one school
    would duplicate the matching CSV row once per message. Messages without a
    SchoolName can never match a CSV row and are dropped. Ties on
    message_timestamp are broken arbitrarily.
    """
    newest_first = Window.partitionBy("SchoolName").orderBy(col("message_timestamp").desc_nulls_last())
    return (
        batch_df
        .filter(col("SchoolName").isNotNull())
        .withColumn("_rank", row_number().over(newest_first))
        .filter(col("_rank") == 1)
        .drop("_rank")
    )


def _merge_batch(base_df, latest_df):
    """Left-join base_df with latest_df on SchoolName and apply newer Kafka values.

    Only "OPE ID", "State", "Zip Code" and "Timestamp" can change; every other
    column is copied from base_df, and the result keeps base_df's column order.
    """
    kafka_ts = col("kafka.message_timestamp")
    csv_ts = col("csv.Timestamp")
    # A CSV row without a Timestamp counts as older than any timestamped message;
    # a bare `kafka_ts > csv_ts` would be NULL there and block the update forever.
    is_newer = kafka_ts.isNotNull() & (csv_ts.isNull() | (kafka_ts > csv_ts))
    kafka_values = {
        "OPE ID": col("kafka.SchoolCode"),
        "State": col("kafka.StateCode"),
        "Zip Code": col("kafka.ZipCode").cast(StringType()),
        "Timestamp": kafka_ts,
    }

    projection = []
    for name in base_df.columns:
        csv_value = col(f"csv.`{name}`")
        new_value = kafka_values.get(name)
        if new_value is None:
            projection.append(csv_value)
        else:
            # A NULL field in the message never erases an existing CSV value.
            projection.append(
                when(is_newer & new_value.isNotNull(), new_value).otherwise(csv_value).alias(name)
            )

    return (
        base_df.alias("csv")
        .join(latest_df.alias("kafka"), on="SchoolName", how="left")
        .select(*projection)
    )


def _hdfs(*args, check=True):
    """Run `hdfs dfs <args>` and return its exit code; raise on failure when check is True."""
    return subprocess.run(["hdfs", "dfs", *args], check=check).returncode


def _single_part_file(directory):
    """Return the path of the first `part-*` file Spark wrote into directory, or None."""
    listing = subprocess.check_output(["hdfs", "dfs", "-ls", directory]).decode("utf-8")
    # `-ls` prints a "Found N items" header, then one entry per line ending with the
    # full path. Entries are listed by name, so the `_SUCCESS` marker comes *before*
    # `part-...`: match on the file name instead of trusting a fixed line index.
    for line in listing.splitlines():
        fields = line.split()
        if fields and fields[-1].rsplit("/", 1)[-1].startswith("part-"):
            return fields[-1]
    return None


def _swap_into_place(new_file, final_path):
    """Move new_file to final_path, restoring the previous file if the move fails."""
    backup_path = final_path + ".bak"
    _hdfs("-rm", "-r", "-f", backup_path, check=False)  # leftover from an interrupted run
    # Non-zero exit when final_path does not exist yet (e.g. it was never written).
    had_previous = _hdfs("-mv", final_path, backup_path, check=False) == 0
    try:
        _hdfs("-mv", new_file, final_path)
    except subprocess.CalledProcessError:
        if had_previous:
            _hdfs("-mv", backup_path, final_path, check=False)
        raise
    if had_previous:
        _hdfs("-rm", "-r", "-f", backup_path, check=False)


def update_csv(batch_df, batch_id):
    """foreachBatch sink: merge one Kafka micro-batch into the HDFS CSV.

    For rows whose SchoolName appears in the batch, newer Kafka values replace
    the CSV's OPE ID / State / Zip Code and Timestamp. The merged table is
    written as a single CSV file that replaces the previous one on HDFS. The
    global ``csv_df`` is then reloaded from that file, so memory matches disk
    and the query plan does not grow by one join per batch.

    Args:
        batch_df: micro-batch DataFrame with the columns of ``parsed_kafka_df``.
        batch_id: micro-batch id supplied by Structured Streaming (used for logging).

    Raises:
        subprocess.CalledProcessError: if listing the temporary directory or moving
            the new file into place fails (the previous file is restored first).
    """
    global csv_df
    logger.info(f"Processing batch_id: {batch_id}")

    # Persisted because the batch is used twice (emptiness check + join); without
    # it Spark would re-read the Kafka offsets for each action.
    latest_df = _latest_message_per_school(batch_df).persist()
    try:
        if not latest_df.head(1):
            logger.info(f"Batch {batch_id} has no usable messages; CSV left unchanged")
            return

        updated_df = _merge_batch(csv_df, latest_df)

        temp_output_path = "hdfs://localhost:9080/user/anthonycormeaux/data/temp"
        final_csv_path = "hdfs://localhost:9080/user/anthonycormeaux/data/combined/combined_data.csv"

        # One partition -> one part file, which can then be moved to the final path.
        updated_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_path)
        logger.info(f"Batch {batch_id} written to temporary HDFS directory")

        temp_csv_file = _single_part_file(temp_output_path)
        if temp_csv_file is None:
            logger.error("Aucun fichier CSV trouvé dans le répertoire temporaire.")
            return

        _swap_into_place(temp_csv_file, final_csv_path)
        _hdfs("-rm", "-r", temp_output_path, check=False)
        logger.info(f"Batch {batch_id} moved to final HDFS CSV file")

        # Re-read the file just written so csv_df no longer depends on the replaced
        # file, then release the previous cache once the new one is materialised.
        previous_df = csv_df
        csv_df = spark.read.option("header", "true").schema(csv_schema).csv(final_csv_path).cache()
        row_count = csv_df.count()
        previous_df.unpersist()
        logger.info(f"Updated CSV with {row_count} rows")
    finally:
        latest_df.unpersist()


In [None]:
# Where Structured Streaming records consumed Kafka offsets, so that a restart
# resumes after the last processed micro-batch.
checkpoint_path = "hdfs://localhost:9080/user/anthonycormeaux/data/checkpoint"

# Hand every micro-batch to update_csv, at most once per minute.
query = (
    parsed_kafka_df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .outputMode("update")
    .trigger(processingTime="1 minute")
    .foreachBatch(update_csv)
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()


INFO:py4j.clientserver:Received command c on object id p0
INFO:KafkaToHDFSUpdater:Processing batch_id: 2
INFO:KafkaToHDFSUpdater:Updated CSV with 3702304 rows                           
INFO:KafkaToHDFSUpdater:Batch 2 written to temporary HDFS directory             
2024-11-20 21:33:15,580 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-11-20 21:33:17,057 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `hdfs://localhost:9080/user/anthonycormeaux/data/combined/combined_data.csv': No such file or directory
2024-11-20 21:33:18,492 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-11-20 21:33:20,111 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO:Kafk

Deleted hdfs://localhost:9080/user/anthonycormeaux/data/temp


INFO:py4j.clientserver:Received command c on object id p0
INFO:KafkaToHDFSUpdater:Processing batch_id: 3
24/11/20 21:34:03 WARN MemoryStore: Not enough space to cache rdd_101_166 in memory! (computed 10.4 MiB so far)
24/11/20 21:34:03 WARN BlockManager: Persisting block rdd_101_166 to disk instead.
24/11/20 21:34:03 WARN MemoryStore: Not enough space to cache rdd_101_60 in memory! (computed 31.1 MiB so far)
24/11/20 21:34:03 WARN BlockManager: Persisting block rdd_101_60 to disk instead.
24/11/20 21:34:04 WARN MemoryStore: Not enough space to cache rdd_101_111 in memory! (computed 31.3 MiB so far)
24/11/20 21:34:04 WARN BlockManager: Persisting block rdd_101_111 to disk instead.
24/11/20 21:34:04 WARN MemoryStore: Not enough space to cache rdd_101_141 in memory! (computed 41.6 MiB so far)
24/11/20 21:34:04 WARN BlockManager: Persisting block rdd_101_141 to disk instead.
24/11/20 21:34:09 WARN MemoryStore: Not enough space to cache rdd_101_111 in memory! (computed 31.3 MiB so far)
24/11

Deleted hdfs://localhost:9080/user/anthonycormeaux/data/combined/combined_data.csv


2024-11-20 21:40:26,121 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-11-20 21:40:27,415 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO:KafkaToHDFSUpdater:Batch 3 moved to final HDFS CSV file


Deleted hdfs://localhost:9080/user/anthonycormeaux/data/temp


24/11/20 21:40:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 388495 milliseconds
INFO:py4j.clientserver:Received command c on object id p0
INFO:KafkaToHDFSUpdater:Processing batch_id: 4
24/11/20 21:40:30 WARN MemoryStore: Not enough space to cache rdd_101_111 in memory! (computed 52.3 MiB so far)
24/11/20 21:40:30 WARN MemoryStore: Not enough space to cache rdd_101_120 in memory! (computed 11.1 MiB so far)
24/11/20 21:40:30 WARN MemoryStore: Not enough space to cache rdd_101_141 in memory! (computed 10.4 MiB so far)
24/11/20 21:40:31 WARN MemoryStore: Not enough space to cache rdd_129_141 in memory! (computed 10.4 MiB so far)
24/11/20 21:40:31 WARN BlockManager: Persisting block rdd_129_141 to disk instead.
24/11/20 21:40:31 WARN MemoryStore: Not enough space to cache rdd_129_146 in memory! (computed 11.0 MiB so far)
24/11/20 21:40:31 WARN BlockManager: Persisting block rdd_129_146 to disk instead.
24/11/20 21:40: