In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import pandas as pd
import re
import unicodedata
import warnings

# ---- Silence pandas' noisy warning about date-format inference ----
warnings.filterwarnings("ignore", message="Could not infer format.*", category=UserWarning)

# ================== PARAMS ================== #
ROOT_DIR = Path("fic")                # Input folder (scanned recursively)
EXPORT_DIR = Path("out")              # CSV output folder (created when export is enabled)
EXPORT_TABLES = True                  # Set to True to export the detected tables
SHOW_SAMPLES = True                   # Print a preview (head) of each detected table

# Generic heuristics (tune as needed)
# MIN_COLS: minimum number of non-null cells for a row to count as "tabular";
#           also the minimum number of columns a carved table must have.
MIN_COLS = 2
# MIN_CONSEC_ROWS: minimum number of tabular rows a block needs to be kept.
MIN_CONSEC_ROWS = 5
# ROW_EMPTY_TOL: number of non-tabular rows tolerated before a block is closed.
ROW_EMPTY_TOL = 1
# STOP_EMPTY_RUN: a table is cut once this many fully empty rows follow each other.
STOP_EMPTY_RUN = 5
# HEADER_SCAN_DEPTH: number of leading rows of a block examined as header candidates.
HEADER_SCAN_DEPTH = 6

# Column filtering after extraction (drops empty col/col_2... columns)
MIN_NON_NULL_RATIO = 0.05
MIN_NON_NULL_ABS   = 2

# Type inference
INFER_TYPES = True
# Substrings of a normalized column name that suggest the column holds dates.
DATE_NAME_HINTS = ("date", "dt_", "_dt", "attribution", "retrait")
# Matches a date separator (/ - .) or an English/French month abbreviation,
# case-insensitively.
DATE_TOKEN_RE = re.compile(
    r"[/\-.]|(?:jan|feb|mar|apr|mai|may|jun|jul|aug|sep|oct|nov|dec|"
    r"janv|févr|fevr|avr|juil|sept|oct|nov|déc|dec)",
    re.I
)
# Normalized (lower-case, accent-free) spellings recognized as booleans.
BOOL_TRUE  = {"true","vrai","oui","y","1"}
BOOL_FALSE = {"false","faux","non","n","0"}

# Minimum share of values containing a date token before date parsing is
# attempted, depending on whether the column name hints at a date or not.
PRE_PARSE_TOKEN_RATIO_IF_NAME   = 0.10
PRE_PARSE_TOKEN_RATIO_NO_NAME   = 0.30
# ============================================ #

def strip_accents_lower(s: str) -> str:
    """Return *s* as trimmed, lower-case text with diacritics removed.

    ``None`` and any value that ``pd.isna`` reports as missing yield an empty
    string; every other value is converted with ``str`` first.
    """
    if s is None or pd.isna(s):
        return ""
    # NFKD splits accented characters into a base character followed by
    # combining marks, which are then dropped.
    decomposed = unicodedata.normalize("NFKD", str(s))
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower().strip()

def safe_read_excel_all_sheets(path: Path):
    """Read every sheet of an Excel workbook without interpreting a header.

    Args:
        path: Location of the workbook (opened with the ``openpyxl`` engine).

    Returns:
        A list of ``(sheet_name, DataFrame)`` tuples in workbook order. Each
        frame is read with ``header=None``, so columns are positional.
    """
    # The context manager closes the underlying file handle once all sheets
    # are loaded; otherwise the workbook stays open for the life of the object.
    with pd.ExcelFile(path, engine="openpyxl") as xls:
        return [
            (sheet, pd.read_excel(xls, sheet_name=sheet, header=None))
            for sheet in xls.sheet_names
        ]

def pick_encoding(path: Path):
    """Guess the text encoding of *path* using ``charset_normalizer``.

    Returns the encoding name of the best match, or ``None`` when there is no
    match or when anything goes wrong (library missing, unreadable file, ...).
    """
    try:
        from charset_normalizer import from_path

        best_match = from_path(str(path)).best()
        if best_match:
            return best_match.encoding
        return None
    except Exception:
        # Best effort: the caller falls back to a fixed list of encodings.
        return None

