# 🧪 Great Expectations avec Spark

In [None]:
print("üîß D√©marrage de Spark avec Iceberg...")
from pyspark.sql import SparkSession
import os
from datetime import datetime, timedelta
import boto3

# MinIO / S3 settings. Each value is read from the environment and falls back
# to the literal default given here when the variable is not set.
BUCKET_RAW = "retail-raw"
MINIO_ENDPOINT = os.environ.get("S3_ENDPOINT_URL", "http://minio:9000")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "minio")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "minio123")

# Reuse a running session when there is one, otherwise start "RetailGX".
# NOTE(review): no Iceberg/catalog option is set on the builder here --
# presumably it comes from the cluster defaults; confirm.
session_builder = SparkSession.builder.appName("RetailGX")
spark = session_builder.getOrCreate()

print("‚úÖ Spark + Iceberg pr√™t.")

# 📜 Créer les suites de validation

In [None]:
# Great Expectations setup: a Spark data source plus one expectation suite
# per retail table (stores, products, sales, employees).
print("üîç Configuration Great Expectations (Spark engine)...")

import great_expectations as gx
from great_expectations.core.expectation_suite import ExpectationSuite
from datetime import datetime
import os

context = gx.get_context()
datasource = context.data_sources.add_spark(name="iceberg")

# Short alias for the namespace holding the expectation classes.
gxe = gx.expectations

# --- 1. Stores ---
stores_suite = ExpectationSuite(name="stores_suite")
for expectation in (
    gxe.ExpectColumnToExist(column="store_id"),
    gxe.ExpectColumnValuesToNotBeNull(column="store_id"),
    gxe.ExpectColumnValuesToBeUnique(column="store_id"),
    gxe.ExpectColumnToExist(column="country"),
    # Adjust the allowed countries if needed.
    gxe.ExpectColumnValuesToBeInSet(column="country", value_set=["France"]),
    gxe.ExpectColumnToExist(column="opening_date"),
    gxe.ExpectColumnValuesToNotBeNull(column="opening_date"),
):
    stores_suite.add_expectation(expectation)
context.suites.add(stores_suite)

# --- 2. Products ---
products_suite = ExpectationSuite(name="products_suite")
for expectation in (
    gxe.ExpectColumnToExist(column="product_id"),
    gxe.ExpectColumnValuesToNotBeNull(column="product_id"),
    gxe.ExpectColumnValuesToBeUnique(column="product_id"),
    gxe.ExpectColumnToExist(column="list_price"),
    gxe.ExpectColumnValuesToBeBetween(
        column="list_price", min_value=0.01, max_value=100000.0
    ),
):
    products_suite.add_expectation(expectation)
context.suites.add(products_suite)

# --- 3. Sales ---
sales_suite = ExpectationSuite(name="sales_suite")
for expectation in (
    gxe.ExpectColumnToExist(column="sale_id"),
    gxe.ExpectColumnValuesToNotBeNull(column="sale_id"),
    gxe.ExpectColumnValuesToBeUnique(column="sale_id"),
    gxe.ExpectColumnToExist(column="quantity"),
    gxe.ExpectColumnValuesToBeBetween(
        column="quantity", min_value=1, max_value=10000
    ),
    gxe.ExpectColumnToExist(column="sale_date"),
    gxe.ExpectColumnValuesToNotBeNull(column="sale_date"),
):
    sales_suite.add_expectation(expectation)
context.suites.add(sales_suite)

# --- 4. Employees ---
employees_suite = ExpectationSuite(name="employees_suite")
for expectation in (
    gxe.ExpectColumnToExist(column="employee_id"),
    gxe.ExpectColumnValuesToNotBeNull(column="employee_id"),
    gxe.ExpectColumnValuesToBeUnique(column="employee_id"),
    gxe.ExpectColumnToExist(column="store_id"),
    gxe.ExpectColumnValuesToNotBeNull(column="store_id"),
    gxe.ExpectColumnToExist(column="job_title"),
    gxe.ExpectColumnValuesToNotBeNull(column="job_title"),
):
    employees_suite.add_expectation(expectation)
context.suites.add(employees_suite)

# Lookup used by the validation loop: logical table name -> its suite.
suites = dict(
    stores=stores_suite,
    products=products_suite,
    sales=sales_suite,
    employees=employees_suite,
)

print("‚úÖ Suites cr√©√©es")

# Boucle pour générer la validation

In [None]:

