**But de ce notebook (Étape 1.1 · HMDA) :**<br>  *1. vérifier l’API (agrégats, 2018+)*<br>
    *2. télécharger les datasets historiques (bulk/static) 2004–2017 et ceux de l'API (2018 et +)*<br>
    *3. tous les ramener en tri-states (NY, NJ, Connecticut)*

**1 Configurations**

In [None]:
# %pip install pandas requests pyarrow --quiet   # à exécuter si pandas et requests ne sont pas déjà installés

In [1]:
import os

BASE_DIR   = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring"
PROJECT    = os.path.join(BASE_DIR, "tri_state_ai")
DATA_RAW   = os.path.join(PROJECT, "data_raw", "hmda")   # là où on stocke les ZIP/CSV bruts
DATA_WORK  = os.path.join(PROJECT, "data_work")          # sorties Parquet
NOTEBOOKS  = os.path.join(PROJECT, "notebooks")

for p in [DATA_RAW, DATA_WORK, NOTEBOOKS]:
    os.makedirs(p, exist_ok=True)

# Codes FIPS des États Tri-State : NY=36, NJ=34, CT=09
TRI_STATES = {"36", "34", "09"}

print("✅ Dossiers OK")
print("DATA_RAW :", DATA_RAW)
print("DATA_WORK:", DATA_WORK)

✅ Dossiers OK
DATA_RAW : C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda
DATA_WORK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work


In [None]:
#######################################################################################################################

**2 Sanity check API (agrégats, 2018+ seulement)**<br>
*But: tester que l’API répond et que le schéma attendu est correct (GET + params).*<br>
⚠️ *L’API “data-browser” ne renvoie que des agrégats, pas les lignes brutes.*

In [3]:
# %pip install pandas requests --quiet  # à exécuter si pandas et requests ne sont pas déjà installés

Note: you may need to restart the kernel to use updated packages.


In [2]:
import requests, pandas as pd

API = "https://ffiec.cfpb.gov/v2/data-browser-api/view/aggregations"
years  = [2018, 2019, 2020, 2021, 2022]  # l’API supporte 2018+ (on reste léger ici)
states = ["NY","NJ","CT"]

rows = []
for y in years:
    for st in states:
        params = {
            "years": str(y),
            "states": st,
            "actions_taken": "1,2,3,6,7",  # 1=Approved + autres statuts utiles
        }
        r = requests.get(API, params=params, timeout=60)
        r.raise_for_status()
        for rec in r.json().get("aggregations", []):
            rec["year"] = y
            rec["state"] = st
            rows.append(rec)

df_api = pd.DataFrame(rows)
print(df_api.shape)
df_api.head()


(75, 5)


Unnamed: 0,count,sum,actions_taken,year,state
0,13830,4910830000.0,2,2018,NY
1,110166,26141730000.0,3,2018,NY
2,288,158250000.0,7,2018,NY
3,60253,16648480000.0,6,2018,NY
4,283988,112344500000.0,1,2018,NY


In [None]:
#######################################################################################################################

**3 Téléchargement des historiques 2004–2017 (bulk/static)**<br>
But: ici ont construit une base locale fiable et traçable de toutes les archives HMDA 2004–2017, prêtes pour le traitement streaming

In [3]:
# 2017 — “All records” (CFPB/FFIEC)

import os, hashlib, requests

url_2017_all = "https://files.consumerfinance.gov/hmda-historic-loan-data/hmda_2017_nationwide_all-records_labels.zip"
out_2017_zip = os.path.join(DATA_RAW, "hmda_2017_nationwide_all-records_labels.zip")

if not os.path.exists(out_2017_zip):
    print("⬇️  2017 (All records)…")
    with requests.get(url_2017_all, stream=True, timeout=600) as r:
        r.raise_for_status()
        with open(out_2017_zip, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024*1024):
                if chunk: f.write(chunk)
    print("✅ téléchargé :", out_2017_zip)
else:
    print("⏭️ déjà présent :", out_2017_zip)

# Empreinte (traçabilité)
h = hashlib.sha256()
with open(out_2017_zip, "rb") as f:
    for ch in iter(lambda: f.read(1024*1024), b""):
        h.update(ch)
print("SHA256:", h.hexdigest())


⬇️  2017 (All records)…
✅ téléchargé : C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_2017_nationwide_all-records_labels.zip
SHA256: f43dbc4f1a674d8ae21a3a4f0a07f0a1366063f695724a564c867498046f2984


In [4]:
# 2004–2016 — LAR “Final” via NARA (National Archives):
  # On utilise les ID NARA officiels pour construire les liens https://catalog.archives.gov/download/<ID>

import time