def read_csv_robust(path: Path):
    """Read a CSV file, trying several encodings in turn.

    The guessed encoding (if any) is tried first, followed by a fixed list of
    common ones. The separator is sniffed by pandas (``sep=None`` with the
    python engine) and no header row is interpreted. If every attempt fails,
    a final latin-1 read with undecodable bytes replaced is returned (or its
    exception propagates).
    """
    guessed = pick_encoding(path)
    fallbacks = ("utf-8-sig", "utf-8", "cp1252", "latin-1", "iso-8859-1")
    for encoding in filter(None, (guessed, *fallbacks)):
        try:
            return pd.read_csv(path, sep=None, engine="python", encoding=encoding, header=None)
        except Exception:
            # Try the next candidate encoding.
            continue
    return pd.read_csv(
        path,
        sep=None,
        engine="python",
        encoding="latin-1",
        encoding_errors="replace",
        header=None,
    )

def is_tabular_row(row, min_cols=MIN_COLS):
    """Tell whether *row* holds at least *min_cols* non-null cells."""
    filled_cells = row.notna().sum()
    return filled_cells >= min_cols

def choose_header_row(block: pd.DataFrame):
    """Pick the most header-like row among the first rows of *block*.

    Only the first ``HEADER_SCAN_DEPTH`` rows are examined. Each row is scored
    as ``2 * (non-empty cells) + (cells containing a letter)``; cells whose
    text starts with "unnamed" are not counted as textual. The first row with
    the highest score wins.

    Returns:
        The positional index of the chosen row, or 0 when *block* has no rows.
    """
    winner = None
    top_score = -1
    depth = min(len(block), HEADER_SCAN_DEPTH)
    for pos in range(depth):
        # Missing cells become "" so that both counters can work on text.
        cells = [str(v).strip() if pd.notna(v) else "" for v in block.iloc[pos].tolist()]
        filled = sum(1 for text in cells if text != "")
        wordy = sum(
            1
            for text in cells
            if text != ""
            and not text.lower().startswith("unnamed")
            and re.search(r"[A-Za-zÀ-ÿ]", text)
        )
        row_score = filled * 2 + wordy
        if row_score > top_score:
            top_score, winner = row_score, pos
    return 0 if winner is None else winner

def clean_columns(vals):
    """Turn raw header cells into unique, snake_case column names.

    Each value is lower-cased and stripped of accents, whitespace is collapsed,
    characters other than ``a-z``, ``0-9``, ``_`` and space are removed, and
    spaces become underscores. Empty or "unnamed..." headers become ``"col"``.

    Repeated names receive a numeric suffix (``name``, ``name_2``, ``name_3``,
    ...). The suffix is advanced until the result is not already taken, so a
    generated name can never collide with a literal header such as ``"a_2"``.

    Args:
        vals: Iterable of raw header values (may contain missing values).

    Returns:
        A list of unique column names, one per input value, in input order.
    """
    cols = []
    used = set()        # every name emitted so far
    last_suffix = {}    # base name -> highest suffix tried for that base
    for v in vals:
        s = strip_accents_lower(v)
        s = s.replace("\n", " ")
        s = re.sub(r"\s+", " ", s).strip(" -_")
        if s == "" or s.startswith("unnamed"):
            s = "col"
        s = re.sub(r"[^a-z0-9_ ]", "", s)
        s = re.sub(r"\s+", "_", s).strip("_")
        if s == "":
            s = "col"

        name = s
        suffix = last_suffix.get(s, 1)
        # Checking against every emitted name (not only base names) is what
        # prevents "a", "a", "a_2" from producing "a_2" twice.
        while name in used:
            suffix += 1
            name = f"{s}_{suffix}"
        last_suffix[s] = suffix
        used.add(name)
        cols.append(name)
    return cols

# --- Helper: tells whether a column name is "generic" (i.e. eligible for pruning) ---
GENERIC_COL_RE = re.compile(r"^col(_\d+)?$", re.I)
def is_generic_colname(name: str) -> bool:
    """Return True when *name* is ``col`` or ``col_<digits>`` (case-insensitive).

    Falsy names (``None``, ``""``) are treated as an empty string and are
    therefore not generic.
    """
    candidate = name if name else ""
    return GENERIC_COL_RE.fullmatch(candidate) is not None

