# ✅ Bronze Tests: ZIP → Parquet Validation

**Goal**: Verify that the Bronze ingestion faithfully reproduces the source ZIP data.

This notebook applies **the same transformations as datalake_bronze** before comparing, so that the tests are meaningful.

## 🎯 Implemented tests

1. **Setup & Helpers**: Configuration and transformation function (replicates datalake_bronze)
2. **Test 1: Global completeness** - Total row count, ZIP vs Bronze
3. **Test 2: Per-file comparison** - File-by-file detail based on datetime
4. **Test 3: Uniqueness** - No duplicate datetime values
5. **Test 4: Partitioning** - year/month/day consistent with datetime
6. **Test 5: Traceability** - ingest_id metadata present and valid

---

In [43]:
# Imports & parameters
import os
import zipfile
from pathlib import Path
import duckdb
import polars as pl

# Parameters (adjust as needed)
PROVIDER = os.getenv('PROVIDER','binance')
MARKET = os.getenv('MARKET','spot')
FREQ = os.getenv('FREQ','monthly')
CATEGORY = os.getenv('CATEGORY','klines')
SYMBOL = os.getenv('SYMBOL','BTCUSDT')
INTERVAL = os.getenv('INTERVAL','4h')

MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT','127.0.0.1:9000')
MINIO_ACCESS = os.getenv('MINIO_ROOT_USER','minioadm')
MINIO_SECRET = os.getenv('MINIO_ROOT_PASSWORD','minioadm')

# Paths
S3_PATTERN = f"s3://bronze/{PROVIDER}/data/{MARKET}/{FREQ}/{CATEGORY}/{SYMBOL}/{INTERVAL}/**/*.parquet"
RAW_DIR = Path(os.getenv('RAW_DIR', f"/media/giujorge/Stockage/DATA/raw/{PROVIDER}/{MARKET}/{FREQ}/{CATEGORY}/{SYMBOL}/{INTERVAL}"))
print('S3_PATTERN =', S3_PATTERN)
print('RAW_DIR    =', RAW_DIR)

# DuckDB connection configured for MinIO
con = duckdb.connect(database=':memory:')
con.execute(f"""
    SET s3_access_key_id='{MINIO_ACCESS}';
    SET s3_secret_access_key='{MINIO_SECRET}';
    SET s3_endpoint='{MINIO_ENDPOINT}';
    SET s3_url_style='path';
    SET s3_use_ssl='false';
""")
print('DuckDB connected to MinIO')

S3_PATTERN = s3://bronze/binance/data/spot/monthly/klines/BTCUSDT/4h/**/*.parquet
RAW_DIR    = /media/giujorge/Stockage/DATA/raw/binance/spot/monthly/klines/BTCUSDT/4h
DuckDB connected to MinIO
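
The `SET` statements in the setup cell interpolate the credentials straight into the SQL text, so a secret containing a single quote would break the statement. Below is a minimal sketch of a quoting helper, assuming standard SQL string-literal escaping (doubling single quotes). The names `sql_literal` and `build_s3_settings` are hypothetical and not part of the original notebook.

```python
def sql_literal(value: str) -> str:
    """Return value as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_s3_settings(access_key: str, secret_key: str, endpoint: str) -> str:
    """Build the same SET statements as the setup cell, with escaped values."""
    return "\n".join([
        f"SET s3_access_key_id={sql_literal(access_key)};",
        f"SET s3_secret_access_key={sql_literal(secret_key)};",
        f"SET s3_endpoint={sql_literal(endpoint)};",
        "SET s3_url_style='path';",
        "SET s3_use_ssl='false';",
    ])


# Hypothetical secret containing a quote, to show the escaping
print(build_s3_settings("minioadm", "pa'ss", "127.0.0.1:9000"))
```

The resulting string could be passed to `con.execute(...)` in place of the inline f-string.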


In [44]:
# ========== HELPER: ZIP → DataFrame transformation ==========
# Reproduces the datalake_bronze logic exactly, so that the comparison is meaningful

def transform_zip_to_df(zip_path: Path) -> pl.DataFrame:
    """
    Apply the same transformations as datalake_bronze:
    1. Read the CSV with named columns
    2. Cast the timestamp to Int64
    3. Auto-detect the timestamp unit (µs/ms/s)
    4. Convert to Datetime("ms")

    Returns: Polars DataFrame with a 'datetime' column ready for comparison
    """
    csv_columns = [
        "open_time", "open", "high", "low", "close", "volume", "close_time",
        "quote_asset_volume", "number_of_trades", "taker_buy_base_volume",
        "taker_buy_quote_volume", "ignore"
    ]

    with zipfile.ZipFile(str(zip_path)) as z:
        csv_names = [n for n in z.namelist() if n.lower().endswith('.csv')]
        if not csv_names:
            return pl.DataFrame()

        with z.open(csv_names[0]) as f:
            df = pl.read_csv(f, has_header=False, new_columns=csv_columns)

    # Transformation 1: cast
    df = df.with_columns([
        pl.col("open_time").cast(pl.Int64, strict=False)
    ])

    # Transformation 2: unit detection (from the first row only) + conversion
    first_ts = df.select(pl.col("open_time").first()).item()

    if first_ts > 10_000_000_000_000:  # Microseconds
        ts_col = (pl.col("open_time") // 1000).cast(pl.Datetime("ms"))
    elif first_ts > 10_000_000_000:    # Milliseconds
        ts_col = pl.col("open_time").cast(pl.Datetime("ms"))
    else:                               # Seconds
        ts_col = (pl.col("open_time") * 1000).cast(pl.Datetime("ms"))

    # Transformation 3: add the datetime column
    df = df.with_columns([
        ts_col.alias("datetime")
    ])

    return df

# Index the source files
zip_files = sorted(RAW_DIR.glob('*.zip'))
print(f'✅ {len(zip_files)} ZIP files found')
print(f'📂 Example: {zip_files[0].name if zip_files else "None"}')

✅ 97 ZIP files found
📂 Example: BTCUSDT-4h-2017-08.zip
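
The unit detection in `transform_zip_to_df` relies only on the magnitude of the timestamp. A standalone sketch of that rule, using the same two thresholds, may make it easier to reason about. The function name `to_millis` and the constant names are hypothetical. Unlike the notebook, which inspects the first row and applies the result to the whole file, this sketch decides per value.

```python
US_THRESHOLD = 10_000_000_000_000  # above this, the value is treated as microseconds
MS_THRESHOLD = 10_000_000_000      # above this, the value is treated as milliseconds


def to_millis(ts: int) -> int:
    """Normalize an epoch timestamp to milliseconds using magnitude thresholds."""
    if ts > US_THRESHOLD:
        return ts // 1000   # microseconds -> milliseconds
    if ts > MS_THRESHOLD:
        return ts           # already milliseconds
    return ts * 1000        # seconds -> milliseconds


# The same instant expressed in seconds, milliseconds and microseconds
for raw in (1_502_928_000, 1_502_928_000_000, 1_502_928_000_000_000):
    print(raw, "->", to_millis(raw))
```

All three inputs normalize to `1502928000000`. The thresholds only work for recent dates: a value in seconds would have to exceed 10^10 (beyond the year 2286) to be mistaken for milliseconds.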


In [45]:
# ========== TEST 1: Global completeness ==========

print("🔍 TEST 1: Global completeness ZIP → Bronze\n")
print("=" * 70)

# Bronze row count
bronze_count = con.execute(f"""
    SELECT COUNT(*) FROM read_parquet('{S3_PATTERN}')
""").fetchone()[0]

# ZIP row count (with transformations applied)
zip_total = 0
for zf in zip_files:
    try:
        df = transform_zip_to_df(zf)
        zip_total += df.height
    except Exception as e:
        print(f"⚠️ Error {zf.name}: {e}")

print("📊 Result:")
print(f"   • Transformed ZIP sources   : {zip_total:,} rows")
print(f"   • Bronze (Parquet on MinIO) : {bronze_count:,} rows")
print(f"   • Difference                : {bronze_count - zip_total:+,} rows")

diff_pct = ((bronze_count - zip_total) / zip_total * 100) if zip_total > 0 else 0
print(f"   • Relative gap              : {diff_pct:+.2f}%")

print("=" * 70)

# Tolerance of ±1% for minor variations, expressed in percent like diff_pct
tolerance_pct = 1.0
if abs(diff_pct) <= tolerance_pct:
    print(f"✅ SUCCESS: Completeness within tolerance (±{tolerance_pct}%)")
else:
    print(f"⚠️ WARNING: Gap of {abs(diff_pct):.2f}% detected")
    if bronze_count < zip_total:
        print(f"   → {zip_total - bronze_count:,} rows missing in Bronze")
    else:
        print(f"   → {bronze_count - zip_total:,} extra rows in Bronze")
print()

🔍 TEST 1: Global completeness ZIP → Bronze

📊 Result:
   • Transformed ZIP sources   : 17,604 rows
   • Bronze (Parquet on MinIO) : 17,604 rows
   • Difference                : +0 rows
   • Relative gap              : +0.00%
✅ SUCCESS: Completeness within tolerance (±1.0%)
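
The completeness check compares a relative gap against a tolerance, and both must be in the same unit. A minimal sketch with hypothetical helper names (`relative_gap_pct`, `within_tolerance`), keeping everything in percent:

```python
def relative_gap_pct(expected: int, actual: int) -> float:
    """Signed gap of actual versus expected, in percent.

    Returns 0.0 when expected is 0, mirroring the guard used in the notebook.
    """
    if expected == 0:
        return 0.0
    return (actual - expected) / expected * 100


def within_tolerance(expected: int, actual: int, tolerance_pct: float = 1.0) -> bool:
    """True when the absolute relative gap does not exceed tolerance_pct percent."""
    return abs(relative_gap_pct(expected, actual)) <= tolerance_pct


print(within_tolerance(17_604, 17_604))  # identical counts
print(within_tolerance(1_000, 980))      # a 2% shortfall
```

With the default tolerance of 1%, the first call returns `True` and the second returns `False`.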



In [46]:
# ========== TEST 2: Per-file comparison ==========

print("🔍 TEST 2: Detailed per-file comparison\n")
print("=" * 70)

# File to analyze in detail (None = skip the detailed analysis)
DETAILED_FILE = "BTCUSDT-4h-2025-01.zip"  # Change to analyze another file

results = []

for zf in zip_files:
    try:
        # Transform the ZIP
        zip_df = transform_zip_to_df(zf)
        # Note: both sides are compared as strings, so their formats must match exactly
        zip_datetimes = set(zip_df.select(
            pl.col("datetime").dt.to_string("%Y-%m-%dT%H:%M:%S%.f")
        ).to_series().to_list())

        # Extract the time range
        min_dt = zip_df.select(pl.col("datetime").min()).item()
        max_dt = zip_df.select(pl.col("datetime").max()).item()

        # Query Bronze for this range
        bronze_df = con.execute(f"""
            SELECT datetime::VARCHAR as dt
            FROM read_parquet('{S3_PATTERN}')
            WHERE datetime >= '{min_dt}' AND datetime <= '{max_dt}'
        """).fetchdf()

        bronze_datetimes = set(bronze_df['dt'].tolist())

        # Compute the differences
        missing = len(zip_datetimes - bronze_datetimes)
        extra = len(bronze_datetimes - zip_datetimes)
        common = len(zip_datetimes & bronze_datetimes)

        results.append({
            'fichier': zf.name,
            'zip_count': len(zip_datetimes),
            'bronze_count': len(bronze_datetimes),
            'missing': missing,
            'extra': extra,
            'common': common,
            'status': '✅' if missing == 0 and extra == 0 else '⚠️'
        })

    except Exception as e:
        results.append({
            'fichier': zf.name,
            'zip_count': 0,
            'bronze_count': 0,
            'missing': 0,
            'extra': 0,
            'common': 0,
            'status': f'❌ {str(e)[:30]}'
        })

# Summary display
import pandas as pd
summary_df = pd.DataFrame(results)
print(summary_df[['status', 'fichier', 'zip_count', 'bronze_count', 'missing', 'extra']].to_string(index=False))

print("\n" + "=" * 70)
print(f"✅ Files OK    : {len([r for r in results if r['status'] == '✅'])}/{len(results)}")
print(f"⚠️ Differences : {len([r for r in results if r['status'] == '⚠️'])}")

# Detailed analysis if a specific file is selected
if DETAILED_FILE and any(r['fichier'] == DETAILED_FILE for r in results):
    print(f"\n📋 DETAILED ANALYSIS: {DETAILED_FILE}")
    print("=" * 70)

    target_result = [r for r in results if r['fichier'] == DETAILED_FILE][0]

    if target_result['missing'] > 0 or target_result['extra'] > 0:
        # Reload for the detailed display
        target_path = RAW_DIR / DETAILED_FILE
        zip_df = transform_zip_to_df(target_path)
        zip_dt_set = set(zip_df.select(
            pl.col("datetime").dt.to_string("%Y-%m-%dT%H:%M:%S%.f")
        ).to_series().to_list())

        min_dt = zip_df.select(pl.col("datetime").min()).item()
        max_dt = zip_df.select(pl.col("datetime").max()).item()

        bronze_df = con.execute(f"""
            SELECT datetime::VARCHAR as dt FROM read_parquet('{S3_PATTERN}')
            WHERE datetime >= '{min_dt}' AND datetime <= '{max_dt}'
        """).fetchdf()
        bronze_dt_set = set(bronze_df['dt'].tolist())

        missing_dt = sorted(list(zip_dt_set - bronze_dt_set))[:10]
        extra_dt = sorted(list(bronze_dt_set - zip_dt_set))[:10]

        if missing_dt:
            print(f"\n❌ First missing datetimes ({len(missing_dt)} shown):")
            for dt in missing_dt:
                print(f"   {dt}")

        if extra_dt:
            print(f"\n⚠️ First extra datetimes ({len(extra_dt)} shown):")
            for dt in extra_dt:
                print(f"   {dt}")
    else:
        print("✅ No difference - file ingested perfectly")

print()

🔍 TEST 2: Detailed per-file comparison

status                fichier  zip_count  bronze_count  missing  extra
    ⚠️ BTCUSDT-4h-2017-08.zip         89            89       89     89
    ⚠️ BTCUSDT-4h-2017-09.zip        179           179      179    179
    ⚠️ BTCUSDT-4h-2017-10.zip        186           186      186    186
    ⚠️ BTCUSDT-4h-2017-11.zip        180           180      180    180
    ⚠️ BTCUSDT-4h-2017-12.zip        186           186      186    186
    ⚠️ BTCUSDT-4h-2018-01.zip        186           186      186    186
    ⚠️ BTCUSDT-4h-2018-02.zip        161           161      161    161
    ⚠️ BTCUSDT-4h-2018-03.zip        186           186      186    186
    ⚠️ BTCUSDT-4h-2018-04.zip        180           180      180    180
    ⚠️ BTCUSDT-4h-2018-05.zip        186           186      186    186
    ⚠️ BTCUSDT-4h-2018-06.zip        178           178      178    178
    ⚠️ BTCUSDT-4h-2018-07.zip        185           185      185    185
    ⚠️ BTCUSDT-4h-2018-08

❌ First missing datetimes (10 shown):
   2025-01-01T00:00:00
   2025-01-01T04:00:00
   2025-01-01T08:00:00
   2025-01-01T12:00:00
   2025-01-01T16:00:00
   2025-01-01T20:00:00
   2025-01-02T00:00:00
   2025-01-02T04:00:00
   2025-01-02T08:00:00
   2025-01-02T12:00:00

⚠️ First extra datetimes (10 shown):
   2025-01-01 00:00:00
   2025-01-01 04:00:00
   2025-01-01 08:00:00
   2025-01-01 12:00:00
   2025-01-01 16:00:00
   2025-01-01 20:00:00
   2025-01-02 00:00:00
   2025-01-02 04:00:00
   2025-01-02 08:00:00
   2025-01-02 12:00:00
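
In the Test 2 output, the values reported as missing use a `T` separator while those reported as extra use a space, and every file shows `missing == extra == zip_count`. This pattern suggests a string-format mismatch between the two sides rather than a real data gap, although the output alone does not prove it. One way to make the comparison independent of formatting is to parse both sides into `datetime` objects first. The sketch below uses only the standard library, and the helper name `parse_all` is hypothetical.

```python
from datetime import datetime


def parse_all(values):
    """Parse ISO-like strings ('T' or space separator) into a set of datetime objects."""
    return {datetime.fromisoformat(v) for v in values}


# Sample values taken from the Test 2 output
zip_side = ["2025-01-01T00:00:00", "2025-01-01T04:00:00"]
bronze_side = ["2025-01-01 00:00:00", "2025-01-01 04:00:00"]

# Raw string comparison: nothing matches because of the separator
print(len(set(zip_side) & set(bronze_side)))

# Comparison on parsed values: both sides agree
missing = parse_all(zip_side) - parse_all(bronze_side)
extra = parse_all(bronze_side) - parse_all(zip_side)
print(len(missing), len(extra))
```

An alternative would be to emit the same string format on both sides; parsing simply avoids having to keep two format strings in sync.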



In [48]:
# ========== TEST 3: Uniqueness and idempotence ==========

print("🔍 TEST 3: Datetime uniqueness\n")
print("=" * 70)

# Check that no datetime value is duplicated
duplicate_check = con.execute(f"""
    SELECT 
        COUNT(*) as total_rows,
        COUNT(DISTINCT datetime) as unique_datetimes,
        COUNT(*) - COUNT(DISTINCT datetime) as duplicates
    FROM read_parquet('{S3_PATTERN}')
""").fetchdf()

total = duplicate_check['total_rows'][0]
unique = duplicate_check['unique_datetimes'][0]
dupes = duplicate_check['duplicates'][0]

print(f"Total rows         : {total:,}")
print(f"Unique datetimes   : {unique:,}")
print(f"Duplicates         : {dupes:,}")

if dupes == 0:
    print("\n✅ SUCCESS: No duplicate datetime")
else:
    print(f"\n❌ FAILURE: {dupes:,} duplicates detected!")

    # Show examples of duplicates
    duplicates_detail = con.execute(f"""
        SELECT datetime, COUNT(*) as count
        FROM read_parquet('{S3_PATTERN}')
        GROUP BY datetime
        HAVING COUNT(*) > 1
        ORDER BY count DESC
        LIMIT 10
    """).fetchdf()

    print("\n📋 First duplicates:")
    print(duplicates_detail.to_string(index=False))

print()

🔍 TEST 3: Datetime uniqueness

Total rows         : 17,604
Unique datetimes   : 17,604
Duplicates         : 0

✅ SUCCESS: No duplicate datetime
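
The SQL check computes surplus rows as `COUNT(*) - COUNT(DISTINCT datetime)`. Since `COUNT(DISTINCT ...)` ignores NULLs, rows with a NULL datetime would also be counted as duplicates by that expression. The same idea can be sketched in plain Python on a small list. The helper name `find_duplicates` is hypothetical.

```python
from collections import Counter


def find_duplicates(values):
    """Return a dict mapping each value seen more than once to its occurrence count."""
    counts = Counter(values)
    return {value: n for value, n in counts.items() if n > 1}


sample = ["2025-01-01 00:00:00", "2025-01-01 04:00:00", "2025-01-01 04:00:00"]
duplicates = find_duplicates(sample)
surplus_rows = len(sample) - len(set(sample))  # same quantity as the SQL expression

print(duplicates)
print(surplus_rows)
```

Here `duplicates` plays the role of the `GROUP BY ... HAVING COUNT(*) > 1` query, and `surplus_rows` corresponds to the `duplicates` column.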



In [49]:
# ========== TEST 4: Partition consistency ==========

print("🔍 TEST 4: Consistency of year/month/day with datetime\n")
print("=" * 70)

# Check that the partition columns match the datetime
partitions_check = con.execute(f"""
    SELECT 
        COUNT(*) as total_rows,
        SUM(CASE WHEN year != CAST(EXTRACT(YEAR FROM datetime) AS INT) THEN 1 ELSE 0 END) as year_errors,
        SUM(CASE WHEN month != CAST(EXTRACT(MONTH FROM datetime) AS INT) THEN 1 ELSE 0 END) as month_errors,
        SUM(CASE WHEN day != CAST(EXTRACT(DAY FROM datetime) AS INT) THEN 1 ELSE 0 END) as day_errors
    FROM read_parquet('{S3_PATTERN}')
""").fetchdf()

total = partitions_check['total_rows'][0]
year_err = partitions_check['year_errors'][0]
month_err = partitions_check['month_errors'][0]
day_err = partitions_check['day_errors'][0]
total_err = year_err + month_err + day_err

print(f"Total rows         : {total:,}")
print(f"year errors        : {year_err}")
print(f"month errors       : {month_err}")
print(f"day errors         : {day_err}")

if total_err == 0:
    print("\n✅ SUCCESS: All partitions are consistent")
else:
    print(f"\n❌ FAILURE: {total_err} inconsistencies detected!")

    # Show examples
    examples = con.execute(f"""
        SELECT datetime, year, month, day,
               EXTRACT(YEAR FROM datetime) as actual_year,
               EXTRACT(MONTH FROM datetime) as actual_month,
               EXTRACT(DAY FROM datetime) as actual_day
        FROM read_parquet('{S3_PATTERN}')
        WHERE year != CAST(EXTRACT(YEAR FROM datetime) AS INT)
           OR month != CAST(EXTRACT(MONTH FROM datetime) AS INT)
           OR day != CAST(EXTRACT(DAY FROM datetime) AS INT)
        LIMIT 10
    """).fetchdf()

    print("\n📋 Example inconsistencies:")
    print(examples.to_string(index=False))

print()

🔍 TEST 4: Consistency of year/month/day with datetime

Total rows         : 17,604
year errors        : 0.0
month errors       : 0.0
day errors         : 0.0

✅ SUCCESS: All partitions are consistent
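
The partition test sums year, month and day mismatches separately, so a row that is wrong on several fields contributes more than once to `total_err`. A row-level view of the same rule is sketched below in plain Python. The helper name `partition_errors` and the sample rows are hypothetical.

```python
from datetime import datetime


def partition_errors(rows):
    """Return the rows whose (year, month, day) values disagree with their datetime.

    Each row is a tuple (dt, year, month, day).
    """
    return [
        row for row in rows
        if (row[0].year, row[0].month, row[0].day) != tuple(row[1:])
    ]


rows = [
    (datetime(2025, 1, 1, 4, 0), 2025, 1, 1),     # consistent
    (datetime(2025, 1, 31, 20, 0), 2025, 2, 1),   # month and day both wrong
]

bad_rows = partition_errors(rows)
print(len(bad_rows))
```

The second sample row counts as a single inconsistent row here, whereas the per-column sums would count it twice.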



In [50]:
# ========== TEST 5: Traceability (ingest_id) ==========

print("🔍 TEST 5: Presence and format of ingest_id\n")
print("=" * 70)

# 1. Check presence (NULL check)
null_check = con.execute(f"""
    SELECT 
        COUNT(*) as total_rows,
        SUM(CASE WHEN ingest_id IS NULL THEN 1 ELSE 0 END) as null_count,
        SUM(CASE WHEN ingest_id IS NOT NULL THEN 1 ELSE 0 END) as non_null_count
    FROM read_parquet('{S3_PATTERN}')
""").fetchdf()

total = null_check['total_rows'][0]
nulls = null_check['null_count'][0]
non_nulls = null_check['non_null_count'][0]

print(f"Total rows         : {total:,}")
print(f"ingest_id NULL     : {nulls}")
print(f"ingest_id valid    : {non_nulls}")

if nulls > 0:
    print(f"\n⚠️ WARNING: {nulls} rows with NULL ingest_id!")
else:
    print("\n✅ SUCCESS: All ingest_id values are present")

# 2. Check the format (ISO or compact)
import re
# Compact format: 20251003T205337Z
compact_pattern = re.compile(r'^\d{8}T\d{6}Z$')
# Extended ISO format: YYYY-MM-DDTHH:MM:SS.sssZ_suffix (only the prefix is checked)
iso_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')

# Only a sample of up to 100 distinct values is checked
sample_ids = con.execute(f"""
    SELECT DISTINCT ingest_id 
    FROM read_parquet('{S3_PATTERN}')
    WHERE ingest_id IS NOT NULL
    LIMIT 100
""").fetchdf()['ingest_id'].tolist()

invalid_formats = [
    id_val for id_val in sample_ids 
    if not compact_pattern.match(id_val) and not iso_pattern.match(id_val)
]

if invalid_formats:
    print(f"\n⚠️ WARNING: {len(invalid_formats)} ingest_id values with an invalid format")
    print("Examples:", invalid_formats[:5])
else:
    print("\n✅ SUCCESS: All ingest_id values have a valid format")

# 3. Count the distinct batches
nb_ingests = con.execute(f"""
    SELECT COUNT(DISTINCT ingest_id) as distinct_ingests
    FROM read_parquet('{S3_PATTERN}')
    WHERE ingest_id IS NOT NULL
""").fetchone()[0]

print(f"\nNumber of distinct ingestion batches: {nb_ingests}")

print()

🔍 TEST 5: Presence and format of ingest_id

Total rows         : 17,604
ingest_id NULL     : 0.0
ingest_id valid    : 17604.0

✅ SUCCESS: All ingest_id values are present

✅ SUCCESS: All ingest_id values have a valid format

Number of distinct ingestion batches: 97
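
The regular expressions in Test 5 validate the shape of `ingest_id` only: a value such as `20251340T000000Z` (month 13) would pass the compact pattern. If a stricter check is wanted, one option is to also parse the value as a timestamp. This sketch covers the compact format only, and the function name `is_valid_compact_ingest_id` is hypothetical.

```python
import re
from datetime import datetime

# Same compact pattern as in the notebook: YYYYMMDDTHHMMSSZ
COMPACT_RE = re.compile(r"^\d{8}T\d{6}Z$")


def is_valid_compact_ingest_id(value: str) -> bool:
    """Check the shape with a regex, then check that it is a real calendar timestamp."""
    if not COMPACT_RE.match(value):
        return False
    try:
        datetime.strptime(value, "%Y%m%dT%H%M%SZ")
    except ValueError:
        return False
    return True


for candidate in ("20251003T205337Z", "20251340T000000Z", "20250230T000000Z"):
    print(candidate, is_valid_compact_ingest_id(candidate))
```

Only the first candidate is accepted; the other two have the right shape but do not correspond to a real date.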



In [None]:
# ========== SUMMARY AND CLEANUP ==========

print("\n" + "=" * 70)
print("📊 BRONZE TESTS SUMMARY")
print("=" * 70)

print("""
✅ Test 1: Global completeness (ZIP → Bronze with transformations)
✅ Test 2: Per-file comparison (detects file-specific problems)
✅ Test 3: Datetime uniqueness (no duplicates)
✅ Test 4: Partition consistency (year/month/day aligned with datetime)
✅ Test 5: Traceability (presence and format of ingest_id)

Notes:
- The datalake_bronze transformations are applied before comparison
- Tolerance: ±1% for the global tests
- Timestamp format: auto-detection of µs/ms/s
- ingest_id format: compact (YYYYMMDDTHHMMSSZ) or ISO+suffix

🔧 Quick diagnostics:
- Change DETAILED_FILE in Test 2 to analyze a specific file
- Missing/extra datetimes are displayed (first 10)
""")

# Clean shutdown
con.close()
print("\n✅ Tests finished - DuckDB connection closed")