nara_ids = {
    2004: 5716418,  2005: 6850582,  2006: 6850584,  2007: 6852883,
    2008: 6852884,  2009: 6872010,  2010: 12008309, 2011: 12008312,
    2012: 18491490, 2013: 34618164, 2014: 100378087, 2015: 5752989,
    2016: 5752994,
}

for year, nid in nara_ids.items():
    url = f"https://catalog.archives.gov/download/{nid}"
    out = os.path.join(DATA_RAW, f"hmda_lar_{year}.zip")
    if os.path.exists(out):
        print(f"⏭️ {year} déjà présent")
        continue
    try:
        print(f"⬇️  {year} → {url}")
        with requests.get(url, stream=True, timeout=600) as r:
            r.raise_for_status()
            with open(out, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024*1024):
                    if chunk: f.write(chunk)
        print("  ✅ OK:", out)
        time.sleep(1.5)
    except Exception as e:
        print(f"  ❌ {year} erreur:", e)

⬇️  2004 → https://catalog.archives.gov/download/5716418
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2004.zip
⬇️  2005 → https://catalog.archives.gov/download/6850582
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2005.zip
⬇️  2006 → https://catalog.archives.gov/download/6850584
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2006.zip
⬇️  2007 → https://catalog.archives.gov/download/6852883
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2007.zip
⬇️  2008 → https://catalog.archives.gov/download/6852884
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2008.zip
⬇️  2009 → https://catalog.archives.gov/download/6872010
  ✅ OK: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_lar_2009.zip
⬇️  2010 → https

In [None]:
#######################################################################################################################
####

**C'est dommage de constaté que tous ces dossiers contenu dans le web et téléchargés avec succès sont corompus.<br> En fait il ne sont pas réelement des .zip et finalement ils sont inexploitables.<br> Nous avous recuperer manuellement les dossiers .zip de bonne source (malgré le fait que leur contenu .excel sont bizarement très lourds en terme de Giga) puis faire une extraction et enfin les stocker en format .parquet pret pour analyses ultérieures**

In [21]:
# nous pouvons voir la liste des dossiers que nous avons télécharges en local
DATA_RAW = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda"
[z for z in os.listdir(DATA_RAW) if z.endswith(".zip")]

['hmda_2007_nationwide_all-records_labels.zip',
 'hmda_2008_nationwide_all-records_labels.zip',
 'hmda_2009_nationwide_all-records_labels.zip',
 'hmda_2010_nationwide_all-records_labels.zip',
 'hmda_2011_nationwide_all-records_labels.zip',
 'hmda_2012_nationwide_all-records_labels.zip',
 'hmda_2013_nationwide_all-records_labels.zip',
 'hmda_2014_nationwide_all-records_labels.zip',
 'hmda_2015_nationwide_all-records_labels.zip',
 'hmda_2016_nationwide_all-records_labels.zip',
 'hmda_2017_nationwide_all-records_labels.zip']

In [29]:
# Extraction complète des ZIP HMDA 2007–2017 puis conversion en Parquet (par année)
# - Cherche chaque ZIP "hmda_<year>_nationwide_all-records_labels.zip" dans DATA_RAW
# - Extrait le CSV interne dans un dossier temporaire
# - Détecte le séparateur (',' ou '|')
# - Lit en chunks et écrit un Parquet "<DATA_WORK>/hmda_<year>.parquet"

import os, re, zipfile, shutil
from pathlib import Path
import pandas as pd
import pyarrow as pa, pyarrow.parquet as pq

# Dossiers
DATA_RAW  = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda"
DATA_WORK = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work"
Path(DATA_WORK).mkdir(parents=True, exist_ok=True)

# --- Helpers ---
def detect_sep_from_file(txt_path: str, sample_lines: int = 20000) -> str:
    pipe = comma = 0
    with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i >= sample_lines: break
            pipe  += line.count("|")
            comma += line.count(",")
    return "|" if pipe > comma else ","

