prendre les donnees de data vers notre couche bronze

In [1]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("Copie vers Bronze") \
    .getOrCreate()



# fichier 1
df1 = spark.read.option("header", "true").csv("data/airlines.csv")
df1.write.mode("overwrite").option("header", "true").csv("Bronze/airlines")

# fichier 2
df2 = spark.read.option("header", "true").csv("data/airports.csv")
df2.write.mode("overwrite").option("header", "true").csv("Bronze/airports")


spark.stop()


recuperation du stream vers bronze

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import logging

# 1. Configuration du logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("StreamCSVToBronze")

# 2. Création de la SparkSession avec support warehouse si besoin
spark = SparkSession.builder \
    .appName("Streaming CSV vers Bronze") \
    .config("spark.sql.shuffle.partitions", "1") \
    .getOrCreate()

# 3. Schéma explicite pour éviter le schema inference en streaming
schema = """
    YEAR INT, MONTH INT, DAY INT, DAY_OF_WEEK INT, AIRLINE STRING,
    FLIGHT_NUMBER STRING, TAIL_NUMBER STRING, ORIGIN_AIRPORT STRING,
    DESTINATION_AIRPORT STRING, SCHEDULED_DEPARTURE INT, DEPARTURE_TIME INT,
    DEPARTURE_DELAY INT, TAXI_OUT INT, WHEELS_OFF INT, SCHEDULED_TIME INT,
    ELAPSED_TIME INT, AIR_TIME INT, DISTANCE INT, WHEELS_ON INT, TAXI_IN INT,
    SCHEDULED_ARRIVAL INT, ARRIVAL_TIME INT, ARRIVAL_DELAY INT, DIVERTED INT,
    CANCELLED INT, CANCELLATION_REASON STRING, AIR_SYSTEM_DELAY INT,
    SECURITY_DELAY INT, AIRLINE_DELAY INT, LATE_AIRCRAFT_DELAY INT,
    WEATHER_DELAY INT
"""

# 4. Fonction d'écriture dans Bronze
def write_to_bronze(batch_df, batch_id):
    if batch_df.isEmpty():
        logger.info(f"[Batch {batch_id}] Aucun fichier à traiter.")
        return

    try:
        logger.info(f"[Batch {batch_id}] Écriture des données dans Bronze.")
        batch_df.write.mode("append").option("header", "true").csv("bronze/")
    except AnalysisException as e:
        logger.error(f"[Batch {batch_id}] Erreur Spark : {e}")
    except Exception as e:
        logger.error(f"[Batch {batch_id}] Erreur générale : {e}")

# 5. Lecture en streaming depuis le dossier stream/
df_stream = spark.readStream \
    .schema(schema) \
    .option("header", "true") \
    .csv("stream/")

# 6. Définition du stream avec foreachBatch pour mieux contrôler l’écriture CSV
query = df_stream.writeStream \
    .foreachBatch(write_to_bronze) \
    .option("checkpointLocation", "bronze_checkpoint/") \
    .trigger(processingTime="10 seconds").start()

# 7. Attente de la fin
query.awaitTermination()


INFO:py4j.java_gateway:Callback Server Starting
INFO:py4j.java_gateway:Socket listening on ('127.0.0.1', 56977)
INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 0] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 1] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 2] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 3] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 4] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToBronze:[Batch 5] Écriture des données dans Bronze.
INFO:py4j.clientserver:Received command c on object id p0
INFO:StreamCSVToB

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "C:\Users\natha\AppData\Roaming\Python\Python311\site-packages\IPython\core\interactiveshell.py", line 3577, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\natha\AppData\Local\Temp\ipykernel_10168\747028877.py", line 55, in <module>
    query.awaitTermination()
  File "c:\Users\natha\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\sql\streaming\query.py", line 221, in awaitTermination
    return self._jsq.awaitTermination()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\natha\AppData\Local\Programs\Python\Python311\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "c:\Users\natha\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\errors\exceptions\captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "c:\Users\natha\AppData\Local\Programs\Pyth

INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clientserver connection
INFO:py4j.clientserver:Closing down clie