In [0]:
# Ajouter le repo au Python Path
import sys
sys.path.append("/Workspace/Users/mandu543@gmail.com/databricks-movies-analytics/Movies_Project")

from src.config import BRONZE_TABLE, SILVER_TABLE
from src.transformations import clean_tmdb
from pyspark.sql.functions import col, concat_ws
from delta.tables import DeltaTable

# Create schema if not exists
spark.sql(
    f"CREATE SCHEMA IF NOT EXISTS {SILVER_TABLE.split('.')[1]}"
)

### Intégration dans la table silver

In [0]:
# Lire les données Bronze
df_bronze = spark.table(BRONZE_TABLE)

# Nettoyage
df_clean = clean_tmdb(df_bronze)

# Vérifier si la colonne 'id' existe
if 'id' not in df_clean.columns:
    # Créer un ID unique basé sur title + release_date
    df_clean = df_clean.withColumn("id", concat_ws("_", col("title"), col("release_date").cast("string")))
    print("⚠️ Colonne 'id' inexistante. Création de 'mon_id' unique automatiquement.")

# Merge incrémental dans Silver
try:
    delta_table = DeltaTable.forName(spark, SILVER_TABLE)

    # Compter lignes avant merge
    count_before = delta_table.toDF().count()

    delta_table.alias("silver").merge(
        df_clean.alias("bronze"),
        "silver.id = bronze.id"
    ).whenNotMatchedInsertAll().execute()

    print("✅ Merge incrémental terminé dans Silver")
    # Compter lignes après merge
    count_after = delta_table.toDF().count()
    print(f"Lignes insérées : {count_after - count_before}")
except:
    # Si la table n'existe pas encore, on la crée
    df_clean.write.format("delta").mode("overwrite").saveAsTable(SILVER_TABLE)
    print("✅ Table Silver créée pour la première fois")