def detect_blocks(df: pd.DataFrame):
    """Locate runs of "tabular" rows in a raw sheet.

    A block opens on the first tabular row (see ``is_tabular_row``) and stays
    open as long as no more than ``ROW_EMPTY_TOL`` non-tabular rows follow each
    other. A block is reported only if it holds at least ``MIN_CONSEC_ROWS``
    tabular rows.

    Args:
        df: Raw sheet content, one row per sheet line.

    Returns:
        A list of ``(start, end)`` index labels, both inclusive, where ``start``
        and ``end`` are always tabular rows (trailing gap rows are excluded).
    """
    blocks = []
    start = None          # label of the first tabular row of the open block
    last_tabular = None   # label of the most recent tabular row of that block
    tabular_rows = 0      # tabular rows seen in the open block
    gap = 0               # consecutive non-tabular rows since last_tabular

    for i, row in df.iterrows():
        if is_tabular_row(row):
            if start is None:
                start = i
                tabular_rows = 0
            tabular_rows += 1
            last_tabular = i
            # A tabular row ends the current gap; without this reset the
            # tolerance would apply to the whole block instead of to each gap.
            gap = 0
        elif start is not None:
            gap += 1
            if gap > ROW_EMPTY_TOL:
                # Close on the last tabular row rather than computing the end
                # by subtraction, which is wrong as soon as gaps are not
                # contiguous and requires an integer index.
                if tabular_rows >= MIN_CONSEC_ROWS:
                    blocks.append((start, last_tabular))
                start = None
                tabular_rows = 0
                gap = 0

    if start is not None and tabular_rows >= MIN_CONSEC_ROWS:
        blocks.append((start, last_tabular))

    return blocks

# ---------- prune_columns keeps every column that has a MEANINGFUL name ----------
def prune_columns(df: pd.DataFrame, header_keep: set[str]) -> pd.DataFrame:
    """Drop generic columns that carry too little data.

    A column is kept when any of the following holds:
      - its name is listed in *header_keep*;
      - its name is not generic (anything other than 'col', 'col_2', ...);
      - it has at least ``max(MIN_NON_NULL_ABS, int(rows * MIN_NON_NULL_RATIO))``
        non-null values.

    The original column order is preserved. An empty frame is returned as is;
    otherwise a copy of the selected columns is returned.
    """
    if df.empty:
        return df

    row_count = len(df)

    def has_enough_data(column) -> bool:
        # Only reached for generic names that are not explicitly protected.
        filled = df[column].notna().sum()
        return filled >= max(MIN_NON_NULL_ABS, int(row_count * MIN_NON_NULL_RATIO))

    selected = [
        column
        for column in df.columns
        if column in header_keep
        or not is_generic_colname(column)
        or has_enough_data(column)
    ]
    return df[selected].copy()

# ---------- Type inference & schema display ---------- #

# A value made only of an optionally signed integer or decimal number.
_NUMERIC_ONLY_RE = re.compile(r"[+-]?\d+(?:[.,]\d+)?")


def infer_and_cast_column(s: pd.Series, col_name: str) -> tuple[pd.Series, str]:
    """Infer the type of a column and return it cast to that type.

    Candidate types are tried in this order: date, int/float, bool, and
    finally string as the fallback.

    Args:
        s: Raw column values.
        col_name: Column name; used only to detect date-like names through
            ``DATE_NAME_HINTS``.

    Returns:
        ``(cast_series, type_name)`` where ``type_name`` is one of
        ``"date"``, ``"int"``, ``"float"``, ``"bool"`` or ``"string"``.
    """
    name_norm = strip_accents_lower(col_name)
    looks_like_date_name = any(h in name_norm for h in DATE_NAME_HINTS)

    non_empty = s.astype("string").dropna()
    n_non_empty = len(non_empty)
    # Denominator shared by every acceptance ratio: non-missing values only.
    n_present = max(1, s.notna().sum())

    numeric_only_ratio = 0.0
    date_token_ratio = 0.0
    if n_non_empty > 0:
        # The sign is accepted so that negative numbers are recognized as
        # numbers instead of date candidates (their "-" is a date token).
        numeric_only_ratio = sum(
            bool(_NUMERIC_ONLY_RE.fullmatch(str(x).strip())) for x in non_empty
        ) / n_non_empty
        date_token_ratio = sum(
            bool(DATE_TOKEN_RE.search(str(x))) for x in non_empty
        ) / n_non_empty

    should_try_parse = (
        (looks_like_date_name and date_token_ratio >= PRE_PARSE_TOKEN_RATIO_IF_NAME) or
        ((not looks_like_date_name) and date_token_ratio >= PRE_PARSE_TOKEN_RATIO_NO_NAME and numeric_only_ratio < 0.80)
    )

    if should_try_parse:
        # pd.to_datetime reads numeric cells as offsets from the Unix epoch,
        # which would turn amounts such as 12.5 into 1970 dates. Numerically
        # typed cells are therefore blanked out before parsing; they still
        # count in the denominator, i.e. as values that are not dates.
        is_numeric_cell = s.map(pd.api.types.is_number).astype(bool)
        parsed_dates = pd.to_datetime(s.mask(is_numeric_cell), errors="coerce", dayfirst=True)
        date_ratio = parsed_dates.notna().sum() / n_present
        accept_date = (date_ratio >= 0.30) if looks_like_date_name else (date_ratio >= 0.70)
        if accept_date:
            return parsed_dates.dt.normalize(), "date"

    as_num = pd.to_numeric(s, errors="coerce")
    num_ratio = as_num.notna().sum() / n_present
    if num_ratio >= 0.85:
        as_int = as_num.dropna()
        if len(as_int) == 0:
            return as_num.astype("Float64"), "float"
        if (as_int % 1 == 0).all():
            return as_num.astype("Int64"), "int"
        return as_num.astype("Float64"), "float"

    # Booleans are only considered for columns with very few distinct values.
    small_set = set(non_empty.map(strip_accents_lower).unique().tolist())
    if 1 <= len(small_set) <= 3:
        def map_bool(x):
            if x in BOOL_TRUE:
                return True
            if x in BOOL_FALSE:
                return False
            return pd.NA

        # strip_accents_lower maps missing values to "", which map_bool turns
        # into <NA>. The ratio is taken over non-missing values so that blank
        # cells do not prevent a yes/no column from being typed as bool.
        mb = s.map(strip_accents_lower).map(map_bool)
        if mb.notna().sum() / n_present >= 0.9:
            return mb.astype("boolean"), "bool"

    return s.astype("string"), "string"

