# 🏆 Silver to Gold - Risk, Compliance & Audit

Ce notebook crée les tables Gold (agrégations) à partir des tables Silver.

## Étapes :
1. Charger les tables Silver depuis le Lakehouse
2. Créer les agrégations métier (métriques de conformité)
3. Écrire les tables Gold dans le Lakehouse

In [None]:
# Cell 1: Load Silver tables
# Binds each Silver table to the DataFrame name the later cells rely on,
# then prints one row count per table as a quick sanity check.
print("üì• Chargement des tables Silver...")

controls_silver = spark.table("controls")
executions_silver = spark.table("control_executions")
incidents_silver = spark.table("incidents")
remediation_silver = spark.table("remediation_actions")
vendors_silver = spark.table("vendors")

# count() is a Spark action, so each iteration evaluates its table before
# printing, in the same order as the tables were loaded above.
for label, silver_df in (
    ("Controls", controls_silver),
    ("Executions", executions_silver),
    ("Incidents", incidents_silver),
    ("Remediation", remediation_silver),
    ("Vendors", vendors_silver),
):
    print(f"‚úÖ {label}: {silver_df.count()} lignes")

In [None]:
# Cell 2: Create Gold - Compliance Metrics by Framework
# Builds one row per framework with the number of controls, the number of
# control executions, and the percentage of executions whose status is "passed",
# then overwrites the Delta table 'gold_framework_metrics'.
# The names imported here (col, count, avg, spark_round, ...) are reused by later cells.
from pyspark.sql.functions import col, count, countDistinct, avg, sum as spark_sum, round as spark_round

print("üèÜ Cr√©ation de la table Gold - M√©triques par Framework...")

# Left join: controls without any execution are kept (their "e.*" columns are null).
controls_with_executions = controls_silver.alias("c").join(
    executions_silver.alias("e"),
    col("c.control_id") == col("e.control_id"),
    "left",
)

framework_metrics = controls_with_executions.groupBy("c.framework").agg(
    # The join produces one row per (control, execution) pair, so a plain
    # count("c.control_id") would count executions rather than controls.
    # countDistinct collapses those duplicates back to the number of controls.
    countDistinct("c.control_id").alias("total_controls"),
    # count() skips nulls, so controls that were never executed add 0 here.
    count("e.execution_id").alias("total_executions"),
    # Share of executions with status == "passed", as a percentage rounded to
    # two decimals. Null statuses (unmatched controls) are ignored by sum().
    spark_round(
        (spark_sum((col("e.status") == "passed").cast("int")) / count("e.execution_id") * 100),
        2
    ).alias("compliance_rate"),
)

framework_metrics.write.mode("overwrite").format("delta").saveAsTable("gold_framework_metrics")

print("‚úÖ Table 'gold_framework_metrics' cr√©√©e")
framework_metrics.show()

In [None]:
# Cell 3: Create Gold - Incident Metrics by Type and Severity
# Counts incidents per (incident_type, severity) pair, sorts by that count in
# descending order, and overwrites the Delta table 'gold_incident_metrics'.
# Uses `col` and `count` imported by the framework-metrics cell.
print("üèÜ Cr√©ation de la table Gold - M√©triques Incidents...")

incidents_by_type_and_severity = incidents_silver.groupBy("incident_type", "severity")
incident_counts = incidents_by_type_and_severity.agg(
    count("incident_id").alias("incident_count")
)
# Most frequent (type, severity) combinations first.
incident_metrics = incident_counts.orderBy(col("incident_count").desc())

incident_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_incident_metrics")

print("‚úÖ Table 'gold_incident_metrics' cr√©√©e")
incident_metrics.show(20)

In [None]:
# Cell 4: Create Gold - Vendor Risk Analysis
# Produces one row per vendor (id, name, service type, risk score, compliance
# status) with the number of incidents linked to it, sorted by risk score in
# descending order, and overwrites the Delta table 'gold_vendor_risk'.
# Uses `col` and `count` imported by the framework-metrics cell.
print("üèÜ Cr√©ation de la table Gold - Analyse Risque Vendors...")

# Left join so that vendors without incidents are kept.
vendors_with_incidents = vendors_silver.alias("v").join(
    incidents_silver.alias("i"),
    col("v.vendor_id") == col("i.vendor_id"),
    "left",
)

# count() ignores the null incident_id of unmatched vendors, giving them 0.
incidents_per_vendor = vendors_with_incidents.groupBy(
    "v.vendor_id",
    "v.vendor_name",
    "v.service_type",
    "v.risk_score",
    "v.compliance_status",
).agg(count("i.incident_id").alias("incident_count"))

vendor_risk = incidents_per_vendor.orderBy(col("v.risk_score").desc())

vendor_risk.write.format("delta").mode("overwrite").saveAsTable("gold_vendor_risk")

print("‚úÖ Table 'gold_vendor_risk' cr√©√©e")
vendor_risk.show(10)

In [None]:
# Cell 5: Create Gold - Remediation Performance Metrics
# Enriches each remediation action with its duration and an on-time flag, then
# aggregates per status (action count, average duration, average cost) and
# overwrites the Delta table 'gold_remediation_metrics'.
# Uses `col`, `count`, `avg` and `spark_round` imported by the framework-metrics cell.
from pyspark.sql.functions import datediff, when

print("üèÜ Cr√©ation de la table Gold - Performance des Actions Correctives...")

# Reusable row-level conditions.
is_completed = col("completion_date").isNotNull()
completed_by_target = is_completed & (col("completion_date") <= col("target_completion_date"))

# Days between start and completion; null while the action is not completed.
days_to_complete_expr = when(
    is_completed,
    datediff(col("completion_date"), col("start_date")),
).otherwise(None)

# "Yes" only when completed on or before the target date; everything else
# (including actions that are not completed) is "No".
on_time_expr = when(completed_by_target, "Yes").otherwise("No")

remediation_performance = (
    remediation_silver
    .withColumn("days_to_complete", days_to_complete_expr)
    .withColumn("on_time", on_time_expr)
)

# NOTE: "on_time" is kept on remediation_performance but is not part of the
# aggregation below.
actions_by_status = remediation_performance.groupBy("status")
remediation_metrics = actions_by_status.agg(
    count("remediation_id").alias("action_count"),
    # avg() skips null durations, so only completed actions contribute.
    spark_round(avg("days_to_complete"), 2).alias("avg_days_to_complete"),
    spark_round(avg("cost_usd"), 2).alias("avg_cost_usd"),
)

remediation_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_remediation_metrics")

print("‚úÖ Table 'gold_remediation_metrics' cr√©√©e")
remediation_metrics.show()

In [None]:
# Cell 6: Summary - list of the Gold tables created
# Reads each Gold table back from the Lakehouse and prints its row count.
print("\n" + "="*60)
print("üéâ R√âSUM√â - TABLES GOLD CR√â√âES")
print("="*60)

gold_tables = [
    "gold_framework_metrics",
    "gold_incident_metrics",
    "gold_vendor_risk",
    "gold_remediation_metrics",
]

for table_name in gold_tables:
    # Deliberately not named `count`: that would rebind the notebook-global
    # `count` imported from pyspark.sql.functions, and re-running any of the
    # aggregation cells afterwards would fail with "'int' object is not callable".
    row_count = spark.table(table_name).count()
    print(f"‚úÖ {table_name}: {row_count} lignes")

print("\nüìä Les tables Gold sont pr√™tes pour le Semantic Model!")
print("üìå Prochaine √©tape: Cr√©er le Semantic Model dans Fabric")