# --- 2. Validation ---
# Run each table through its expectation suite and collect one entry per
# table in `validation_results`; `all_success` ends up False as soon as one
# table fails validation or raises.
import traceback

# Logical table name -> fully qualified table reference queried by Spark.
tables = dict(
    stores="retail.raw.stores",
    products="retail.raw.products",
    sales="retail.raw.sales",
    employees="retail.raw.employees",
)

validation_results = []
all_success = True
for name, table_ref in tables.items():
    print(f"\n‚û°Ô∏è  Validation de '{name}'...")
    try:
        df = spark.sql(f"SELECT * FROM {table_ref}")
        row_count = df.count()
        print(f"   üìä {row_count} lignes.")

        # Register the DataFrame as an asset and fetch it as a single batch.
        batch = (
            datasource.add_dataframe_asset(name=name)
            .add_batch_definition_whole_dataframe("default")
            .get_batch(batch_parameters={"dataframe": df})
        )

        # Validate the batch against the suite registered under this name.
        result = context.get_validator(
            batch=batch,
            expectation_suite=suites[name],
        ).validate()

        passed = result.success
        validation_results.append({"table": name, "result": result, "success": passed})
        if passed:
            status = "‚úÖ OK"
        else:
            status = "‚ùå √âchec"
        print(f"   {status} ({len(result.results)} expectations)")
        all_success &= passed

    except Exception as e:
        # Any failure (missing table, GX error, ...) is recorded as a failed
        # table so the remaining tables are still validated.
        print(f"   ‚ö†Ô∏è Erreur : {e}")
        traceback.print_exc()  # full stack trace, handy for precise debugging
        validation_results.append({"table": name, "error": str(e), "success": False})
        all_success = False

# --- 3. Rapport simple (sans gx.util) ---

In [None]:
# Write a plain-text validation report to disk, then print a short summary.
# Reads `validation_results` and `all_success` produced by the validation loop.
report_path = "/tmp/gx_validation_simple.txt"
# The report contains non-ASCII text, so the encoding is pinned explicitly
# instead of relying on the platform default (which may not be able to encode
# it and would then raise UnicodeEncodeError).
with open(report_path, "w", encoding="utf-8") as f:
    f.write(f"Validation Iceberg ‚Äî {datetime.now()}\n")
    f.write("=" * 50 + "\n")
    for vr in validation_results:
        # OK = suite passed, ERROR = validation raised, FAIL = suite ran but
        # at least one expectation failed.
        status = "OK" if vr["success"] else ("ERROR" if "error" in vr else "FAIL")
        f.write(f"{vr['table']:<12} : {status}\n")
        if not vr["success"]:
            f.write(f"  ‚Üí {vr.get('error', 'see HTML')}\n")
    f.write("=" * 50 + "\n")
    f.write(f"R√©sultat global : {'SUCCESS' if all_success else 'FAILURE'}\n")

print(f"\n‚úÖ Rapport texte sauvegard√© : {report_path}")

# Console summary: one line per table, then the overall verdict.
print("\nüìã R√âSUM√â")
for vr in validation_results:
    print(f" ‚Ä¢ {vr['table']:<12} ‚Üí {'‚úÖ' if vr['success'] else '‚ùå'}")
print(f"\n{'üéâ TOUT OK' if all_success else '‚ö†Ô∏è Probl√®mes d√©tect√©s'}")

# 📤 Export métriques qualité vers Iceberg (pour Superset) Part 1

In [None]:
# Persist one summary row per validated table into the Iceberg table
# retail.quality.gx_validation_results. Rows of earlier runs are kept: the
# table is created on the first run and appended to afterwards.
print("\nüíæ Sauvegarde des r√©sultats de validation dans Iceberg...")

from datetime import datetime, timezone
import uuid

# --- 1. Build the rows ---
results_data = []
run_id = str(uuid.uuid4())          # identifies this validation run
run_at = datetime.now(timezone.utc)  # timezone-aware run timestamp

for vr in validation_results:
    table_name = vr["table"]
    success = vr["success"]

    if "error" in vr:
        # Validation raised before any expectation could be evaluated.
        total = 0
        failed = 0
        error_msg = vr["error"]
    else:
        res = vr["result"]
        total = len(res.results)
        failed = sum(1 for r in res.results if not r.success)
        error_msg = None

    results_data.append((
        run_id,
        table_name,
        success,
        run_at,
        total,
        failed,
        error_msg,
    ))

# --- 2. Build the Spark DataFrame ---
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType,
    IntegerType, TimestampType
)