def infer_types_df(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """Infer and apply a type to every column of *df*.

    Works on a copy, so the input frame is left untouched.

    Returns:
        ``(typed_frame, schema)`` where ``schema`` maps each column name to the
        type name reported by ``infer_and_cast_column``.
    """
    typed = df.copy()
    kinds = {}
    for name in typed.columns:
        casted, kind = infer_and_cast_column(typed[name], name)
        typed[name] = casted
        kinds[name] = kind
    return typed, kinds

def show_schema(df: pd.DataFrame, schema: dict):
    """Print one ``<column>: <TYPE>`` line per column of *df*.

    The type comes from *schema*; columns missing from it fall back to the
    pandas dtype name. Type names are printed in upper case.
    """
    print("\n=== schema ===")
    for column in df.columns:
        dtype_name = str(df[column].dtype)
        label = schema.get(column, dtype_name).upper()
        print(f"{column}: {label}")

# ----------------------------------------------------------- #

def carve_table_from_block(df_block: pd.DataFrame):
    """Extract a clean, typed table from a block of raw sheet rows.

    Steps: pick the header row, normalize the column names, cut the data at
    the first run of ``STOP_EMPTY_RUN`` fully empty rows, drop rows with fewer
    than ``MIN_COLS`` non-null cells, prune sparse generic columns, infer the
    column types (when ``INFER_TYPES`` is set) and render dates as
    ``YYYY-MM-DD`` strings.

    Args:
        df_block: Rows of one detected block, with positional columns.

    Returns:
        ``(table, schema)`` or ``None`` when the block yields no data rows.
    """
    if df_block.empty:
        return None

    # header
    h_rel = choose_header_row(df_block)
    cols = clean_columns(df_block.iloc[h_rel].tolist())

    data = df_block.iloc[h_rel + 1:].copy()
    if data.empty:
        # The header is the last row of the block: nothing to carve.
        return None
    data.columns = cols

    # Stop at N consecutive empty rows after the header. The cut is positional
    # so it does not depend on the index being made of contiguous integers.
    empty_run = 0
    keep_rows = len(data)
    for pos, row_is_empty in enumerate(data.isna().all(axis=1).to_numpy()):
        if not row_is_empty:
            empty_run = 0
            continue
        empty_run += 1
        if empty_run >= STOP_EMPTY_RUN:
            # Keep everything before the first row of the empty run.
            keep_rows = pos + 1 - STOP_EMPTY_RUN
            break

    data = data.iloc[:keep_rows]
    data = data[data.notna().sum(axis=1) >= MIN_COLS]

    # Columns with a meaningful header name are never pruned
    # (e.g. 'mois', 'tarif_facture', ...).
    header_keep = {c for c in cols if not is_generic_colname(c)}
    data = prune_columns(data, header_keep)
    if data.empty:
        return None

    # type inference & cast
    if INFER_TYPES:
        data, schema = infer_types_df(data)
    else:
        schema = {c: str(data[c].dtype) for c in data.columns}

    data = data.reset_index(drop=True)

    # dates as YYYY-MM-DD (display + CSV)
    for c, t in schema.items():
        if t != "date":
            continue
        try:
            data[c] = pd.to_datetime(data[c], errors="coerce").dt.strftime("%Y-%m-%d")
        except Exception as exc:
            # Best effort: keep the column as is, but do not hide the failure.
            warnings.warn(f"date column {c!r} left unformatted: {exc}")

    return data, schema

def find_tables_in_sheet(df_raw: pd.DataFrame):
    """Detect and carve every table contained in a raw sheet.

    Columns are renamed to positional integers (on a copy) when they are not
    already. Each detected block is carved; carved tables with fewer than
    ``MIN_COLS`` columns or without any row are discarded.

    Returns:
        A list of ``(table, schema)`` tuples in sheet order.
    """
    positional = list(range(df_raw.shape[1]))
    if list(df_raw.columns) != positional:
        df_raw = df_raw.copy()
        df_raw.columns = positional

    found = []
    for first, last in detect_blocks(df_raw):
        carved = carve_table_from_block(df_raw.loc[first:last, :])
        if carved is None:
            continue
        table, schema = carved
        if table is None:
            continue
        n_rows, n_cols = table.shape
        if n_cols >= MIN_COLS and n_rows >= 1:
            found.append((table, schema))
    return found

def process_file(path: Path):
    """Extract every table from one Excel or CSV file.

    Args:
        path: File to read; ``.xlsx``/``.xlsm`` and ``.csv`` are supported
            (the extension check is case-insensitive).

    Returns:
        A list of ``(sheet_name, table_number, table, schema)`` tuples. The
        sheet name is ``None`` for CSV files and table numbers start at 1
        within each sheet.

    Raises:
        ValueError: If the file extension is not supported.
    """
    extension = path.suffix.lower()
    if extension in (".xlsx", ".xlsm"):
        sources = safe_read_excel_all_sheets(path)
    elif extension == ".csv":
        # A CSV is handled as a single unnamed sheet.
        sources = [(None, read_csv_robust(path))]
    else:
        raise ValueError(f"Extension non gérée: {path.suffix}")

    return [
        (sheet, number, table, schema)
        for sheet, df_raw in sources
        for number, (table, schema) in enumerate(find_tables_in_sheet(df_raw), start=1)
    ]

def main():
    """Scan ``ROOT_DIR`` for Excel/CSV files and report the tables they contain.

    For every file found (Office lock files starting with ``~$`` are skipped),
    the detected tables are listed with their schema, optionally previewed
    (``SHOW_SAMPLES``) and optionally exported as CSV into ``EXPORT_DIR``
    (``EXPORT_TABLES``). A file that cannot be processed is reported and
    skipped.
    """
    if not ROOT_DIR.exists():
        print(f"[error] Dossier introuvable : {ROOT_DIR.resolve()}")
        return

    files = [
        candidate
        for pattern in ("*.xlsx", "*.xlsm", "*.csv")
        for candidate in ROOT_DIR.rglob(pattern)
        if candidate.is_file() and not candidate.name.startswith("~$")
    ]

    print("=== data header: folder info ===")
    print(f"path: {ROOT_DIR.resolve()}")
    print(f"files_found: {len(files)}")

    if not files:
        print("[warn] Aucun fichier .xlsx/.xlsm/.csv trouvé.")
        return

    if EXPORT_TABLES:
        EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    for source in sorted(files):
        print("\n=== file ===")
        print(f"{source.name}  ({source.resolve()})")

        try:
            tables = process_file(source)
        except Exception as e:
            # One unreadable file must not stop the whole run.
            print(f"[error] Lecture échouée pour {source.name}: {e}")
            continue

        if not tables:
            print("[info] Aucune table détectée")
            continue

        for sheet, number, table, schema in tables:
            sheet_label = sheet or "sheet"
            title = f"{source.stem}__{sheet_label}__table_{number}"
            n_rows, n_cols = len(table), table.shape[1]
            print("\n--- table detected ---")
            print(f"name: {title}")
            print(f"rows: {n_rows} | cols: {n_cols}")
            print("columns:", ", ".join(str(c) for c in table.columns.tolist()))

            show_schema(table, schema)

            if SHOW_SAMPLES:
                with pd.option_context("display.max_columns", 80, "display.width", 200):
                    print("\n=== sample (top 8) ===")
                    print(table.head(8))

            if EXPORT_TABLES:
                out_path = EXPORT_DIR / f"{title}.csv"
                table.to_csv(out_path, index=False)
                print(f"[saved] {out_path}")

    print("\n=== done ===")

if __name__ == "__main__":
    main()


=== data header: folder info ===
path: C:\globasoft\aerotech\fic
files_found: 0
[warn] Aucun fichier .xlsx/.xlsm/.csv trouvé.
