In [0]:
%sql
-- Reset step: remove the bronze table left by a previous run so the pipeline
-- below starts from an empty target.
-- IF EXISTS keeps this cell from failing when the table has not been created yet
-- (first run, or a re-run after the table was already dropped).
DROP TABLE IF EXISTS data_engineering_study_guide.bronze.movies_bronze

In [0]:
# Reset step: recursively delete the bronze streaming checkpoint directory.
# Same operation as the `%fs rm -r` shorthand, spelled as the dbutils call.
dbutils.fs.rm("/mnt/checkpoints/bronze/", recurse=True)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, count

# Explicit schema for the incoming CSV files; every column is declared nullable.
schema = StructType([
    StructField("Unnamed: 0", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("imdbID", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Poster", StringType(), True),
])

# Streaming read of the CSV files (with header row) under the ingestion bucket.
movies_bronze = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("gs://de-01-data-ingestion/csv/")
)
# Give the "Unnamed: 0" column a plain identifier before it is written out.
movies_bronze = movies_bronze.withColumnRenamed("Unnamed: 0", "row_id")

# Create the target schema in the SAME catalog the table is written to.
# An unqualified "bronze" would be created in whatever catalog the session
# currently points at, which is not necessarily data_engineering_study_guide.
spark.sql("CREATE SCHEMA IF NOT EXISTS data_engineering_study_guide.bronze")

# Append the stream to the bronze Delta table; progress is tracked in the
# checkpoint directory. `bronze_query` is the handle of the running query.
bronze_query = (
    movies_bronze.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/bronze/")
    .outputMode("append")
    .toTable("data_engineering_study_guide.bronze.movies_bronze")
)

In [0]:
# List the streaming queries currently active in this Spark session.
# Left as a bare last expression so the notebook renders the result.
spark.streams.active

In [0]:
# Row count of the bronze table at the moment this cell runs.
bronze_count_df = spark.sql(
    "SELECT COUNT(*) FROM data_engineering_study_guide.bronze.movies_bronze"
)
bronze_count_df.show()

In [0]:
%sql
-- Inspect the operation history recorded for the bronze Delta table.
DESCRIBE HISTORY data_engineering_study_guide.bronze.movies_bronze

In [0]:
# Fetch the table's detail record and print only its file count and size.
df = spark.sql(
    "DESCRIBE DETAIL data_engineering_study_guide.bronze.movies_bronze"
)
detail_rows = df.select("numFiles", "sizeInBytes").collect()
print(detail_rows)


In [0]:

# Silver stage: stream the bronze table, drop rows missing Year or Title,
# then remove exact duplicate rows.
# NOTE(review): dropDuplicates() on a stream without a watermark keeps state
# for every distinct row seen so far; acceptable for a small study dataset,
# confirm before using on unbounded input.
movies_silver_df = (
    spark.readStream.table("data_engineering_study_guide.bronze.movies_bronze")
    .na.drop(subset=["Year", "Title"], how="any")
    .dropDuplicates()
)

display(movies_silver_df)

# A query name can only be active once per session. Stop the query left by a
# previous run of this cell so re-running it does not fail on the name clash.
for running_query in spark.streams.active:
    if running_query.name == "movies_silver":
        running_query.stop()

# Publish the cleaned stream as the in-memory table "movies_silver".
# `movies_silver` (the Python name) is the StreamingQuery handle, as before.
movies_silver = (
    movies_silver_df.writeStream
    .format("memory")
    .queryName("movies_silver")
    .outputMode("append")
    .start()
)


In [0]:
# Gold stage: number of movies per Year, read from the in-memory silver table.
movies_by_year_gold = (
    spark.table("movies_silver")
    .groupBy("Year")
    .agg(count("*").alias("Total"))
)

display(movies_by_year_gold)