schema = StructType([
    StructField("run_id", StringType(), False),
    StructField("table_name", StringType(), False),
    StructField("success", BooleanType(), False),
    StructField("run_at", TimestampType(), False),
    StructField("total_expectations", IntegerType(), False),
    StructField("failed_expectations", IntegerType(), False),
    StructField("error_message", StringType(), True),
])

results_df = spark.createDataFrame(results_data, schema)

# --- 3. Create the namespace if needed ---
spark.sql("CREATE NAMESPACE IF NOT EXISTS retail.quality")

# --- 4. Write to Iceberg ---
# create() fails when the table already exists (unlike createOrReplace(),
# which would silently drop the rows of every previous run), so the history
# is preserved by falling back to append() in that case only.
try:
    results_df.writeTo("retail.quality.gx_validation_results") \
              .using("iceberg") \
              .create()
    print("‚úÖ Table 'retail.quality.gx_validation_results' cr√©√©e et remplie.")
except Exception as create_err:
    if "already exists" not in str(create_err).lower():
        # Not the expected "table exists" situation: surface the real error.
        print(f"‚ùå √âchec √©criture Iceberg : {create_err}")
        raise
    try:
        results_df.writeTo("retail.quality.gx_validation_results") \
                  .using("iceberg") \
                  .append()
        print("‚úÖ R√©sultats ajout√©s √† 'retail.quality.gx_validation_results'.")
    except Exception as append_err:
        print(f"‚ùå √âchec √©criture Iceberg : {append_err}")
        raise

# --- 5. Quick check ---
print("\nüëÄ Aper√ßu des r√©sultats sauvegard√©s :")
spark.table("retail.quality.gx_validation_results").show(truncate=False)

# 🧾 Sauvegarde DÉTAILLÉE des résultats par expectation

In [None]:
# Persist one row per expectation result into the Iceberg table
# retail.quality.gx_expectation_results. Rows of earlier runs are kept: the
# table is created on the first run and appended to afterwards.
print("\nüíæ Sauvegarde d√©taill√©e des r√©sultats (par expectation) dans Iceberg...")

from datetime import datetime, timezone
import hashlib
import uuid
import json
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType,
    IntegerType, DoubleType, TimestampType
)

# --- 1. Collect every detailed result ---
detailed_results = []
# Reuse the run identifiers already defined in this session (the summary cell
# sets them) so detail rows can be joined to the summary rows of the same
# run; generate fresh ones only when this cell is run without them.
run_id = globals().get("run_id") or str(uuid.uuid4())
run_at = globals().get("run_at") or datetime.now(timezone.utc)

for vr in validation_results:
    table_name = vr["table"]

    if "error" in vr:
        # The whole validation raised: store a single placeholder row.
        detailed_results.append({
            "run_id": run_id,
            "table_name": table_name,
            "expectation_id": "global_error",
            "expectation_type": "global_error",
            "column_name": None,
            "kwargs": json.dumps({}),
            "success": False,
            "observed_value": None,
            "unexpected_count": 0,
            "unexpected_percent": 0.0,
            "partial_unexpected_list": json.dumps([]),
            "error_message": vr["error"][:1000],  # truncate very long messages
            "run_at": run_at
        })
        continue

    validation_result = vr["result"]
    for expectation_result in validation_result.results:
        config = expectation_result.expectation_config
        result = expectation_result.result or {}

        # Expectation type: try both attribute names, then the class name.
        exp_type = (
            getattr(config, "type", None)
            or getattr(config, "expectation_type", None)
            or type(config).__name__
        )

        # Expectation kwargs, tolerated as missing or non-dict.
        kwargs = getattr(config, "kwargs", None) or {}
        if not isinstance(kwargs, dict):
            kwargs = {}

        column = kwargs.get("column")

        # Keep only JSON-friendly kwargs: scalars as-is, lists/tuples reduced
        # to their scalar items; anything else is skipped.
        clean_kwargs = {}
        for k, v in kwargs.items():
            if isinstance(v, (str, int, float, bool, type(None))):
                clean_kwargs[k] = v
            elif isinstance(v, (list, tuple)):
                clean_kwargs[k] = [x for x in v if isinstance(x, (str, int, float, bool))]

        # Observed value: non-scalar values are serialized to JSON text.
        observed_value = result.get("observed_value")
        if observed_value is not None and not isinstance(observed_value, (str, int, float, bool)):
            try:
                observed_value = json.dumps(observed_value, default=str)
            except (TypeError, ValueError):
                observed_value = str(observed_value)[:200]

        # At most 10 unexpected values, each truncated to 100 characters.
        partial_unexpected = result.get("partial_unexpected_list") or []
        try:
            partial_unexpected_str = json.dumps(
                [str(x)[:100] for x in partial_unexpected[:10]],
                default=str
            )
        except (TypeError, ValueError):
            partial_unexpected_str = "[]"

        # Stable fingerprint of the kwargs. The built-in hash() of a str is
        # randomized per interpreter process, which would give the same
        # expectation a different id on every run.
        kwargs_digest = hashlib.sha256(
            json.dumps(clean_kwargs, sort_keys=True).encode("utf-8")
        ).hexdigest()
        kwargs_fingerprint = int(kwargs_digest, 16) % 10**6

        detailed_results.append({
            "run_id": run_id,
            "table_name": table_name,
            "expectation_id": f"{exp_type}_{column or 'global'}_{kwargs_fingerprint}",
            "expectation_type": str(exp_type),
            "column_name": str(column) if column is not None else None,
            "kwargs": json.dumps(clean_kwargs, default=str),
            "success": bool(expectation_result.success),
            "observed_value": str(observed_value) if observed_value is not None else None,
            # `or 0` also covers a key that is present with a None value.
            "unexpected_count": int(result.get("unexpected_count") or 0),
            "unexpected_percent": float(result.get("unexpected_percent") or 0.0),
            "partial_unexpected_list": partial_unexpected_str,
            "error_message": None,
            "run_at": run_at
        })