def extract_inner_text(zip_path: str, tmp_dir: str) -> str:
    tmp_dir = Path(tmp_dir); tmp_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as z:
        members = [n for n in z.namelist() if n.lower().endswith((".csv", ".txt", ".dat"))]
        if not members:
            raise RuntimeError(f"Aucun CSV/TXT/DAT dans {zip_path} (contenu: {z.namelist()[:5]})")
        member = members[0]
        out_path = tmp_dir / Path(member).name
        with z.open(member) as src, open(out_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
    return str(out_path)

def convert_csv_to_parquet(csv_path: str, year: int, out_parquet: str, sep: str, chunksize: int = 500_000) -> int:
    # Supprime un ancien parquet pour éviter conflit de schéma
    if os.path.exists(out_parquet):
        os.remove(out_parquet)

    total = 0
    writer = None
    schema = None

    for chunk in pd.read_csv(csv_path, sep=sep, low_memory=False, chunksize=chunksize,
                             encoding="utf-8", on_bad_lines="skip"):
        if "year" not in chunk.columns:
            chunk["year"] = year
        table = pa.Table.from_pandas(chunk, preserve_index=False)
        if writer is None:
            schema = table.schema
            writer = pq.ParquetWriter(out_parquet, schema, use_dictionary=True)
        writer.write_table(table)
        total += len(chunk)

    if writer is not None:
        writer.close()
    return total

# --- Traitement principal ---
zips = [f for f in os.listdir(DATA_RAW) if f.endswith("_nationwide_all-records_labels.zip")]
zips = sorted(zips)

for name in zips:
    m = re.search(r"(\d{4})", name)
    if not m:
        print("skip (pas d'année détectée):", name)
        continue
    year = int(m.group(1))
    zip_path = os.path.join(DATA_RAW, name)
    tmp_dir  = os.path.join(Path(DATA_WORK).parent, "_tmp_extracts", f"hmda_{year}")

    try:
        print(f"\n=== {year} | Source: {name} ===")
        extracted = extract_inner_text(zip_path, tmp_dir)
        print(f"  [1/2] Extrait → {extracted}")
        sep = detect_sep_from_file(extracted)
        print(f"  [2/2] Séparateur détecté: {repr(sep)}")

        out_parquet = os.path.join(DATA_WORK, f"hmda_{year}.parquet")
        n = convert_csv_to_parquet(extracted, year, out_parquet, sep=sep)
        print(f"✅ {year}: {n:,} lignes → {out_parquet}")
    except zipfile.BadZipFile:
        print(f"❌ {year}: ZIP corrompu ou illisible → {zip_path}")
    except Exception as e:
        print(f"❌ {year}: {e}")
    finally:
        # Nettoyage du fichier extrait (garde le dossier pour debug s'il reste d'autres fichiers)
        try:
            if 'extracted' in locals() and os.path.exists(extracted):
                os.remove(extracted)
            # supprime le dossier s'il est vide
            p = Path(tmp_dir)
            if p.exists() and not any(p.iterdir()):
                p.rmdir()
        except Exception:
            pass



=== 2007 | Source: hmda_2007_nationwide_all-records_labels.zip ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2007\hmda_2007_nationwide_all-records_labels.csv
  [2/2] Séparateur détecté: ','
✅ 2007: 26,605,695 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_2007.parquet

=== 2008 | Source: hmda_2008_nationwide_all-records_labels.zip ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2008\hmda_2008_nationwide_all-records_labels.csv
  [2/2] Séparateur détecté: ','
✅ 2008: 17,391,570 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_2008.parquet

=== 2009 | Source: hmda_2009_nationwide_all-records_labels.zip ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2009\hmda_2009_nationwide_all-records_labels.csv
  [2/2] Séparateur détecté:

**Uniquement pour les années 2010, 2013, 2014, 2015; nous avons des échecs ❌.**<br>
**En fait nous avons des erreurs de schéma Parquet parce que certains champs numériques passent d’un chunk à l’autre de int64 à float64.<br> Comme solution simple et robuste nous allons harmoniser les types avant d’écrire**

In [31]:
# Re-traitement robuste (sans pandas.api.types.is_*): 2010, 2013, 2014, 2015
import os, re, zipfile, shutil
from pathlib import Path
import pandas as pd
import numpy as np
import pyarrow as pa, pyarrow.parquet as pq

DATA_RAW  = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda"
DATA_WORK = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work"
Path(DATA_WORK).mkdir(parents=True, exist_ok=True)

def detect_sep_from_file(txt_path: str, sample_lines: int = 20000) -> str:
    pipe = comma = 0
    with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i >= sample_lines: break
            pipe  += line.count("|")
            comma += line.count(",")
    return "|" if pipe > comma else ","

def extract_inner_text(zip_path: str, tmp_dir: str) -> str:
    tmp_dir = Path(tmp_dir); tmp_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as z:
        members = [n for n in z.namelist() if n.lower().endswith((".csv", ".txt", ".dat"))]
        if not members:
            raise RuntimeError(f"Aucun CSV/TXT/DAT dans {zip_path} (contenu: {z.namelist()[:5]})")
        member = members[0]
        out_path = tmp_dir / Path(member).name
        with z.open(member) as src, open(out_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
    return str(out_path)

# Colonnes textuelles connues (si présentes)
TEXT_LIKE = {
    "respondent_id", "agency_name", "agency_abbr",
    "loan_type_name", "property_type_name", "loan_purpose_name",
    "owner_occupancy_name", "preapproval_name", "action_taken_name",
    "msamd_name", "state_name", "state_abbr", "county_name",
    "applicant_ethnicity_name", "co_applicant_ethnicity_name",
    "applicant_race_name_1","applicant_race_name_2","applicant_race_name_3","applicant_race_name_4","applicant_race_name_5",
    "co_applicant_race_name_1","co_applicant_race_name_2","co_applicant_race_name_3","co_applicant_race_name_4","co_applicant_race_name_5",
    "applicant_sex_name","co_applicant_sex_name",
    "purchaser_type_name","denial_reason_name_1","denial_reason_name_2","denial_reason_name_3",
    "hoepa_status_name","lien_status_name","edit_status_name"
}

def infer_target_types(df: pd.DataFrame, year: int) -> dict:
    """
    Détermine pour chaque colonne si on la traite en 'string' (pandas StringDtype) ou 'float'
    - colonnes connues TEXT_LIKE → string
    - colonnes numériques natives → float
    - colonnes object → test rapide: si >=10% des valeurs sont convertissables en nombre → float, sinon string
    """
    cols = [c.strip().lower() for c in df.columns]
    type_map = {}
    for c in cols:
        if c in TEXT_LIKE:
            type_map[c] = "string"
            continue
        s = df[c]
        # dtype-name simple pour compat
        dt = str(s.dtype)
        if dt.startswith(("int", "float", "bool")):
            type_map[c] = "float"
        elif dt == "object":
            # échantillon
            sample = s.head(5000)
            numeric = pd.to_numeric(sample, errors="coerce")
            ratio_num = numeric.notna().mean()
            type_map[c] = "float" if ratio_num >= 0.10 else "string"
        else:
            # fallback: string (catégories, etc.)
            type_map[c] = "string"
    # year forcé
    type_map.setdefault("year", "float")  # on laissera year en float64 pour éviter clash
    return type_map

def coerce_to_types(df: pd.DataFrame, year: int, type_map: dict) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip().lower() for c in df.columns]
    if "year" not in df.columns:
        df["year"] = year
    for c, t in type_map.items():
        if c not in df.columns:
            # colonne absente : on l'ajoute
            df[c] = pd.Series([np.nan]*len(df))
            if t == "string":
                df[c] = df[c].astype("string")
            else:
                df[c] = df[c].astype("float64")
            continue
        if t == "string":
            # astype string; convertit NaN proprement
            df[c] = df[c].astype("string")
        else:
            # float : convertir numériquement
            if str(df[c].dtype).startswith(("int", "float", "bool")):
                df[c] = df[c].astype("float64")
            else:
                df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
    # réordonner selon type_map
    ordered_cols = list(type_map.keys())
    # garder aussi d'éventuelles colonnes supplémentaires en fin
    extras = [c for c in df.columns if c not in ordered_cols]
    return df[ordered_cols + extras]

def convert_csv_to_parquet_relaxed(csv_path: str, year: int, out_parquet: str, sep: str, chunksize: int = 400_000) -> int:
    if os.path.exists(out_parquet):
        os.remove(out_parquet)
    total = 0
    writer = None
    target_schema = None
    type_map = None

    chunk_iter = pd.read_csv(csv_path, sep=sep, low_memory=False, chunksize=chunksize,
                             encoding="utf-8", on_bad_lines="skip")
    for i, chunk in enumerate(chunk_iter):
        chunk.columns = [c.strip().lower() for c in chunk.columns]
        # première passe: inférer la carte des types cibles sur le premier chunk
        if type_map is None:
            type_map = infer_target_types(chunk, year)
        # coercition au schéma cible
        chunk = coerce_to_types(chunk, year, type_map)
        table = pa.Table.from_pandas(chunk, preserve_index=False)
        if writer is None:
            target_schema = table.schema
            writer = pq.ParquetWriter(out_parquet, target_schema, use_dictionary=True)
        else:
            # alignement si colonnes manquantes
            missing = [f.name for f in target_schema if f.name not in table.schema.names]
            for col in missing:
                field = target_schema.field(col)
                if pa.types.is_string(field.type):
                    arr = pa.array([None] * len(chunk), type=pa.string())
                else:
                    arr = pa.array([None] * len(chunk), type=field.type)
                table = table.append_column(col, arr)
            table = table.select(target_schema.names)
        writer.write_table(table)
        total += len(chunk)

    if writer is not None:
        writer.close()
    return total

FAILED_YEARS = [2010, 2013, 2014, 2015]
for year in FAILED_YEARS:
    zip_name = f"hmda_{year}_nationwide_all-records_labels.zip"
    zip_path = os.path.join(DATA_RAW, zip_name)
    tmp_dir  = os.path.join(Path(DATA_WORK).parent, "_tmp_extracts", f"hmda_{year}")
    try:
        print(f"\n=== {year} | Re-traitement (schéma compat) ===")
        extracted = extract_inner_text(zip_path, tmp_dir)
        print(f"  [1/2] Extrait → {extracted}")
        sep = detect_sep_from_file(extracted)
        print(f"  [2/2] Sep: {repr(sep)} → écriture Parquet harmonisée")
        out_parquet = os.path.join(DATA_WORK, f"hmda_{year}.parquet")
        n = convert_csv_to_parquet_relaxed(extracted, year, out_parquet, sep=sep)
        print(f"✅ {year}: {n:,} lignes → {out_parquet}")
    except Exception as e:
        print(f"❌ {year}: {e}")
    finally:
        try:
            if 'extracted' in locals() and os.path.exists(extracted):
                os.remove(extracted)
            p = Path(tmp_dir)
            if p.exists() and not any(p.iterdir()):
                p.rmdir()
        except Exception:
            pass



=== 2010 | Re-traitement (schéma compat) ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2010\hmda_2010_nationwide_all-records_labels.csv
  [2/2] Sep: ',' → écriture Parquet harmonisée
✅ 2010: 16,348,557 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_2010.parquet

=== 2013 | Re-traitement (schéma compat) ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2013\hmda_2013_nationwide_all-records_labels.csv
  [2/2] Sep: ',' → écriture Parquet harmonisée
✅ 2013: 17,016,159 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_2013.parquet

=== 2014 | Re-traitement (schéma compat) ===
  [1/2] Extrait → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\_tmp_extracts\hmda_2014\hmda_2014_nationwide_all-records_labels.csv
  [2/2] Sep: ',' → écriture Parquet harmonisée
✅ 2014: 12,049,3

**Filtrer 2007–2017 (nationwide) → Tri-State (NY/NJ/CT)**<br>
*Nous allons Créer des hmda_tristate_<year>.parquet pour 2007–2017 à partir des Parquet nationwide.*

In [33]:
# Filtrage Tri-State 2007–2017 SANS pandas (streaming PyArrow)
# - lit chaque Parquet nationwide en mode dataset
# - applique un filtre sur state_abbr ou state_code
# - écrit hmda_tristate_<year>.parquet en streaming (batches)

import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow.compute as pc
from pathlib import Path

DATA_WORK = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work"
Path(DATA_WORK).mkdir(parents=True, exist_ok=True)

TRI_STATES = ["NY","NJ","CT"]
TRI_FIPS = [36, 34, 9]  # NY, NJ, CT

def filter_and_write_streaming(src_parquet: str, dst_parquet: str, batch_size: int = 131072) -> int:
    """
    Lit un Parquet 'src_parquet' en streaming (pyarrow.dataset), filtre Tri-State,
    et écrit 'dst_parquet' en append. Retourne le nombre de lignes écrites.
    """
    dataset = ds.dataset(src_parquet, format="parquet")
    names = set(dataset.schema.names)

    # Filtre selon colonnes disponibles (pré-2018 : state_abbr ou state_code)
    if "state_abbr" in names:
        filt = pc.is_in(pc.field("state_abbr"), pa.array(TRI_STATES, type=pa.string()))
    elif "state_code" in names:
        # certains millésimes codent state_code en float -> cast en int64 avant comparaison
        filt = pc.is_in(pc.cast(pc.field("state_code"), pa.int64()), pa.array(TRI_FIPS, type=pa.int64()))
    else:
        raise RuntimeError(f"Aucune colonne d'État détectée dans: {src_parquet}")

    # Scanner en streaming avec filtre
    scanner = ds.Scanner.from_dataset(
        dataset,
        filter=filt,
        columns=None,            # ou liste pour projeter un sous-ensemble de colonnes
        batch_size=batch_size    # augmenter/diminuer selon RAM
    )

    total = 0
    writer = None
    try:
        for batch in scanner.to_batches():
            if batch.num_rows == 0:
                continue
            table = pa.Table.from_batches([batch])
            if writer is None:
                # Crée le writer au premier batch, en fixant le schéma
                writer = pq.ParquetWriter(dst_parquet, table.schema, use_dictionary=True)
            writer.write_table(table)
            total += table.num_rows
    finally:
        if writer is not None:
            writer.close()

    return total

# Traite 2007 → 2017
for year in range(2007, 2018):
    src = os.path.join(DATA_WORK, f"hmda_{year}.parquet")            # nationwide existant
    dst = os.path.join(DATA_WORK, f"hmda_tristate_{year}.parquet")   # sortie Tri-State
    if not os.path.exists(src):
        print(f"skip {year}: introuvable → {src}")
        continue
    if os.path.exists(dst):
        print(f"✔️ déjà présent: {dst}")
        continue

    print(f"→ {year}: filtre Tri-State (streaming)")
    rows = filter_and_write_streaming(src, dst, batch_size=131072)
    print(f"✅ {year}: {rows:,} lignes → {dst}")


✔️ déjà présent: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2007.parquet
✔️ déjà présent: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2008.parquet
✔️ déjà présent: C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2009.parquet
→ 2010: filtre Tri-State (streaming)
✅ 2010: 1,234,294 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2010.parquet
→ 2011: filtre Tri-State (streaming)
✅ 2011: 1,132,414 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2011.parquet
→ 2012: filtre Tri-State (streaming)
✅ 2012: 1,322,973 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2012.parquet
→ 2013: filtre Tri-State (streaming)
✅ 2013: 1,186,639 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\da

In [None]:
###############################################################################################################

**5 HMDA Tri-State (NY, NJ, CT) — 2018→2024 via FFIEC Data Browser API**<br>
        *le but ici est de profiter de l'existence de l'API qui est une garantie de l'accessibilité et la fiabilité de la data*<br>
        *Malheureusement elle à ne possède pas de données ultérieure à 2018 d'où l'approche par teléchargement manuel ci dessus.*

In [24]:
# HMDA Tri-State (NY, NJ, CT) — 2018→2024 via FFIEC Data Browser API
# Corrigé : accepte les réponses gzip (CSV compressé) et convertit en Parquet.

import os, time, gzip, io, requests, pandas as pd
import pyarrow as pa, pyarrow.parquet as pq
from pathlib import Path

# Répertoires
DATA_RAW  = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda"
DATA_WORK = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work"
Path(DATA_RAW).mkdir(parents=True, exist_ok=True)
Path(DATA_WORK).mkdir(parents=True, exist_ok=True)

YEARS       = list(range(2018, 2025))
TRI_STATES  = ["NY","NJ","CT"]
ACTIONS_ALL = "1,2,3,4,5,6,7,8"   # filtre HMDA requis
CSV_ENDPOINT = "https://ffiec.cfpb.gov/v2/data-browser-api/view/csv"

def _is_gzip_header(b: bytes) -> bool:
    return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B

def download_csv_tristate(year: int) -> str:
    """
    Télécharge le CSV Tri-State. Si l'API renvoie du gzip (Content-Encoding: gzip ou header 1F 8B),
    on sauvegarde tel quel en .csv.gz ; sinon on compresse à la volée.
    """
    params  = {"years": str(year), "states": ",".join(TRI_STATES), "actions_taken": ACTIONS_ALL}
    headers = {"User-Agent": "Mozilla/5.0", "Accept-Encoding": "gzip"}  # laisser le serveur gziper
    out_gz  = os.path.join(DATA_RAW, f"hmda_tristate_{year}.csv.gz")
    tmp_gz  = out_gz + ".part"
    for p in (tmp_gz,):
        if os.path.exists(p):
            try: os.remove(p)
            except: pass

    with requests.get(CSV_ENDPOINT, params=params, stream=True, timeout=1200, headers=headers) as r:
        r.raise_for_status()

        # On lit un premier chunk pour déterminer si la réponse est déjà gzippée
        it = r.iter_content(chunk_size=1<<20)
        first = b""
        for chunk in it:
            if chunk:
                first = chunk
                break
        if not first:
            raise RuntimeError(f"{year}: Réponse vide")

        # Si la réponse est déjà gzip (soit via Content-Encoding, soit via header magique), on écrit tel quel
        already_gzip = ("gzip" in r.headers.get("Content-Encoding","").lower()) or _is_gzip_header(first)

        if already_gzip:
            with open(tmp_gz, "wb") as f:
                f.write(first)
                for chunk in it:
                    if chunk: f.write(chunk)
            os.replace(tmp_gz, out_gz)
            return out_gz
        else:
            # Réponse texte CSV non compressée → on compresse à la volée en .csv.gz
            with gzip.open(tmp_gz, "wb") as f:
                f.write(first)
                for chunk in it:
                    if chunk: f.write(chunk)
            os.replace(tmp_gz, out_gz)
            return out_gz

def csv_gz_to_parquet(csv_gz_path: str, year: int, out_parquet: str, chunksize: int = 250_000) -> int:
    if os.path.exists(out_parquet):
        os.remove(out_parquet)
    total = 0
    for chunk in pd.read_csv(csv_gz_path, compression="gzip", low_memory=False, chunksize=chunksize):
        if "year" not in chunk.columns:
            chunk["year"] = year
        table = pa.Table.from_pandas(chunk, preserve_index=False)
        if not os.path.exists(out_parquet):
            pq.write_table(table, out_parquet)
        else:
            with pq.ParquetWriter(out_parquet, table.schema, use_dictionary=True) as w:
                w.write_table(table)
        total += len(chunk)
    return total

report = []
for y in YEARS:
    try:
        print(f"\n=== {y} — téléchargement Tri-State (API CSV/gzip) ===")
        csv_gz = download_csv_tristate(y)
        print(f"   CSV.gz → {csv_gz}")
        out_parquet = os.path.join(DATA_WORK, f"hmda_tristate_{y}.parquet")
        print(f"=== {y} — conversion Parquet ===")
        n = csv_gz_to_parquet(csv_gz, y, out_parquet)
        print(f"✅ {y}: {n:,} lignes → {out_parquet}")
        report.append((y, "ok", n))
    except Exception as e:
        print(f"❌ {y}: {e}")
        report.append((y, "fail", 0))
    time.sleep(0.5)

print("\n=== RÉSUMÉ ===")
for y, status, n in report:
    print(f"{y}: {status} ({n:,} rows)" if status=="ok" else f"{y}: {status}")




=== 2018 — téléchargement Tri-State (API CSV/gzip) ===
   CSV.gz → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_tristate_2018.csv.gz
=== 2018 — conversion Parquet ===
❌ 2018: Not a gzipped file (b'ac')

=== 2019 — téléchargement Tri-State (API CSV/gzip) ===
   CSV.gz → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_tristate_2019.csv.gz
=== 2019 — conversion Parquet ===
✅ 2019: 1,181,766 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2019.parquet

=== 2020 — téléchargement Tri-State (API CSV/gzip) ===
   CSV.gz → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda\hmda_tristate_2020.csv.gz
=== 2020 — conversion Parquet ===
✅ 2020: 1,632,042 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2020.parquet

=== 2021 — téléchargement Tri-State (API CSV/gzip) ===
   CSV.gz → C:\Users

**Seule 2018 a fait un echec ❌. En fait, il a été sauvegardé comme .csv.gz alors que le flux n’était pas gzippé.**<br> 
**Voici un correctif minimal :**<br> 
    *on ré-télécharge 2018 en forçant Accept-Encoding: identity, on compresse nous-mêmes en gzip si besoin, puis on reconvertit.*

In [26]:
# Correctif 2018 — retente avec headers alternatifs, puis fallback par État (NY, NJ, CT) + fusion
import os, io, gzip, time, requests, pandas as pd
import pyarrow as pa, pyarrow.parquet as pq

DATA_RAW  = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_raw\hmda"
DATA_WORK = r"C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work"
os.makedirs(DATA_RAW, exist_ok=True)
os.makedirs(DATA_WORK, exist_ok=True)

CSV_ENDPOINT = "https://ffiec.cfpb.gov/v2/data-browser-api/view/csv"
YEAR = 2018
TRI_STATES = ["NY","NJ","CT"]
ACTIONS_ALL = "1,2,3,4,5,6,7,8"

def _is_gzip_header(b: bytes) -> bool:
    return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B

def _write_response_to_gz(resp: requests.Response, out_gz_path: str) -> None:
    """Écrit la réponse API dans un .csv.gz, qu'elle soit déjà gzippée ou non."""
    tmp = out_gz_path + ".part"
    if os.path.exists(tmp):
        try: os.remove(tmp)
        except: pass

    it = resp.iter_content(chunk_size=1<<20)
    first = b""
    for chunk in it:
        if chunk:
            first = chunk
            break
    if not first:
        raise RuntimeError("Réponse vide")

    if ("gzip" in resp.headers.get("Content-Encoding","").lower()) or _is_gzip_header(first):
        # Déjà gzippé : écrire tel quel
        with open(tmp, "wb") as f:
            f.write(first)
            for chunk in it:
                if chunk: f.write(chunk)
    else:
        # Pas gzip : compresser à la volée
        with gzip.open(tmp, "wb") as f:
            f.write(first)
            for chunk in it:
                if chunk: f.write(chunk)

    os.replace(tmp, out_gz_path)

def _attempt_download(params: dict, header_profile: int, timeout=1200) -> requests.Response | None:
    """Tente un GET avec un profil d’en-têtes donné. Retourne la Response ou None si status != 200."""
    headers_profiles = [
        # 0) Profil "normal" (gzip autorisé)
        {"User-Agent":"Mozilla/5.0", "Accept":"text/csv,*/*;q=0.8", "Accept-Encoding":"gzip", "Referer":"https://ffiec.cfpb.gov/data-browser/", "Origin":"https://ffiec.cfpb.gov"},
        # 1) Forcer pas de compression côté serveur
        {"User-Agent":"Mozilla/5.0", "Accept":"text/csv,*/*;q=0.8", "Accept-Encoding":"identity", "Referer":"https://ffiec.cfpb.gov/data-browser/", "Origin":"https://ffiec.cfpb.gov"},
        # 2) UA différent
        {"User-Agent":"curl/8.0", "Accept":"text/csv,*/*;q=0.8", "Accept-Encoding":"gzip", "Referer":"https://ffiec.cfpb.gov/data-browser/", "Origin":"https://ffiec.cfpb.gov"},
    ]
    headers = headers_profiles[header_profile % len(headers_profiles)]
    r = requests.get(CSV_ENDPOINT, params=params, stream=True, timeout=timeout, headers=headers, allow_redirects=True)
    if r.status_code == 200:
        ctype = r.headers.get("Content-Type","").lower()
        # L'API livre parfois gzip + text/csv; parfois application/octet-stream : on accepte si 200
        if "text/csv" in ctype or "octet-stream" in ctype or "csv" in ctype:
            return r
        # Si 200 mais content-type inattendu, on tente quand même (on inspectera l’en-tête gzip)
        return r
    return None

def download_2018_tristate_robust(out_gz_path: str) -> None:
    # 1) Essai direct Tri-State (toutes les têtes, avec backoff)
    params = {"years": str(YEAR), "states": ",".join(TRI_STATES), "actions_taken": ACTIONS_ALL}
    for attempt in range(6):
        resp = _attempt_download(params, header_profile=attempt)
        if resp is not None:
            _write_response_to_gz(resp, out_gz_path)
            return
        # backoff progressif
        time.sleep(1 + attempt)

    # 2) Fallback: télécharger par État et fusionner
    tmp_parts = []
    try:
        for st in TRI_STATES:
            p = {"years": str(YEAR), "states": st, "actions_taken": ACTIONS_ALL}
            part_gz = os.path.join(DATA_RAW, f"hmda_{YEAR}_{st}.csv.gz")
            # plusieurs tentatives d’en-têtes pour cet État
            ok = False
            for attempt in range(6):
                resp = _attempt_download(p, header_profile=attempt)
                if resp is not None:
                    _write_response_to_gz(resp, part_gz)
                    ok = True
                    break
                time.sleep(1 + attempt)
            if not ok:
                raise RuntimeError(f"Impossible de télécharger {YEAR} pour l'État {st}")
            tmp_parts.append(part_gz)

        # 3) Fusion des 3 fichiers .csv.gz en un seul .csv.gz
        #    On décompresse chaque part puis on recompresse dans out_gz_path, en gardant l'entête une seule fois.
        tmp = out_gz_path + ".part"
        if os.path.exists(tmp):
            try: os.remove(tmp)
            except: pass

        header_written = False
        with gzip.open(tmp, "wb") as fout:
            for i, part in enumerate(tmp_parts):
                with gzip.open(part, "rb") as fin:
                    for j, line in enumerate(fin):
                        if not header_written:
                            fout.write(line)
                            header_written = True
                        else:
                            # sauter l'entête des fichiers suivants
                            if j == 0:
                                continue
                            fout.write(line)
        os.replace(tmp, out_gz_path)
    finally:
        # Nettoyage des parts
        for pth in tmp_parts:
            try: os.remove(pth)
            except: pass

def csv_gz_to_parquet(csv_gz_path: str, year: int, out_parquet: str, chunksize: int = 250_000) -> int:
    if os.path.exists(out_parquet):
        os.remove(out_parquet)
    total = 0
    for chunk in pd.read_csv(csv_gz_path, compression="gzip", low_memory=False, chunksize=chunksize):
        if "year" not in chunk.columns:
            chunk["year"] = year
        table = pa.Table.from_pandas(chunk, preserve_index=False)
        if not os.path.exists(out_parquet):
            pq.write_table(table, out_parquet)
        else:
            with pq.ParquetWriter(out_parquet, table.schema, use_dictionary=True) as w:
                w.write_table(table)
        total += len(chunk)
    return total

# --- Lance la correction 2018 ---
out_csv_gz = os.path.join(DATA_RAW, f"hmda_tristate_{YEAR}.csv.gz")
print("↻ Téléchargement robuste 2018…")
download_2018_tristate_robust(out_csv_gz)

out_parquet = os.path.join(DATA_WORK, f"hmda_tristate_{YEAR}.parquet")
print("→ Conversion Parquet 2018…")
n = csv_gz_to_parquet(out_csv_gz, YEAR, out_parquet)
print(f"✅ 2018: {n:,} lignes → {out_parquet}")


↻ Téléchargement robuste 2018…
→ Conversion Parquet 2018…
✅ 2018: 1,071,054 lignes → C:\Users\33669\OneDrive\Документы\data_credit_scoring\tri_state_ai\data_work\hmda_tristate_2018.parquet


"""""""""""""""""""""""""""""""""""""""*MISSION 1 COMPLETE*""""""""""""""""""""""""""""""""""""""