# --- 2. Build the DataFrame ---
schema = StructType([
    StructField("run_id", StringType(), False),
    StructField("table_name", StringType(), False),
    StructField("expectation_id", StringType(), False),
    StructField("expectation_type", StringType(), False),
    StructField("column_name", StringType(), True),
    StructField("kwargs", StringType(), True),
    StructField("success", BooleanType(), False),
    StructField("observed_value", StringType(), True),
    StructField("unexpected_count", IntegerType(), True),
    StructField("unexpected_percent", DoubleType(), True),
    StructField("partial_unexpected_list", StringType(), True),
    StructField("error_message", StringType(), True),
    StructField("run_at", TimestampType(), False),
])

detailed_df = spark.createDataFrame(detailed_results, schema)

# --- 3. Create namespace + table ---
spark.sql("CREATE NAMESPACE IF NOT EXISTS retail.quality")

# create() fails when the table already exists (unlike createOrReplace(),
# which would drop the rows of every previous run); in that case the new rows
# are appended. Table properties are only applied at creation time.
try:
    detailed_df.writeTo("retail.quality.gx_expectation_results") \
               .using("iceberg") \
               .tableProperty("write.format.default", "parquet") \
               .tableProperty("history.expire.max-snapshot-age-ms", "604800000") \
               .create()
    print("‚úÖ Table 'retail.quality.gx_expectation_results' cr√©√©e.")
except Exception as e:
    if "already exists" in str(e).lower():
        detailed_df.writeTo("retail.quality.gx_expectation_results") \
                   .using("iceberg") \
                   .append()
        print("‚úÖ R√©sultats ajout√©s √† 'retail.quality.gx_expectation_results'.")
    else:
        raise

# --- 4. Check ---
print("\nüîç Exemple de r√©sultats d√©taill√©s (5 premi√®res expectations) :")
spark.sql("SELECT * FROM retail.quality.gx_expectation_results") \
     .show(5, truncate=False)

# 📦 Sauvegarde JSON complète et robuste de l'exécution

In [None]:
# Build a JSON report of the whole validation run and upload it as a single
# object to S3/MinIO with boto3; if the upload fails for any reason the same
# bytes are written to a local file instead.
# Reads `validation_results`, `run_id`, `run_at`, `spark` and `gx` from the
# notebook session.
print("\nüì§ Sauvegarde JSON dans S3 avec boto3...")

import json
import os
import boto3
from botocore.exceptions import ClientError

# --- 1. Build the detailed report ---
run_summary = {
    "run_id": run_id,
    "run_at": run_at.isoformat(),
    "spark_version": spark.version,
    "gx_version": getattr(gx, '__version__', 'unknown'),
    "tables": []
}

for vr in validation_results:
    table_entry = {
        "name": vr["table"],
        "success": vr["success"],
        "expectations": []
    }

    if "error" in vr:
        table_entry["error"] = str(vr["error"])[:2000]
    elif vr.get("result"):
        for er in (vr["result"].results or []):
            config = er.expectation_config
            exp_type = (
                getattr(config, "type", None)
                or getattr(config, "expectation_type", None)
                or "unknown"
            )
            kwargs = getattr(config, "kwargs", {}) or {}
            if not isinstance(kwargs, dict):
                kwargs = {}
            column = kwargs.get("column")
            result = er.result or {}

            # JSON-safe values; `or 0` also covers an explicit None.
            unexpected_count = result.get("unexpected_count") or 0
            observed_value = result.get("observed_value")
            if observed_value is not None and not isinstance(observed_value, (str, int, float, bool)):
                observed_value = str(observed_value)[:500]

            table_entry["expectations"].append({
                "type": str(exp_type),
                "column": str(column) if column is not None else None,
                "success": bool(er.success),
                "unexpected_count": int(unexpected_count),
                "observed_value": observed_value,
                # Only a fixed allow-list of simple kwargs is exported.
                "kwargs_summary": {
                    k: v for k, v in kwargs.items()
                    if k in ["column", "min_value", "max_value", "value_set", "regex", "mostly"]
                    and isinstance(v, (str, int, float, bool, list))
                }
            })

    run_summary["tables"].append(table_entry)

# --- 2. Serialize to JSON ---
try:
    json_bytes = json.dumps(run_summary, indent=2, ensure_ascii=False, default=str).encode('utf-8')
    print(f"‚úÖ JSON g√©n√©r√© ({len(json_bytes)} octets)")
except Exception as e:
    # Fall back to a minimal payload so that something is still uploaded.
    print(f"‚ö†Ô∏è Erreur s√©rialisation JSON : {e}")
    json_bytes = json.dumps({
        "run_id": run_id,
        "run_at": run_at.isoformat(),
        "error": f"JSON serialization failed: {str(e)}",
        "summary": [{"table": vr["table"], "success": vr["success"]} for vr in validation_results]
    }, default=str).encode('utf-8')

# --- 3. Write to S3 with boto3 ---
s3_key = f"gx_runs/{run_id}.json"
MINIO_ENDPOINT = os.getenv("S3_ENDPOINT_URL", "http://minio:9000")
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", "minio")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "minio123")
BUCKET_RAW = "retail-raw"

s3_client = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
try:
    s3_client.put_object(
        Bucket=BUCKET_RAW,
        Key=s3_key,
        Body=json_bytes,
        ContentType='application/json',
        Metadata={
            'run_id': run_id,
            'source': 'great_expectations',
            'table_count': str(len(validation_results))
        }
    )
    s3_uri = f"s3a://{BUCKET_RAW}/{s3_key}"
    print(f"‚úÖ JSON sauvegard√© dans S3 : {s3_uri}")

    # Optional: confirm the object exists and show its size / ETag.
    head = s3_client.head_object(Bucket=BUCKET_RAW, Key=s3_key)
    # The quotes are stripped outside the f-string: a backslash inside an
    # f-string replacement field is a SyntaxError before Python 3.12.
    etag = head['ETag'].strip('"')
    print(f"   ‚ÑπÔ∏è Taille : {head['ContentLength']} octets | ETag : {etag}")

except Exception as e:
    if isinstance(e, ClientError):
        error_code = e.response['Error']['Code']
        print(f"‚ùå Erreur S3 ({error_code}) : {e.response['Error']['Message']}")
    else:
        # e.g. endpoint unreachable: not a ClientError, but the report must
        # not be lost either.
        print(f"‚ùå Erreur inattendue S3 : {e}")
    # Fallback: keep a local copy of the report.
    fallback_path = f"/tmp/{run_id}.json"
    try:
        with open(fallback_path, 'wb') as f:
            f.write(json_bytes)
        print(f"‚ö†Ô∏è Fallback local : {fallback_path}")
    except Exception as local_err:
        print(f"‚ùå √âchec fallback local : {local_err}")

# 🎉 Fin

In [None]:
# Stop the Spark session and print where the pipeline outputs live.
spark.stop()
print("\nüéâ Pipeline termin√© !")
print(f"‚û°Ô∏è  Donn√©es brutes : s3://{BUCKET_RAW}/")
print("‚û°Ô∏è  Tables Iceberg : retail.raw.*")
# The quality metrics are written to the retail.quality namespace by the two
# Iceberg export cells, not to retail.raw.data_quality_metrics.
print("‚û°Ô∏è  M√©triques qualit√© : retail.quality.gx_validation_results, retail.quality.gx_expectation_results")