In [1]:
from datetime import datetime
from datetime import date
from shapely.ops import unary_union
from shapely.geometry import box
import hashlib, calendar
import pandas as pd
import geopandas as gpd
import json
import unicodedata
import re
import os
import zipfile
from typing import Dict, List, Any, Tuple, Set, Optional, Iterable, Union
from pathlib import Path
from dateutil.relativedelta import relativedelta

In [2]:
CATALOG = r"C:\Users\ADMrechbay20\PycharmProjects\metadata capture\catalog.json"
METADATA = r"C:\Users\ADMrechbay20\PycharmProjects\metadata capture\metadata"
RAW_DATA_ZONE = r"C:\Users\ADMrechbay20\PycharmProjects\metadata capture\data"

In [3]:
# 读取 geojson 文件
geojson_files = {
    "region": "../ref/geo_levels/region.geojson",
    # "academie": "../ref/geo_levels/academie.geojson",
    "departement": "../ref/geo_levels/departement.geojson",
    "arrondissement_departemental": "../ref/geo_levels/arrondissement_departemental.geojson",
    "epci": "../ref/geo_levels/epci.geojson",
    #"canton": "../ref/geo_levels/canton.geojson",
    "commune": "../ref/geo_levels/commune.geojson",
    "arrondissement_communal": "../ref/geo_levels/arrondissement_communal.geojson",
    "iris": "../ref/geo_levels/iris.geojson",
}

# read all GeoJSON
GEO_DATA = {level: gpd.read_file(path) for level, path in geojson_files.items()}

In [4]:
ColT = Union[str, int]
CACHE_DIR = os.path.join(METADATA, "_df_cache")

def _sample_bytes(path: str, size: int = 256_000) -> bytes:
    with open(path, "rb") as f:
        return f.read(size)

def _guess_encoding(sample: bytes) -> str:
    if sample.startswith(b"\xef\xbb\xbf"):
        return "utf-8-sig"
    for enc in ("utf-8", "cp1252", "latin1"):
        try:
            sample.decode(enc)
            return enc
        except UnicodeDecodeError:
            continue
    return "latin1"

def _guess_sep(sample_text: str):
    cands = [",", ";", "\t", "|"]
    counts = {sep: sample_text.count(sep) for sep in cands}
    best = max(counts, key=counts.get)
    return best if counts[best] > 0 else None

def _cache_path(src_path: str | os.PathLike, ext: str = ".parquet") -> str:
    """
    Return a stable cache file path OUTSIDE the data_dir so it won't be scanned as a dataset.
    Uses (abs path + mtime) hash to invalidate when source changes.
    """
    src_path = Path(src_path).resolve()
    os.makedirs(CACHE_DIR, exist_ok=True)
    mtime = src_path.stat().st_mtime if src_path.exists() else 0
    key = f"{src_path.as_posix()}|{mtime}"
    h = hashlib.sha1(key.encode("utf-8")).hexdigest()[:16]
    return os.path.join(CACHE_DIR, f"{h}{ext}")

def _is_cache_fresh(src: Path, cache: Path) -> bool:
    try:
        return cache.exists() and cache.stat().st_mtime >= src.stat().st_mtime
    except Exception:
        return False


# Normalisation helpers
def strip_accents(s: str):
    """Remove accents; normalise spaces/hyphens/apostrophes; fix common encoding issues."""
    if not isinstance(s, str):
        return s

    # --- Step 1: fix common mis-encodings (©, ¨e, etc.) ---
    replacements = {
        "Ã©": "é",   # common CSV bug: ma©tropole → métropole
        "Æ": "AE",
        "æ": "ae",
        "œ": "oe",  # œufs → oeufs
    }
    for bad, good in replacements.items():
        s = s.replace(bad, good)

    # --- Step 2: remove accents properly ---
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))

    # --- Step 3: unify apostrophes and separators ---
    s = s.replace("’", "'").replace("`", "'").replace("´", "'")
    s = re.sub(r"[-_\s]+", " ", s)

    return s.strip()


def norm_name(v):
    """Normalise a display name for robust comparison."""
    if pd.isna(v):
        return ""
    return strip_accents(str(v)).lower()

def norm_code(v):
    """Normalise a code for robust comparison (uppercase, trimmed)."""
    if pd.isna(v):
        return ""
    return str(v).strip().upper()

def is_numeric_series(s):
    return pd.api.types.is_numeric_dtype(s)

def is_string_series(s):
    return pd.api.types.is_string_dtype(s) or s.dtype == object

def not_null_ratio(s):
    n = len(s)
    return float(s.notna().sum())/n if n else 0.0

def normalise_colname(name: str) -> str:
    """Lightweight colname normaliser for pattern matching."""
    # convert to string and lowercase
    s = str(name).lower()

    # remove accents using unicodedata
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))

    # replace whitespace and hyphens with underscore
    s = re.sub(r"[\s\-]+", "_", s)

    return s

# ---------- helpers for zip + /vsizip/ ----------

def _first_member_with_suffix(zf: zipfile.ZipFile, *suffixes: str) -> str | None:
    """Return the first entry inside the zip matching any of the given suffixes (case-insensitive)."""
    suf = tuple(s.lower() for s in suffixes)
    for n in zf.namelist():
        ln = n.lower()
        if ln.startswith("__macosx/"):
            continue
        if ln.endswith(suf):
            return n
    return None

def _vsizip_path(zip_path: Path, inner_path: str) -> str:
    """Build a GDAL /vsizip/ path with forward slashes (works on Windows too)."""
    return f"/vsizip/{zip_path.as_posix()}/{inner_path}"

def _maybe_drop_geometry(gdf: gpd.GeoDataFrame, keep_geometry: bool = True):
    """Optionally drop active geometry to return a plain DataFrame."""
    if keep_geometry:
        return gdf
    return gdf.drop(columns=[gdf.geometry.name])


# ---------- your existing readers (unchanged logic) ----------

# -------- engine chooser --------
def _engine_for_excel(path: str) -> tuple[str|None, str|None]:
    ext = Path(path).suffix.lower()
    if ext in {".xlsx", ".xlsm"}:
        return "openpyxl", "pip install openpyxl"
    if ext == ".xls":
        return "xlrd", "pip install xlrd"
    if ext == ".xlsb":
        return "pyxlsb", "pip install pyxlsb"
    if ext == ".ods":
        return "odf", "pip install odfpy"
    return None, None

# -------- helpers --------
def _peek_block(path: str, sheet, engine: str|None, nrows: int = 1000) -> pd.DataFrame:
    return pd.read_excel(path, sheet_name=sheet, header=None, nrows=nrows, engine=engine)

def _detect_header_row(block: pd.DataFrame) -> int|None:
    # pick the row with max non-nulls as header line
    rc = block.count(axis=1)
    if rc.empty or rc.max() == 0:
        return None
    return int(rc.idxmax())

def _norm(s: str) -> str:
    s = str(s) if s is not None else ""
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    return s

def _is_description_sheet(sheet_name: str, header_row_values: list) -> bool:
    # 1) name-based signal
    name_pat = re.compile(
        r"(description\s+(des\s+)?(données|variables)|"
        r"data\s+dictionary|dictionary|liste\s+des\s+variables|variables?)",
        re.IGNORECASE
    )
    if name_pat.search(str(sheet_name) or ""):
        return True

    # 2) header-based signal
    headers = [_norm(x) for x in header_row_values]
    # typical dictionary headers
    key_tokens = {"description", "détails", "définition", "definition", "explication"}
    var_tokens = {"variable", "champ", "field", "colonne", "column", "attribut"}
    if any(h in key_tokens for h in headers) and any(h in var_tokens for h in headers):
        return True
    # 2-column pattern: ["variable", "description"] or similar
    if len(headers) in (2, 3) and any(h in var_tokens for h in headers) and any(h in key_tokens for h in headers):
        return True

    return False

# -------- main --------
def read_excel_workbook_with_desc_detection(file_path: str):
    """
    Read all sheets; for each, detect header row, read a proper DataFrame,
    and flag whether the sheet looks like a 'description' (data dictionary).
    Returns: list of dicts with keys:
      - sheet_index, sheet_name, df, header_row, is_description
    """
    engine, hint = _engine_for_excel(file_path)
    try:
        with pd.ExcelFile(file_path, engine=engine) as xls:
            results = []
            for i, sheet_name in enumerate(xls.sheet_names):
                # 1) peek to find header row
                block = _peek_block(file_path, sheet_name, engine=engine, nrows=1000).dropna(how="all")
                if block.empty:
                    continue
                hdr = _detect_header_row(block)
                # guard: if none, skip this sheet
                if hdr is None:
                    continue

                # 2) final read with header
                df = pd.read_excel(file_path, sheet_name=sheet_name, header=hdr, engine=engine)

                # 3) description detection using sheet name and header row
                header_values = block.iloc[hdr].tolist()
                is_desc = _is_description_sheet(sheet_name, header_values)

                results.append({
                    "sheet_index": i,
                    "sheet_name": sheet_name,
                    "df": df,
                    "header_row": hdr,
                    "is_description": is_desc,
                })
            return results
    except ImportError as e:
        raise ImportError(f"Missing Excel engine for {Path(file_path).suffix.lower()}. Try: {hint}") from e

def count_str(df, num_row):
    row = df.iloc[num_row]  # 取第6行（Series）
    str_count = row.apply(lambda x: isinstance(x, str)).sum()
    return str_count

def _is_empty(x):
    if pd.isna(x):
        return True
    if isinstance(x, str):
        return x.strip() == ""
    return False

def test_title_with_null(df, num_row, max_num):
    nb_null = max_num - count_str(df, num_row)
    row_first = df.iloc[num_row,:nb_null]
    empty_mask = row_first.apply(_is_empty)
    return empty_mask.all()

def excel_EL(file_path, sheet_name=0, usecols=None, cache_parquet: bool = True):
    """
    Excel loader with automatic header-row detection.
    - Always previews the *first sheet* to detect a header row when no explicit preview target is provided.
    - Supports sheet selection by index or name.
    - If sheet_name is None / "first", it loads the first sheet.
    - Caches a Parquet copy (single-sheet mode) when enabled and cache is fresh.

    Parameters
    ----------
    file_path : str or Path
        Path to the Excel workbook.
    sheet_name : int | str | None, default 0
        Sheet index or name. If None or "first", the first sheet is used.
    usecols : list[str] | str | None
        Columns to load (passed through to pandas).
    cache_parquet : bool, default True
        Whether to read/write a Parquet cache alongside the source.

    Returns
    -------
    pandas.DataFrame
        Loaded sheet as a DataFrame with a detected header row.
    """
    src = Path(file_path)
    pq = _cache_path(file_path, ".parquet")

    # Use cache only for single-sheet read
    if cache_parquet and _is_cache_fresh(src, pq):
        try:
            return pd.read_parquet(pq, columns=usecols)
        except Exception:
            pass  # fall back to reading Excel

    engine, hint = _engine_for_excel(file_path)
    try:
        with pd.ExcelFile(file_path, engine=engine) as xls:
            # Resolve the target sheet to load
            if sheet_name in (None, "first"):
                target_sheet = xls.sheet_names[0] if xls.sheet_names else 0
            elif isinstance(sheet_name, int):
                # Clamp index to valid range
                if not xls.sheet_names:
                    raise ValueError("Workbook has no sheets.")
                idx = max(0, min(sheet_name, len(xls.sheet_names) - 1))
                target_sheet = xls.sheet_names[idx]
            else:
                # Assume it's a sheet name (string)
                target_sheet = sheet_name

            # --- Preview FIRST sheet to detect header row ---
            # We always use the first sheet for header-row detection to avoid dict return.
            first_sheet = xls.sheet_names[0] if xls.sheet_names else target_sheet
            block = pd.read_excel(file_path, sheet_name=first_sheet,
                                  header=None, nrows=1000, engine=engine)
    except ImportError as e:
        raise ImportError(f"Missing Excel engine for {src.suffix.lower()}. Try: {hint}") from e

    # Detect header row from the preview block
    block = block.dropna(how="all")
    if block.empty:
        header_row = 0
    else:
        row_counts = block.count(axis=1)
        header_row = int(row_counts.idxmax()) if not row_counts.empty else 0

    # Final read for the *target* sheet, using the detected header row
    df = pd.read_excel(
        file_path,
        sheet_name=target_sheet,
        header=header_row,
        engine=engine,
        usecols=usecols
    )

    # Cache to Parquet (best-effort)
    if cache_parquet:
        try:
            df.to_parquet(pq, index=False)
        except Exception:
            pass

    return df

def csv_EL(file_path):
    encs = ["utf-8", "utf-8-sig", "cp1252", "latin1"]
    seps = [None, ",", ";", "\t", "|"]
    for enc in encs:
        for sep in seps:
            try:
                df = pd.read_csv(file_path, encoding=enc, sep=sep, low_memory=False, on_bad_lines="skip")
                if df.shape[1] > 1: return df
            except Exception:
                pass
    raise ValueError(f"Could not parse {file_path} as CSV/TSV.")

def geojson_EL(file_path):
    """Read GeoJSON data"""
    gdf = gpd.read_file(file_path)
    return gdf

def json_EL(file_path):
    """Read JSON or NDJSON file into DataFrame"""
    try:
        return pd.read_json(file_path, lines=True)
    except ValueError:
        with open(file_path, "r", encoding="utf-8-sig") as f:
            data = json.load(f)
        if isinstance(data, dict):
            data = [data]
        return pd.json_normalize(data)

def parquet_EL(file_path):
    """Read Parquet data"""
    df = pd.read_parquet(file_path, engine="pyarrow")
    return df


def detect_json_type(filepath):
    """Detect GeoJSON vs generic JSON by inspecting the 'type' field."""
    with open(filepath, "r", encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, dict) and "type" in data:
        geo_types = {"FeatureCollection", "Feature", "Point", "Polygon", "MultiPolygon", "LineString"}
        if data["type"] in geo_types:
            return geojson_EL(filepath)
    return json_EL(filepath)


# ---------- enhanced Shapefile reader (zip without extraction) ----------

def shapefile_EL(zip_file_path: str):
    """
    Read a Shapefile directly from a .zip (no extraction) using GDAL /vsizip/.
    If given a .shp path, read directly.
    """
    p = Path(zip_file_path).resolve()
    if p.suffix.lower() == ".shp":
        return gpd.read_file(p.as_posix())

    if p.suffix.lower() != ".zip":
        raise ValueError("shapefile_EL expects a .zip (or .shp) path.")

    with zipfile.ZipFile(p, "r") as zf:
        shp_inside = _first_member_with_suffix(zf, ".shp")
        if not shp_inside:
            raise FileNotFoundError("No .shp file found inside the zip.")
    vsip = _vsizip_path(p, shp_inside)
    return gpd.read_file(vsip)


# ---------- NEW: FlatGeobuf (.fgb) ----------

def fgb_EL(path: str):
    """
    Read a FlatGeobuf (.fgb). If a .zip is provided, find the first .fgb inside and use /vsizip/.
    """
    p = Path(path).resolve()
    if p.suffix.lower() == ".fgb":
        return gpd.read_file(p.as_posix())

    if p.suffix.lower() == ".zip":
        with zipfile.ZipFile(p, "r") as zf:
            fgb_inside = _first_member_with_suffix(zf, ".fgb")
            if not fgb_inside:
                raise FileNotFoundError("No .fgb file found inside the zip.")
        vsip = _vsizip_path(p, fgb_inside)
        return gpd.read_file(vsip)

    raise ValueError("fgb_EL expects a .fgb or .zip path.")


# ---------- NEW: KML / KMZ (multi-layer support) ----------

def _list_layers(pathlike: str) -> list[str]:
    """List layers for a datasource using pyogrio first, then Fiona as fallback."""
    try:
        from pyogrio import list_layers  # preferred when available
        return [l[0] for l in list_layers(pathlike)]
    except Exception:
        try:
            import fiona
            return fiona.listlayers(pathlike)
        except Exception as e:
            raise RuntimeError(f"Cannot list layers from: {pathlike}. Error: {e}")

def _concat_layers(path_like: str, layers: list[str]) -> gpd.GeoDataFrame:
    """Read multiple layers and concatenate, tagging the source layer into 'src_layer'."""
    frames = []
    for lyr in layers:
        sub = gpd.read_file(path_like, layer=lyr)
        sub["src_layer"] = lyr
        frames.append(sub)
    if not frames:
        raise ValueError("No readable layers found.")
    return pd.concat(frames, ignore_index=True)

def kml_EL(path: str, layer: str | None = None):
    """
    Read a KML or KMZ. For KMZ, the first .kml inside is used via /vsizip/.
    If 'layer' is None, read all available layers and concatenate.
    """
    p = Path(path).resolve()

    # .kml straight
    if p.suffix.lower() == ".kml":
        layers = _list_layers(p.as_posix())
        if layer is None:
            return _concat_layers(p.as_posix(), layers)
        if layer not in layers:
            raise ValueError(f"Layer '{layer}' not found. Available: {layers}")
        return gpd.read_file(p.as_posix(), layer=layer)

    # .kmz: find inner .kml
    if p.suffix.lower() == ".kmz":
        with zipfile.ZipFile(p, "r") as zf:
            kml_inside = _first_member_with_suffix(zf, ".kml")
            if not kml_inside:
                raise FileNotFoundError("No .kml file found inside the KMZ.")
        vsip = _vsizip_path(p, kml_inside)
        layers = _list_layers(vsip)
        if layer is None:
            return _concat_layers(vsip, layers)
        if layer not in layers:
            raise ValueError(f"Layer '{layer}' not found. Available: {layers}")
        return gpd.read_file(vsip, layer=layer)

    raise ValueError("kml_EL expects a .kml or .kmz path.")


# ---------- NEW: GPX (multi-layer support) ----------

def gpx_EL(path: str, layer: str | None = None):
    """
    Read a GPX file. Common layers: 'waypoints', 'routes', 'tracks',
    'route_points', 'track_points'. If 'layer' is None, read a practical subset
    (tracks/routes/waypoints if present) or all layers as a fallback.
    Supports .zip containing a single .gpx via /vsizip/.
    """
    p = Path(path).resolve()

    def _read_from_pathlike(pathlike: str) -> gpd.GeoDataFrame:
        layers = _list_layers(pathlike)
        if layer is None:
            wanted = [l for l in ("tracks", "routes", "waypoints") if l in layers]
            if not wanted:
                wanted = layers
            return _concat_layers(pathlike, wanted)
        if layer not in layers:
            raise ValueError(f"Layer '{layer}' not found. Available: {layers}")
        return gpd.read_file(pathlike, layer=layer)

    if p.suffix.lower() == ".gpx":
        return _read_from_pathlike(p.as_posix())

    if p.suffix.lower() == ".zip":
        with zipfile.ZipFile(p, "r") as zf:
            gpx_inside = _first_member_with_suffix(zf, ".gpx")
            if not gpx_inside:
                raise FileNotFoundError("No .gpx file found inside the zip.")
        vsip = _vsizip_path(p, gpx_inside)
        return _read_from_pathlike(vsip)

    raise ValueError("gpx_EL expects a .gpx or .zip path.")


# ---------- OPTIONAL: a unified zip geodata reader ----------

def zip_geodata_EL(zip_path: str):
    """
    Read geodata from a .zip by detecting the inner file type.
    Priority: Shapefile (.shp) > FlatGeobuf (.fgb) > KML (.kml) > GPX (.gpx).
    """
    p = Path(zip_path).resolve()
    if p.suffix.lower() != ".zip":
        raise ValueError("zip_geodata_EL expects a .zip path.")

    with zipfile.ZipFile(p, "r") as zf:
        # Try SHP
        inner = _first_member_with_suffix(zf, ".shp")
        if inner:
            return gpd.read_file(_vsizip_path(p, inner))
        # Try FGB
        inner = _first_member_with_suffix(zf, ".fgb")
        if inner:
            return gpd.read_file(_vsizip_path(p, inner))
        # Try KML (KMZ scenario)
        inner = _first_member_with_suffix(zf, ".kml")
        if inner:
            kml_vsip = _vsizip_path(p, inner)
            layers = _list_layers(kml_vsip)
            return _concat_layers(kml_vsip, layers)
        # Try GPX
        inner = _first_member_with_suffix(zf, ".gpx")
        if inner:
            gpx_vsip = _vsizip_path(p, inner)
            layers = _list_layers(gpx_vsip)
            # default subset for GPX
            wanted = [l for l in ("tracks", "routes", "waypoints") if l in layers] or layers
            return _concat_layers(gpx_vsip, wanted)

    raise FileNotFoundError("No supported geodata found inside the zip (.shp/.fgb/.kml/.gpx).")

# mapping: extension → handler function
dict_EL = {
    '.xlsx':    excel_EL,
    '.xls':     excel_EL,
    '.csv':     csv_EL,
    '.tsv':     csv_EL,
    '.parquet': parquet_EL,
    '.geojson': geojson_EL,
    '.json':    detect_json_type,
    '.shp':     shapefile_EL,      # direct shapefile path
    '.zip':     zip_geodata_EL,    # unified zip reader (SHP/FGB/KML/GPX)
    '.kml':     kml_EL,
    '.kmz':     kml_EL,
    '.fgb':     fgb_EL,
    '.gpx':     gpx_EL,
}

def get_df(path):
    ext = os.path.splitext(path)[1].lower()
    if ext in dict_EL:
        df = dict_EL[ext](path)
        return df
    else:
        return None

In [5]:
DF_SPATIAL_HIER = csv_EL(r"C:\Users\ADMrechbay20\OneDrive\桌面\Données\carte\georef-france-iris.csv")
DF_SPATIAL_HIER.columns

Index(['Geo Point', 'Geo Shape', 'Année', 'Code Officiel Région',
       'Nom Officiel Région', 'Code Officiel Département',
       'Nom Officiel Département',
       'Code Officiel Arrondissement départemental',
       'Nom Officiel Arrondissement départemental',
       'Code Officiel Zone emploi 2020', 'Nom Officiel Zone emploi 2020',
       'Code Officiel Bassin vie 2022', 'Nom Officiel Bassin vie 2022',
       'Code Officiel EPCI', 'Nom Officiel EPCI', 'Code Officiel EPT',
       'Nom Officiel EPT', 'Code Officiel Commune', 'Nom Officiel Commune',
       'Code Officiel Commune / Arrondissement Municipal',
       'Nom Officiel Commune / Arrondissement Municipal', 'Code Officiel IRIS',
       'Nom Officiel IRIS', 'Nom Officiel IRIS Majuscule',
       'Nom Officiel IRIS Minuscule', 'Code Iso 3166-3 Zone', 'Type',
       'Code Grand Quartier', 'Libellé Grand Quartier',
       'Fait Partie d'une CTU', 'Code Officiel Zone emploi 2010',
       'Nom Officiel Zone emploi 2010'],
      dtype

In [6]:
df_hier = csv_EL(r"C:\Users\ADMrechbay20\PycharmProjects\metadata usage\ref\geo_levels.csv")

In [7]:
level_defs = {
    "region": ['Code Officiel Région', 'Nom Officiel Région'],
    # "academie": "../ref/geo_levels/academie.geojson",
    "departement": ['Code Officiel Département', 'Nom Officiel Département'],
    "arrondissement_departemental": ['Code Officiel Arrondissement départemental','Nom Officiel Arrondissement départemental'],
    "epci": ['Code Officiel EPCI', 'Nom Officiel EPCI'],
    #"canton": "../ref/geo_levels/canton.geojson",
    "commune": ['Code Officiel Commune', 'Nom Officiel Commune'],
    "arrondissement_communal": ['Code Officiel Commune / Arrondissement Municipal',
    'Nom Officiel Commune / Arrondissement Municipal'],
    "iris": ['Code Officiel IRIS',
    'Nom Officiel IRIS'],
}

In [56]:
SR1 = {
    "id": "SR1",
    "spatial_scope_level": "region",
    "spatial_scope": ["ile-de-france"],
    "temporal_scope_level": "year",
    "temporal_scope": [
        {"start_time": "2021", "end_time": "2021"}
    ],
    "themes": ["Well-Being > Environment > Domestic environment"],
}

SR2 = {
    "id": "SR2",
    "spatial_scope_level": "country",
    "spatial_scope": ["france"],
    "temporal_scope_level": "month",
    "temporal_scope": [
        {"start_time": "2022-01", "end_time": "2022-06"}
    ],
    "themes": ["Well-Being > Environment > Health care and social care > Accessibility"],
}

SR3 = {
    "id": "SR3",
    "spatial_scope_level": "departement",
    "spatial_scope": ["landes"],
    "temporal_scope_level": "year",
    "temporal_scope": [
        {"start_time": "2020", "end_time": "2022"}
    ],
    "themes": ["Well-Being > Level of Independence > Income and Wealth"],
}

CR1 = {
    "id": "CR1",
    "spatial_scope_level": "region",
    "spatial_scope": ["ile-de-france"],
    "temporal_scope_level": "year",
    "temporal_scope": [
        {"start_time": "2018", "end_time": "2020"},
        {"start_time": "2022", "end_time": "2024"},
    ],
    "themes": [
        "Well-Being > Environment > Domestic environment",
        "Well-Being > Level of Independence > Income and Wealth",
    ],
}

CR2 = {
    "id": "CR2",
    "spatial_scope_level": "departement",
    "spatial_scope": ["landes", "gironde"],
    "temporal_scope_level": "year",
    "temporal_scope": [
        {"start_time": "2022","end_time": "2024"}
    ],
    "themes": [
        "Well-Being > Environment > Health care and social care > Accessibility",
    ],
}


SRs = [SR1, SR2, SR3]

In [9]:
def load_cata(path: str) -> List[dict]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data["datasets"]

DATASETS = load_cata(CATALOG)

In [10]:
ds_test = DATASETS[12]
ds_test.keys()

dict_keys(['id', 'title', 'sourcePath', 'sourceMtime', 'metadataPath', 'status', 'fileType', 'dataFormat', 'updateFrequency', 'theme', 'spatialGranularity', 'temporalGranularity', 'spatialScope', 'temporalScope', 'fileSizeBytes', 'fileSizeHuman', 'nRows', 'nCols', 'nRecords', 'nFeatures', 'uncompressedSizeBytes', 'checksum', 'generatedAt', 'version'])

In [11]:
def norm(s: str) -> str:
    if s is None:
        return ""
    s = str(s).strip().lower()
    # 去重音，规避 Île / Ile 等差异；不需要可删
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    return s

def get_r_scopes(
        hier_df, 
        r_spatial_scope_level, 
        r_spatial_scope, 
        ds_spatial_scope_level,
):
    r_code, r_name = level_defs[r_spatial_scope_level]
    ds_code, ds_name = level_defs[ds_spatial_scope_level]
    
    df_h = hier_df.copy()
    cols_need = list(dict.fromkeys([r_code, r_name, ds_code, ds_name]))
    df_h = df_h[cols_need]

    # 1) 归一化要匹配的列表
    scope_norm = set(norm(v) for v in r_spatial_scope)

    # 2) 两列分别归一化后匹配
    code_norm = df_h[r_code].map(norm)
    name_norm = df_h[r_name].map(norm)

    mask = code_norm.isin(scope_norm) | name_norm.isin(scope_norm)

    df_result = df_h.loc[mask, [ds_code, ds_name]].drop_duplicates()
    
    return df_result[ds_code].tolist()+df_result[ds_name].tolist()

In [12]:
result = get_r_scopes(DF_SPATIAL_HIER,CR2["spatial_scope_level"],CR2["spatial_scope"],ds_test["spatialScope"][0]["spatialScopeLevel"])
result

['333',
 '331',
 '402',
 '401',
 '335',
 '332',
 '334',
 '336',
 'Langon',
 'Blaye',
 'Mont-de-Marsan',
 'Dax',
 'Libourne',
 'Bordeaux',
 'Lesparre-Médoc',
 'Arcachon']

In [13]:
def get_ds_scopes(ds_spatilScope):
    scopes = []
    if len(ds_spatilScope)>1:
        for item in ds_spatilScope:
            scopes += item["spatialScope"]
    else:
        scopes = ds_spatilScope[0]["spatialScope"]
        
    level = ds_spatilScope[0]["spatialScopeLevel"]
    return level, scopes

In [14]:
# ---------------------- 1) 索引构建（一次即可） ----------------------
def build_geo_index(GEO_DATA: Dict[str, gpd.GeoDataFrame]):
    """
    返回:
      { level: {
          'by_code': {code:[geom,...]},
          'by_name': {name:[geom,...]},
          'all_union': <该层级所有几何的并集>
      } }
    """
    index = {}
    for level, gdf in GEO_DATA.items():
        lvl = level.lower()
        if gdf.crs is None:
            gdf = gdf.set_crs("EPSG:4326", allow_override=True)

        required = {"code", "name", "geometry"}
        missing = required - set(gdf.columns)
        if missing:
            raise ValueError(f"GEO_DATA[{level}] 缺少列: {missing}")

        by_code, by_name = {}, {}
        for _, row in gdf[["code", "name", "geometry"]].iterrows():
            c = norm(row["code"])
            n = norm(row["name"])
            if c:
                by_code.setdefault(c, []).append(row.geometry)
            if n:
                by_name.setdefault(n, []).append(row.geometry)

        index[lvl] = {
            "by_code": by_code,
            "by_name": by_name,
            "all_union": unary_union(gdf.geometry)  # ← 新增
        }
    return index

# ---------------------- 2) 由 (level, values) 拿到几何 ----------------------
def geoms_for(level: str, values: List[str], GEO_INDEX) -> List:
    lvl = (level or "").lower()
    if not values or lvl not in GEO_INDEX:
        return []
    store = GEO_INDEX[lvl]
    out = []
    for v in values:
        k = norm(v)
        # 先尝试 code，再尝试 name（你也可以反过来）
        if k in store["by_code"]:
            out.extend(store["by_code"][k])
        elif k in store["by_name"]:
            out.extend(store["by_name"][k])
        # 匹配不到就跳过（也可选择 raise）
    return out

# ---------------------- 3) 从 catalog.spatialScope 生成 footprint ----------------------
def dataset_geom_from_spatial_scope(ds: dict, GEO_INDEX) -> Optional["BaseGeometry"]:
    """
    ds["spatialScope"] 形如:
    [
      {"spatialScopeLevel":"region",    "spatialScope":["84","11","ile-de-france"]},
      {"spatialScopeLevel":"department","spatialScope":["40","landes"]}
    ]
    - 每个块内：列举的单位取并集（OR）
    - 所有块之间：也做并集（OR）
    """
    scopes = ds.get("spatialScope") or []
    parts = []
    for blk in scopes:
        lvl = blk.get("spatialScopeLevel")
        vals = blk.get("spatialScope") or []
        gs = geoms_for(lvl, vals, GEO_INDEX)
        if gs:
            parts.append(unary_union(gs))
    if not parts:
        return None
    return unary_union(parts)

# ---------------------- 4) SR 定义与层级“下钻”规则 ----------------------

def resolve_required_level(sr: dict) -> str:
    lvl = (sr["spatial_scope_level"] or "").lower()
    names = {norm(x) for x in (sr["spatial_scope"] or [])}
    # 你的规则：country=france 时用 region 粒度
    if lvl == "country" and ("france" in names or "fr" in names or "fra" in names):
        return "region"
    return lvl

def _is_france(values: List[str]) -> bool:
    wanted = {norm(v) for v in (values or [])}
    return bool(wanted & {"france", "fr", "fra"})

def target_geom_from_sr(sr: dict, GEO_INDEX, use_bbox=False):
    lvl = resolve_required_level(sr)  # country(france) → 'region'
    vals = sr.get("spatial_scope") or []

    # 常规：按值取几何
    gs = geoms_for(lvl, vals, GEO_INDEX)

    # 兜底：country=france 时，取全体 region 的并集
    if not gs:
        orig_lvl = (sr.get("spatial_scope_level") or "").lower()
        if orig_lvl == "country" and _is_france(vals) and lvl == "region":
            store = GEO_INDEX.get("region", {})
            tgt = store.get("all_union")
            # 兼容旧索引（没有 all_union 时临时聚合）
            if tgt is None:
                all_geoms = []
                for lst in store.get("by_code", {}).values():
                    all_geoms.extend(lst)
                if not all_geoms:
                    for lst in store.get("by_name", {}).values():
                        all_geoms.extend(lst)
                tgt = unary_union(all_geoms) if all_geoms else None
            if tgt is None:
                return None
            if use_bbox:
                minx, miny, maxx, maxy = tgt.bounds
                return box(minx, miny, maxx, maxy)
            return tgt

    if not gs:
        return None

    tgt = unary_union(gs)
    if use_bbox:
        minx, miny, maxx, maxy = tgt.bounds
        tgt = box(minx, miny, maxx, maxy)
    return tgt


In [15]:
GEO_INDEX = build_geo_index(GEO_DATA) 
GEO_INDEX

{'region': {'by_code': {'44': [<POLYGON ((3.84 48.036, 3.84 48.036, 3.84 48.036, 3.839 48.036, 3.837 48.037...>],
   '93': [<MULTIPOLYGON (((4.881 43.357, 4.881 43.357, 4.881 43.357, 4.881 43.357, 4.8...>],
   '84': [<POLYGON ((2.213 44.637, 2.213 44.637, 2.213 44.637, 2.213 44.637, 2.212 44....>],
   '75': [<MULTIPOLYGON (((-1.441 43.046, -1.447 43.053, -1.449 43.055, -1.452 43.059,...>],
   '24': [<POLYGON ((0.387 46.942, 0.387 46.942, 0.386 46.943, 0.385 46.943, 0.385 46....>],
   '53': [<MULTIPOLYGON (((-4.837 48.032, -4.837 48.032, -4.837 48.032, -4.837 48.032,...>],
   '94': [<MULTIPOLYGON (((8.824 41.71, 8.824 41.71, 8.823 41.711, 8.823 41.711, 8.822...>],
   '76': [<MULTIPOLYGON (((0.136 42.722, 0.136 42.722, 0.136 42.722, 0.136 42.722, 0.1...>],
   '27': [<POLYGON ((3.228 46.693, 3.228 46.693, 3.228 46.692, 3.227 46.692, 3.227 46....>],
   '32': [<POLYGON ((1.767 49.18, 1.767 49.18, 1.766 49.18, 1.765 49.179, 1.763 49.178...>],
   '52': [<MULTIPOLYGON (((-2.369 46.7, -2.369 46

In [16]:
def codes_names_intersecting(
    tgt_geom,
    level,
    code_col: str = "code",
    name_col: str = "name",
) -> List[str]:
    """
    在 geojson_path 中，找出与 tgt_geom 有交集的要素，返回仅含 code/name 的 DataFrame（去重）。
    要求：tgt_geom 与该 GeoJSON 使用同一坐标系（你之前都是 EPSG:4326）。
    """
    gdf = GEO_DATA[level].copy()

    # 先用空间索引做候选集合，再做精确 intersects
    try:
        idx = list(gdf.sindex.query(tgt_geom, predicate="intersects"))
        cand = gdf.iloc[idx]
    except Exception:
        # 没有 sindex 时退化为全表测试
        cand = gdf

    mask = cand.intersects(tgt_geom)
    out = cand.loc[mask, [code_col, name_col]].drop_duplicates().reset_index(drop=True)
    
    return out[code_col].tolist()+out[name_col].tolist()

In [17]:
def match_spatial_by_geo(
    ds: dict,
    sr: dict,
    hier:bool,
    level:str,
    relation: str = "intersects",  # 可选: 'intersects' | 'within' | 'contains' | 'covers' | 'covered_by'
    fallback_to_names: bool = True
):
    """
    首选：从 catalog.spatialScope 得到 ds 的 footprint，与 SR 的目标几何做关系判断。
    若 footprint 生成失败且允许回退：按名称集合做简单匹配（同层级 AND，层级内 OR）。
    """
    ds_geom = dataset_geom_from_spatial_scope(ds, GEO_INDEX)
    tgt_geom = target_geom_from_sr(sr, GEO_INDEX, use_bbox=False)
    
    match = False
    
    if ds_geom is not None and tgt_geom is not None:
        if relation == "intersects":
            match = ds_geom.intersects(tgt_geom)
        elif relation == "within":
            match = ds_geom.within(tgt_geom)
        elif relation == "contains":
            match = ds_geom.contains(tgt_geom)
        elif relation == "covers":
            match = ds_geom.covers(tgt_geom)
        elif relation == "covered_by":
            match = ds_geom.covered_by(tgt_geom)
        else:
            raise ValueError(f"未知关系: {relation}")
        
        if match and hier:
            return codes_names_intersecting(tgt_geom,level)
        else: 
            return None

    if not fallback_to_names:
        return None


def match_spatial_cata(ds_cata, sr): 
    r_spatial_level = sr["spatial_scope_level"]
    ds_spatial_level, ds_spatial_scope = get_ds_scopes(ds_cata["spatialScope"])
    
    if r_spatial_level in level_defs.keys() and ds_spatial_level in level_defs.keys():
        r_scope = get_r_scopes(DF_SPATIAL_HIER, r_spatial_level, sr["spatial_scope"], ds_spatial_level)
        return list(set(r_scope) & set(ds_spatial_scope))
    else:
        try: 
            return match_spatial_by_geo(ds_cata, sr, ds_spatial_level in level_defs.keys(), ds_spatial_level)
        except ValueError:
            return None
    return None

In [18]:
match_spatial_cata(ds_test,SR1)

['Créteil',
 'Fontainebleau',
 'Versailles',
 '952',
 '931',
 'Étampes',
 'Bobigny',
 '784',
 '912',
 'Sarcelles',
 '782',
 '771',
 'Mantes-la-Jolie',
 'Provins',
 'Pontoise',
 '911',
 'Nanterre',
 'Meaux',
 'Argenteuil',
 '932',
 'Évry',
 '773',
 '913',
 '781',
 '951',
 '774',
 '751',
 '772',
 '943',
 '921',
 'Paris',
 'Le Raincy',
 '941',
 'Saint-Denis',
 'Nogent-sur-Marne',
 'Saint-Germain-en-Laye',
 '933',
 '953',
 'Boulogne-Billancourt',
 '783',
 'Torcy',
 'Rambouillet',
 '922',
 '775',
 '923',
 'Palaiseau',
 "L'Haÿ-les-Roses",
 'Antony',
 'Melun',
 '942']

In [19]:
def split_theme_path(p: str):
    """
    将 'A > B > C' 拆成 ['a','b','c']，自动去空格、大小写及重音。
    """
    return [norm(x) for x in p.split(">") if norm(x)]

def is_equal_or_descendant(ds_tokens, req_tokens) -> bool:
    """
    规则：dataset 路径与所需路径相同，或 dataset 在所需路径之下（更长且前缀一致）。
    例：
      req: ['well-being','level of independence']
      ds:  ['well-being','level of independence','education and skills'] -> True
      ds:  ['well-being'] -> False（父级，不算子集）
    """
    if len(ds_tokens) < len(req_tokens):
        return False
    return ds_tokens[:len(req_tokens)] == req_tokens

def match_theme_cata(ds_themes: List[str], wanted_paths: List[str]) -> List[str]:
    """
    返回命中的 ds 主题路径列表（字符串），保留原顺序并去重。
    无命中返回空列表 []。
    """
    if not wanted_paths or not ds_themes:
        return []

    wanted_tokens_list = [split_theme_path(p) for p in wanted_paths]
    hits = []
    seen = set()
    for ds_path in ds_themes:
        if not isinstance(ds_path, str) or not ds_path.strip():
            continue
        ds_tokens = split_theme_path(ds_path)
        for req_tokens in wanted_tokens_list:
            if is_equal_or_descendant(ds_tokens, req_tokens):
                if ds_path not in seen:
                    seen.add(ds_path)
                    hits.append(ds_path)
                break
    return hits

In [20]:
match_theme_cata(ds_test["theme"], SR1["themes"])

['Well-Being > Environment > Domestic environment > Available equipment',
 'Well-Being > Environment > Domestic environment > Available space']

In [21]:
def parse_time(s: str) -> date:
    """
    解析以下时间字符串并返回该粒度的起始日期：
      - 年:           2024                         -> 2024-01-01
      - 季度:         2024-Q1 / Q1 2024 / 2024T1   -> 2024-01-01, 04-01, 07-01, 10-01
      - 学期/半年:    2024-S1 / H2 2024 / 2024H1   -> 2024-01-01 或 2024-07-01
      - 月:           2024-03 / 2024/3             -> 2024-03-01
      - 周(ISO):      2024-W09 / 2024W9 / W9 2024  -> 该周周一
      - 日期:         2024-03-15 / 2024/3/15       -> 2024-03-15
    解析失败时抛 ValueError。
    """
    if s is None:
        raise ValueError("时间字符串为空")
    s = str(s).strip()

    # 1) 完整日期 YYYY-MM-DD 或 YYYY/M/D
    m = re.fullmatch(r"(?i)\s*(?P<y>\d{4})[-/](?P<m>\d{1,2})[-/](?P<d>\d{1,2})\s*", s)
    if m:
        y, m_, d_ = map(int, m.groups())
        return date(y, m_, d_)

    # 2) ISO 周：YYYY-Www / YYYYWww / Wwww YYYY
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?W(?P<w>\d{1,2})\s*$",
        r"^\s*W(?P<w>\d{1,2})[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); w = int(m.group("w"))
            # fromisocalendar 会校验周号是否合法（如 53 周年份）
            return date.fromisocalendar(y, w, 1)  # 周一

    # 3) 月：YYYY-MM 或 YYYY/M
    m = re.fullmatch(r"^\s*(?P<y>\d{4})[-/](?P<m>\d{1,2})\s*$", s)
    if m:
        y = int(m.group("y")); m_ = int(m.group("m"))
        if not (1 <= m_ <= 12):
            raise ValueError(f"月份不合法: {m_}")
        return date(y, m_, 1)

    # 4) 季度：YYYY-Q1 / Q2 YYYY / 2024T3 / T4 2024
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?[QT](?P<q>[1-4])\s*$",
        r"^\s*[QT](?P<q>[1-4])[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); q = int(m.group("q"))
            start_month = (q - 1) * 3 + 1
            return date(y, start_month, 1)

    # 5) 学期/半年：YYYY-S1/H2 / S1 2024 / H2 2024
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?[SH](?P<s>[12])\s*$",
        r"^\s*[SH](?P<s>[12])[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); s_ = int(m.group("s"))
            start_month = 1 if s_ == 1 else 7
            return date(y, start_month, 1)

    # 6) 年：YYYY（18xx/19xx/20xx）
    m = re.fullmatch(r"^\s*(?P<y>(18|19|20)\d{2})\s*$", s)
    if m:
        return date(int(m.group("y")), 1, 1)

    raise ValueError(f"无法解析时间字符串: {s!r}")

def overlap(a: Tuple[date, date], b: Tuple[date, date]) -> bool:
    (a0, a1), (b0, b1) = a, b
    return not (a1 < b0 or b1 < a0)

def parse_bounds(s: str) -> Tuple[date, date]:
    """
    把单个时间令牌解析为 (start_date, end_date)：
    - 年:       2024           -> (2024-01-01, 2024-12-31)
    - 季度:     2024-Q1        -> (2024-01-01, 2024-03-31)
    - 学期:     2024-S2/H2     -> (2024-07-01, 2024-12-31)
    - 月:       2024-03        -> (2024-03-01, 2024-03-31)
    - 周(ISO):  2024-W09       -> (该周周一, 该周周日)
    - 日期:     2024-03-15     -> (2024-03-15, 2024-03-15)
    """
    if s is None:
        raise ValueError("时间字符串为空")
    s = str(s).strip()

    # 日期
    m = re.fullmatch(r"(?i)\s*(?P<y>\d{4})[-/](?P<m>\d{1,2})[-/](?P<d>\d{1,2})\s*", s)
    if m:
        y, m_, d_ = map(int, m.groups())
        dt = date(y, m_, d_)
        return dt, dt

    # ISO 周
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?W(?P<w>\d{1,2})\s*$",
        r"^\s*W(?P<w>\d{1,2})[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); w = int(m.group("w"))
            st = date.fromisocalendar(y, w, 1)  # 周一
            ed = date.fromisocalendar(y, w, 7)  # 周日
            return st, ed

    # 月
    m = re.fullmatch(r"^\s*(?P<y>\d{4})[-/](?P<m>\d{1,2})\s*$", s)
    if m:
        y = int(m.group("y")); m_ = int(m.group("m"))
        if not (1 <= m_ <= 12):
            raise ValueError(f"月份不合法: {m_}")
        st = date(y, m_, 1)
        ed = date(y, m_, calendar.monthrange(y, m_)[1])
        return st, ed

    # 季度
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?[QT](?P<q>[1-4])\s*$",
        r"^\s*[QT](?P<q>[1-4])[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); q = int(m.group("q"))
            sm = (q - 1) * 3 + 1            # start month
            em = sm + 2                     # end month
            st = date(y, sm, 1)
            ed = date(y, em, calendar.monthrange(y, em)[1])
            return st, ed

    # 学期 / 半年
    for rx in (
        r"^\s*(?P<y>\d{4})[-/]?[SH](?P<s>[12])\s*$",
        r"^\s*[SH](?P<s>[12])[-/\s]?(?P<y>\d{4})\s*$",
    ):
        m = re.fullmatch(rx, s, flags=re.I)
        if m:
            y = int(m.group("y")); s_ = int(m.group("s"))
            sm, em = (1, 6) if s_ == 1 else (7, 12)
            st = date(y, sm, 1)
            ed = date(y, em, calendar.monthrange(y, em)[1])
            return st, ed

    # 年
    m = re.fullmatch(r"^\s*(?P<y>(18|19|20)\d{2})\s*$", s)
    if m:
        y = int(m.group("y"))
        return date(y, 1, 1), date(y, 12, 31)

    raise ValueError(f"无法解析时间字符串: {s!r}")

# --- 修改：wanted 用 start 的起始日 + end 的最后一天 ---
def _parse_wanted(wanted: Optional[Dict[str, str]]) -> Tuple[date, date]:
    if not wanted:
        raise ValueError("wanted 为空")
    s = wanted.get("start_time") or wanted.get("start") or wanted.get("startTime")
    e = wanted.get("end_time")   or wanted.get("end")   or wanted.get("endTime")
    if s is None or e is None:
        raise ValueError(f"wanted 缺少 start/end: {wanted}")
    w0, _ = parse_bounds(s)   # 取起始
    _,  w1 = parse_bounds(e)  # 取结束
    if w1 < w0:
        w0, w1 = w1, w0
    return w0, w1

# --- 修改：数据集时间同理，start 用起始日，end 用最后一天 ---
def periods_from_temporal_scope(ds: dict) -> List[Tuple[date, date, str]]:
    out = []
    scopes = ds.get("temporalScope") or []
    for block in scopes:
        level = (block.get("temporalScopeLevel") or "").lower()
        for p in block.get("timePeriods") or []:
            st, _ = parse_bounds(p["startTime"])
            _, ed = parse_bounds(p["endTime"])
            if ed < st:
                st, ed = ed, st
            out.append((st, ed, level))
    return out

def _intersection(a0: date, a1: date, b0: date, b1: date) -> Optional[Tuple[date, date]]:
    i0, i1 = max(a0, b0), min(a1, b1)
    return (i0, i1) if i0 <= i1 else None

def match_temporal_cata(ds: dict, wanteds: List[Dict[str, str]]) -> List[Tuple[date, date]]:
    """
    ds:    含 temporalScope 的数据集元数据
    wanteds: [{'start_time'/'start', 'end_time'/'end'}, ...]
    返回：所有重叠区间 [(start_date, end_date), ...]，已去重保序
    """
    # 1) 数据集时间段
    ds_periods_raw = periods_from_temporal_scope(ds)  # [(p0, p1, level), ...]
    ds_periods = []
    for p0, p1, _ in ds_periods_raw:
        if p1 < p0:  # 保险
            p0, p1 = p1, p0
        ds_periods.append((p0, p1))
    if not ds_periods:
        return []

    # 2) 规范 wanted 列表
    if not wanteds:
        return []
    wanteds = [w for w in wanteds if w]  # 去掉 None/空

    # 3) 计算交集并去重保序
    overlaps: List[Tuple[date, date]] = []
    seen = set()
    for w in wanteds:
        w0, w1 = _parse_wanted(w)  # 用你前面实现的：start 用起始日，end 用最后一天
        if w1 < w0:
            w0, w1 = w1, w0
        for p0, p1 in ds_periods:
            i0, i1 = _intersection(w0, w1, p0, p1) or (None, None)
            if i0 is not None:
                inter = (i0, i1)
                if inter not in seen:
                    seen.add(inter)
                    overlaps.append(inter)

    return overlaps

In [22]:
match_temporal_cata(ds_test, SR1["temporal_scope"]) 

[(datetime.date(2022, 1, 1), datetime.date(2022, 12, 31))]

In [23]:
def load_json(path: str) -> List[dict]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data

def flatten_all(seq):
    for item in seq:
        if isinstance(item, (list, tuple)):
            yield from flatten_all(item)
        else:
            yield item

In [24]:
def match_theme_meta(theme: Optional[dict], wanted_themes: List[str]) -> bool:
    """
    判断一个 theme 字典是否命中要求主题（OR 关系）。
    命中规则：theme 的路径 与 任一 wanted 路径“相同或位于其下层（子路径）”。

    参数：
      theme: 带主题信息的字典（来自 attribute.theme）
             期望键优先顺序：themeName > name > path
      wanted_themes: 主题路径列表（如：["Well-Being > Level of Independence > Jobs and Employment"]）

    依赖：
      - split_theme_path(p: str) -> List[str]
      - is_equal_or_descendant(ds_tokens: List[str], req_tokens: List[str]) -> bool
    """
    # 无要求主题则视为命中
    if not wanted_themes:
        return True

    # 兼容空值
    if not isinstance(theme, dict):
        return False

    # 从 theme 字典里拿到路径字符串
    path_str = (theme.get("themeName")
                or theme.get("name")
                or theme.get("path")
                or "").strip()
    if not path_str:
        return False

    ind_tokens = split_theme_path(path_str)
    wanted_tokens_list = [split_theme_path(p) for p in wanted_themes]

    for req_tokens in wanted_tokens_list:
        if is_equal_or_descendant(ind_tokens, req_tokens):
            return True
    return False

def filter_attributes(meta, theme_match, spatial_level, temporal_level) -> List[str]:

    atts=[]
    
    spatial_filters = []
    temporal_filters = []

    attributes = meta['attributes']

    for att in attributes:
        if "spatialLevel" in att.keys():
            atts.append(att["dataName"])
            if att["spatialLevel"] == spatial_level:
                spatial_filters.append(att["dataName"])
        elif "temporalLevel" in att.keys():
            atts.append(att["dataName"])
            if att["temporalLevel"] == temporal_level:
                temporal_filters.append(att["dataName"])
        elif "indicatorType" in att.keys():
            if match_theme_meta(att["theme"],theme_match):
                atts.append(att["dataName"])
        else:
            if att["granularity"] or match_theme_meta(att["theme"],theme_match):
                atts.append(att["dataName"])

    return list(flatten_all(atts)), list(flatten_all(spatial_filters)), list(flatten_all(temporal_filters))

In [25]:
meta = load_json(ds_test["metadataPath"])
theme_match = match_theme_cata(ds_test["theme"], SR1["themes"])
atts, sp_filter, tp_filter = filter_attributes(meta,theme_match, ds_test["spatialScope"][0]["spatialScopeLevel"], ds_test["temporalScope"][0]["temporalScopeLevel"])

In [26]:
atts

['Latitude',
 'Longitude',
 'Arrondissement Code',
 'Arrondissement Nom',
 'Adresse',
 "Date de l'enquête",
 "Date de changement d'état de la fiche d'enquête",
 "Date de création de la fiche d'enquête",
 "Date de l'homologation préfectorale",
 'Année de mise en service',
 'Date des derniers gros travaux',
 "Longueur de l'aire d'évolution",
 "Largeur de l'aire d'évolution",
 "Hauteur de l'aire d'évolution",
 "Surface de l'aire d'évolution",
 'Nombre de couloirs/pistes/postes/jeux/pas',
 'Nombre de places assises en tribune',
 'Nombre de vestiaires sportifs',
 'Nombre de vestiaires arbitres/enseignants',
 'Longueur du bassin',
 'Largeur du bassin',
 'Surface du bassin',
 'Profondeur minimale du bassin',
 'Profondeur maximale du bassin',
 'Longueur de la piste',
 'SAE: hauteur maximale de la structure',
 'SAE: surface totale de la structure',
 'SAE: nombre de couloirs de la structure',
 'Présence de douches',
 'Présence de sanitaires',
 'Types de locaux complémentaires',
 "Types d'aménage

In [27]:
def select_rows_by_temporal(
    df_cols_need: pd.DataFrame,
    tp_filter: List[str],
    temporal_match: List[Tuple[date, date]],
) -> pd.DataFrame:
    periods = list(dict.fromkeys(temporal_match))  # 去重，保序
    if not periods:
        return df_cols_need.iloc[0:0].copy()

    cols = [c for c in tp_filter if c in df_cols_need.columns]
    if not cols:
        return df_cols_need.iloc[0:0].copy()

    def cell_hits(v: Any) -> bool:
        """单元格是否与任一 period 相交。支持单值或 list/tuple。"""
        # 如果单元格是列表/元组，任何一个元素命中即可
        if isinstance(v, (list, tuple, set)):
            for x in v:
                if cell_hits(x):
                    return True
            return False
        if pd.isna(v):
            return False
        # datetime 直接比较（按当天视为点区间）
        if pd.api.types.is_datetime64_any_dtype(type(v)) or isinstance(v, pd.Timestamp):
            d = pd.to_datetime(v).date()
            for a, b in periods:
                if a <= d <= b:
                    return True
            return False
        # 其它字符串/数字：用 parse_bounds 得到 (start,end)
        try:
            s, e = parse_bounds(v)  # 你已实现：返回该 token 的起止日期
        except Exception:
            return False
        for a, b in periods:
            if not (e < a or b < s):  # 相交
                return True
        return False

    mask = pd.Series(False, index=df_cols_need.index)
    for c in cols:
        # 列若是 datetime64，向量化比较更快
        if pd.api.types.is_datetime64_any_dtype(df_cols_need[c]):
            dseries = df_cols_need[c].dt.date
            colmask = pd.Series(False, index=df_cols_need.index)
            for a, b in periods:
                colmask |= ((dseries >= a) & (dseries <= b))
        else:
            colmask = df_cols_need[c].apply(cell_hits)
        mask |= colmask

    return df_cols_need.loc[mask]

In [28]:
def _flatten_once(seq):
    out = []
    for x in seq:
        out.extend(x if isinstance(x, (list, tuple)) else [x])
    return out

def _cell_hits_periods(v, periods):
    """单元格是否与任一 period 相交。支持标量或 list/tuple；利用 parse_bounds。"""
    if isinstance(v, (list, tuple, set)):
        return any(_cell_hits_periods(x, periods) for x in v)
    if pd.isna(v):
        return False
    try:
        s, e = parse_bounds(v)   # 你之前实现过：返回(起始日, 结束日)
    except Exception:
        return False
    return any(not (e < a or b < s) for (a, b) in periods)

In [48]:
def filter_cata(cata_dss, sr):
    select=[]
    for cata_ds in cata_dss:
        if not cata_ds["spatialScope"] or not cata_ds["temporalScope"]:
            continue
        theme_match = match_theme_cata(cata_ds["theme"], sr["themes"])
        if not theme_match:
            continue
        spatial_match = match_spatial_cata(cata_ds, sr)
        if not spatial_match:
            continue
        temporal_match = match_temporal_cata(cata_ds,sr["temporal_scope"])
        if not temporal_match:
            continue
        if spatial_match and temporal_match and theme_match: 
            print(cata_ds["title"]+" is corresponding")
            meta = load_json(cata_ds["metadataPath"])
            atts, sp_fiter, tp_filter = filter_attributes(meta,theme_match, cata_ds["spatialScope"][0]["spatialScopeLevel"], cata_ds["temporalScope"][0]["temporalScopeLevel"])
            raw_file = cata_ds["sourcePath"]
            df_raw = get_df(raw_file)
            
            df_cols_need = df_raw[atts]
            
            allowed = set(norm(x) for x in spatial_match)
            if sp_fiter:
                mask_sp = df_cols_need[sp_fiter].apply(lambda col: col.map(norm)).isin(allowed).any(axis=1)
            else:
                mask_sp = pd.Series(False, index=df_cols_need.index)
                
            mask_tp = pd.Series(False, index=df_cols_need.index)
            for c in tp_filter:
                if pd.api.types.is_datetime64_any_dtype(df_cols_need[c]):
                    d = df_cols_need[c].dt.date
                    colmask = pd.Series(False, index=d.index)
                    for a, b in temporal_match:              # temporal_match: List[Tuple[date,date]]
                        colmask |= (d >= a) & (d <= b)
                else:
                    colmask = df_cols_need[c].apply(lambda v: _cell_hits_periods(v, temporal_match))
                mask_tp |= colmask
            
            mask = mask_sp & mask_tp
            df_selected = df_cols_need.loc[mask]
            select.append(df_selected)
    return select

In [57]:
filter_cata(load_cata(CATALOG),SR1)

fr-en-data-es-base-de-donnees.csv is corresponding


[         Latitude  Longitude Arrondissement Code Arrondissement Nom  \
 33      49.001167   1.682759                 781    Mantes-la-Jolie   
 384     48.809400   2.062034                 784         Versailles   
 7672    48.943832   2.486998                 932          Le Raincy   
 18482         NaN        NaN                 931            Bobigny   
 18868         NaN        NaN                 952          Sarcelles   
 ...           ...        ...                 ...                ...   
 295571  48.822620   2.120358                 784         Versailles   
 296330  48.809400   2.062034                 784         Versailles   
 297928  49.001167   1.682759                 781    Mantes-la-Jolie   
 321531  48.713006   2.205375                 913          Palaiseau   
 325734  48.832951   2.385634                 751              Paris   
 
                             Adresse Date de l'enquête  \
 33               1 Rue Marcel Doret        2025-03-31   
 384       Rue de 

In [50]:
filter_cata(load_cata(CATALOG),SR2)

fr-en-data-es-base-de-donnees.csv is corresponding
rpls2021_geolocalise_OD_REG02.csv is corresponding


[         Latitude  Longitude Arrondissement Code         Arrondissement Nom  \
 10      47.914664   0.333694                 723                    Le Mans   
 31      48.405924   2.689468                 774              Fontainebleau   
 65     -20.896926  55.486692                9741                Saint-Denis   
 127           NaN        NaN                 172                  Rochefort   
 234     48.308267  -0.929000                 533                    Mayenne   
 ...           ...        ...                 ...                        ...   
 332764  47.735752  -2.581017                 563                     Vannes   
 332873        NaN        NaN                 575  Sarrebourg-Château-Salins   
 332982  47.872285  -3.567617                 294                    Quimper   
 333038  48.878970   2.328251                 751                      Paris   
 333039  48.878970   2.328251                 751                      Paris   
 
                                     A

In [51]:
filter_cata(load_cata(CATALOG),SR3)

merged_output.csv is corresponding


[        Latitude   Longitude  Département               Libellé  \
 1153   43,712066  -1,2446846           40  St GEOURS > SOUSTONS   
 1154  43,7120706  -1,2446792           40  SOUSTONS > St GEOURS   
 1155   43,712066  -1,2446846           40  St GEOURS - SOUSTONS   
 1156    43,85629    -1,16649           40  CASTETS VERS MAGESCQ   
 1157    43,85629    -1,16649           40  MAGESCQ VERS CASTETS   
 ...          ...         ...          ...                   ...   
 1769  43,8156092  -0,2963734           40  Ca RD30 > VILLENEUVE   
 1770  43,8156092  -0,2963734           40     VILLENEUVE - RD30   
 1771  44,5056573  -1,0458899           40        33 > SANGUINET   
 1772  44,5056573  -1,0458899           40        SANGUINET > 33   
 1773  44,5056573  -1,0458899           40    lim 33 - SANGUINET   
 
                       Commune  Année     MJA  MJAPL MJAPPL      MJE  ...  \
 1153  SAINT-GEOURS-DE-MAREMNE   2020  2519.0  181.0   7,19   4249.0  ...   
 1154  SAINT-GEOURS-DE-MAREM

In [52]:
filter_cata(load_cata(CATALOG),CR1)

fr-en-data-es-base-de-donnees.csv is corresponding


[         Latitude  Longitude Arrondissement Code Arrondissement Nom  \
 31      48.405924   2.689468                 774      Fontainebleau   
 65     -20.896926  55.486692                9741        Saint-Denis   
 371     49.020720   2.601829                 771              Meaux   
 2198    48.866996   2.237072                 751              Paris   
 2260    48.936168   2.386552                 933        Saint-Denis   
 ...           ...        ...                 ...                ...   
 332411  48.924250   2.355473                 933        Saint-Denis   
 332417 -20.895140  55.464284                9741        Saint-Denis   
 333038  48.878970   2.328251                 751              Paris   
 333039  48.878970   2.328251                 751              Paris   
 333056  49.055491   2.053757                 953           Pontoise   
 
                                    Adresse Date de l'enquête  \
 31              18 Boulevard André Maginot        2025-03-31   
 65 

In [53]:
filter_cata(load_cata(CATALOG),CR2)

fr-en-data-es-base-de-donnees.csv is corresponding


[         Latitude  Longitude Arrondissement Code Arrondissement Nom  \
 9695    44.873921  -0.652313                 332           Bordeaux   
 9856    45.297324  -0.929192                 334     Lesparre-Médoc   
 9876    44.956970  -0.598001                 332           Bordeaux   
 27481         NaN        NaN                 335           Libourne   
 28372         NaN        NaN                 334     Lesparre-Médoc   
 ...           ...        ...                 ...                ...   
 312745  44.853670  -0.647370                 332           Bordeaux   
 314762  44.845006  -0.652860                 332           Bordeaux   
 314955  44.845006  -0.652860                 332           Bordeaux   
 319591  44.875040  -0.674070                 332           Bordeaux   
 331257        NaN        NaN                 333             Langon   
 
                                       Adresse Date de l'enquête  \
 9695                            Rue du Pinsan        2025-07-10  

WITHOUT METADATA

In [35]:
# === 0) 准备：主题树 & OpenAI 客户端 ===
# 你可以把 THEME_FOLDER_STRUCTURE 换成你的完整主题树（根节点必须是 "Well-being"）
from openai import OpenAI

# 若未设置 OPENAI_API_KEY，请在此处设置
# os.environ["OPENAI_API_KEY"] = "sk-..."

client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

THEME_FOLDER_STRUCTURE = {
    "Well-Being": {
        "Physical Health": {
            "Pain and discomfort": {},
            "Energy and fatigue": {},
            "Sleep and rest": {}
        },
        "Psychological Health": {
            "Positive feelings": {},
            "Thinking, learning, memory and concentration": {
                "Speed": {},
                "Clarity": {}
            },
            "Self-esteem": {},
            "Body image and appearance": {},
            "Negative feelings": {}
        },
        "Level of Independence": {
            "Mobility": {},
            "Activities of daily living": {
                "Taking care of oneself": {},
                "Managing one's belongings appropriately": {}
            },
            "Dependence on medication and medical aids": {},
            "Education and Skills": {
                "Years of schooling": {},
                "Upper secondary attainment": {},
                "Foundational skills": {
                    "Literacy": {},
                    "Numeracy": {},
                    "Digital skills": {}
                },
                "Lifelong learning": {
                    "Adult education": {},
                    "Training opportunities": {}
                }
            },
            "Income and Wealth": {
                "Household income": {},
                "Wealth distribution": {},
                "Income inequality (Gini)": {},
                "Relative poverty rate": {},
                "Financial resources": {
                    "Independence": {},
                    "Feeling of having enough": {}
                }
            },
            "Jobs and Employment": {
                "Employment rate": {},
                "Unemployment rate": {},
                "Job quality": {},
                "Job security": {},
                "Youth NEET rate": {},
                "Informal employment rate": {},
                "Work capacity": {}
            },
            "Safety": {
                "Personal security (homicide, assault)": {},
                "Perceived safety": {},
                "Road safety (traffic injuries)": {}
            }
        },
        "Civic Engagement and Governance": {
            "Political participation": {
                "Voter turnout": {},
                "Civic participation (consultation, petitions)": {}
            },
            "Governance quality": {
                "Trust in institutions": {},
                "Access to justice": {},
                "Perceived corruption": {}
            }
        },
        "Subjective Well-being": {
            "Life evaluation": {
                "Life satisfaction": {}
            },
            "Affect balance": {
                "Positive vs negative emotions": {}
            }
        },
        "Work-Life Balance": {
            "Working hours (long hours incidence)": {},
            "Leisure time": {},
            "Childcare availability": {},
            "Time use balance": {}
        },
        "Environment": {
            "Comfort and security": {},
            "Domestic environment": {
                "Crowding": {},
                "Available space": {},
                "Cleanliness": {},
                "Opportunities for privacy": {},
                "Available equipment": {},
                "Building construction quality": {}
            },
            "Health care and social care": {
                "Accessibility": {},
                "Quality": {}
            },
            "Opportunities to acquire new information and skills": {},
            "Participation in recreational and leisure activities": {},
            "Physical environment": {
                "Air pollution (PM2.5 exposure)": {},
                "Noise": {},
                "Traffic": {},
                "Climate": {},
                "Access to green space": {}
            },
            "Infrastructure": {
                "Transport": {},
                "Drinking water": {},
                "Gas": {},
                "Electricity": {},
                "Sewage networks": {}
            },
            "Urbanisation level": {}
        },
        "Social Relationships": {
            "Personal relations": {},
            "Social support (help in times of need)": {},
            "Social isolation / loneliness": {},
            "Sexual activity": {}
        },
        "Spirituality/Religion/Personal Beliefs": {}
    }
}

In [36]:
# === 1) LLM 结果缓存工具（方案B：仅在本次实验中使用；实验前清空目录） ===
LLM_CACHE_DIR = r"C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic"
os.makedirs(LLM_CACHE_DIR, exist_ok=True)

def _hash_obj(obj) -> str:
    s = json.dumps(obj, ensure_ascii=False, sort_keys=True, default=str)
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:16]

def _semantic_cache_path(model: str, theme_dict: dict, batch_cols: list, samples: dict, sample_rows: int) -> str:
    key = {
        "model": model,
        "theme_hash": _hash_obj(theme_dict),
        "cols": batch_cols,
        "samples": samples,          # 只包含前 sample_rows 的样本
        "sample_rows": sample_rows
    }
    h = _hash_obj(key)
    return os.path.join(LLM_CACHE_DIR, f"{h}.json")

def _semantic_cache_load(path: str):
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return None

def _semantic_cache_save(path: str, payload: dict):
    try:
        with open(path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
    except Exception:
        pass


In [37]:
# === 2) 语义识别（带缓存） ===
def semantic_helper(
    df: pd.DataFrame,
    wanted_theme_paths: List[str],
    model: str = "gpt-5-mini",
    sample_rows: int = 10,
    max_cols_per_batch: int = 30,
    use_cache: bool = True,
    force_refresh: bool = False
) -> pd.DataFrame:
    """
    使用 GPT 对列做语义识别（批处理），只保留：
      - 空间列
      - 时间列
      - 主题命中的 indicator / other information 列
    返回列：column_name, meaning, is_spatial, is_temporal, is_indicator, is_other_information, indicator_type, thematic_path
    """
    from typing import Optional
    all_results = []
    columns = list(df.columns)

    for i in range(0, len(columns), max_cols_per_batch):
        batch_cols = columns[i:i + max_cols_per_batch]
        batch_df = df[batch_cols].head(sample_rows)
        samples = batch_df.to_dict(orient="list")

        cache_path = _semantic_cache_path(model, THEME_FOLDER_STRUCTURE, batch_cols, samples, sample_rows)
        result = None

        if use_cache and not force_refresh:
            cached = _semantic_cache_load(cache_path)
            if cached and "columns" in cached:
                result = cached

        if result is None:
            prompt = f"""
You are a data steward. Classify each column based on header names and sample values.

Definitions (mutually exclusive):
- Spatial: a field that encodes a location (e.g., latitude/longitude, X/Y, address, admin code).
- Temporal: a field that encodes time or time period (e.g., year, date, month, quarter).
- Indicator: a measured variable used for analysis/monitoring (e.g., counts, rates, scores, categories).
- Other information: only if it is NOT spatial, NOT temporal, and NOT an indicator.

Tasks:
1) Explain the meaning (one short sentence).
2) Set is_spatial (bool).
3) Set is_temporal (bool).
4) Set is_indicator (bool).
5) Set is_other_information (bool) = True only if the first three are all False.
6) If is_indicator is True, set indicator_type to "Quantitative" or "Qualitative"; otherwise null.
7) If is_indicator is True, assign a theme from the given thematic hierarchy.
8) If is_other_information is True, try to assign a theme from the same hierarchy; if not possible, return null.

Thematic hierarchy (use exactly as provided):
{json.dumps(THEME_FOLDER_STRUCTURE, ensure_ascii=False)}

Column names and sample data (a few values each):
{json.dumps(samples, ensure_ascii=False)}

Hard rules (read carefully and follow exactly):
- The four classes are mutually exclusive: exactly one of [is_spatial, is_temporal, is_indicator, is_other_information] must be True.
- If is_indicator is False, indicator_type must be null.
- thematic_path must be a single path string joined by " > " with no trailing spaces (e.g., "Well-being > Health > Accessibility").
- thematic_path MUST start with "Well-being" (exact spelling and casing) and MUST be the full path from the root to the most specific applicable leaf.
- Do NOT invent, abbreviate, or reorder nodes. Use only nodes that exist in the provided hierarchy.
- If you cannot assign a valid full path that starts with "Well-being", return null for thematic_path.

Return JSON strictly as:
{{
  "columns": [
    {{
      "column_name": "str",
      "meaning": "str",
      "is_spatial": true/false,
      "is_temporal": true/false,
      "is_indicator": true/false,
      "is_other_information": true/false,
      "indicator_type": "Quantitative" | "Qualitative" | null,
      "thematic_path": "str" | null
    }}
  ]
}}
"""
            resp = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"}
            )
            content = resp.choices[0].message.content
            result = json.loads(content)
            if use_cache:
                _semantic_cache_save(cache_path, result)

        batch_res = pd.DataFrame(result["columns"])
        all_results.append(batch_res)

    out_df = pd.concat(all_results, ignore_index=True)

    # 仅保留：空间/时间列，或（主题命中）indicator/other information 列
    def _theme_hit(path: Optional[str]) -> bool:
        if not isinstance(path, str) or not path.strip():
            return False
        # 复用你的主题匹配函数
        return bool(match_theme_cata([path], wanted_theme_paths))

    mask_keep = (
        out_df["is_spatial"].fillna(False)
        | out_df["is_temporal"].fillna(False)
        | (
            (out_df["is_indicator"].fillna(False) | out_df["is_other_information"].fillna(False))
            & out_df["thematic_path"].map(_theme_hit)
        )
    )
    return out_df.loc[mask_keep].reset_index(drop=True)


In [38]:
# === 3) 空间层级自动识别 + 行筛选（复用你已有函数） ===
def _col_normed_values(s: pd.Series, max_unique: int = 5000) -> set:
    vals = s.dropna().astype(str).head(max_unique).tolist()
    return {norm(v) for v in vals if str(v).strip() != ""}

def detect_spatial_level_for_column(
    series: pd.Series,
    df_hier: pd.DataFrame,
    level_defs: Dict[str, List[str]],
    min_hit_ratio: float = 0.05
) -> Optional[str]:
    cand_vals = _col_normed_values(series)
    if not cand_vals:
        return None
    best_level, best_ratio = None, 0.0
    for level, (code_col, name_col) in level_defs.items():
        pool = set(df_hier[code_col].astype(str).map(norm)) | set(df_hier[name_col].astype(str).map(norm))
        inter = len(cand_vals & pool)
        ratio = inter / max(1, len(cand_vals))
        if ratio > best_ratio:
            best_level, best_ratio = level, ratio
    return best_level if best_ratio >= min_hit_ratio else None

def _temporal_mask_for_periods(col: pd.Series, periods: List[Tuple[date, date]]) -> pd.Series:
    mask = pd.Series(False, index=col.index)
    if pd.api.types.is_datetime64_any_dtype(col):
        d = col.dt.date
        colmask = pd.Series(False, index=col.index)
        for a, b in periods:
            colmask |= (d >= a) & (d <= b)
        return colmask

    def _cell_hits(v):
        if pd.isna(v):
            return False
        try:
            s, e = parse_bounds(v)
        except Exception:
            return False
        for a, b in periods:
            if not (e < a or b < s):
                return True
        return False

    return col.apply(_cell_hits)

def build_spatial_allowed_sets_for_cols(
    df: pd.DataFrame,
    spatial_cols: List[str],
    sr: dict,
    df_hier: pd.DataFrame,
    level_defs: Dict[str, List[str]]
) -> Dict[str, set]:
    allowed_by_col = {}
    req_level = resolve_required_level(sr)  # e.g., country(france)->region
    req_values = sr.get("spatial_scope") or []

    for c in spatial_cols:
        lvl = detect_spatial_level_for_column(df[c], df_hier, level_defs)
        if lvl is None:
            continue
        try:
            allowed_list = get_r_scopes(df_hier, req_level, req_values, lvl)
        except Exception:
            allowed_list = []
        allowed_set = {norm(x) for x in allowed_list}
        if allowed_set:
            allowed_by_col[c] = allowed_set
    return allowed_by_col

def _cell_hits_periods(v, periods: List[Tuple[date, date]]) -> bool:
    """单元格是否与任一 period 相交；支持标量/列表；依赖 parse_bounds。"""
    if isinstance(v, (list, tuple, set)):
        return any(_cell_hits_periods(x, periods) for x in v)
    if pd.isna(v):
        return False
    try:
        s, e = parse_bounds(v)  # 你已有：返回 (start_date, end_date)，end 为该粒度最后一天
    except Exception:
        return False
    return any(not (e < a or b < s) for (a, b) in periods)

def _temporal_mask_for_periods(col: pd.Series, periods: List[Tuple[date, date]]) -> pd.Series:
    """对一列生成“落入任一时间段”的布尔掩码。"""
    if not periods:
        return pd.Series(False, index=col.index)
    if pd.api.types.is_datetime64_any_dtype(col):
        d = col.dt.date
        mask = pd.Series(False, index=col.index)
        for a, b in periods:
            mask |= (d >= a) & (d <= b)
        return mask
    return col.apply(lambda v: _cell_hits_periods(v, periods))

def filter_rows_by_scopes(
    df: pd.DataFrame,
    sr: Dict[str, Any],
    spatial_cols: List[str],
    temporal_cols: List[str]
) -> pd.DataFrame:
    # ---- 时间区间（sr["temporal_scope"] 保证为 list[dict]）----
    ts_list = sr.get("temporal_scope") or []
    periods: List[Tuple[date, date]] = [ _parse_wanted(ts) for ts in ts_list if ts ]
    # 去重（保序）
    periods = list(dict.fromkeys(periods))

    # ---- 空间掩码 ----
    allowed_by_col = build_spatial_allowed_sets_for_cols(df, spatial_cols, sr, DF_SPATIAL_HIER, level_defs)
    if allowed_by_col:
        mask_sp = pd.Series(False, index=df.index)
        for c, allowed in allowed_by_col.items():
            colmask = df[c].astype(str).map(norm).isin(allowed)
            mask_sp |= colmask
    else:
        mask_sp = pd.Series(False, index=df.index)

    # ---- 时间掩码（多段 periods 任一命中即可）----
    if temporal_cols:
        mask_tp = pd.Series(False, index=df.index)
        for c in temporal_cols:
            mask_tp |= _temporal_mask_for_periods(df[c], periods)
    else:
        mask_tp = pd.Series(False, index=df.index)

    return df.loc[mask_sp & mask_tp]


In [39]:
# === 4) 主流程封装 ===
def llm_filter_dataset(
    df_raw: pd.DataFrame,
    sr: dict,
    model: str = "gpt-5-mini",
    sample_rows: int = 10,
    max_cols_per_batch: int = 30,
    use_cache: bool = True,
    force_refresh: bool = False
):
    """
    1) LLM 列语义识别（仅保留 空间/时间 + 主题命中的 indicator/other）
    2) 组装需要的列（主题列 + 空间 + 时间）
    3) 按 SR 空间/时间筛选行
    返回：classification_df, df_selected, info
    """
    wanted_theme_paths = sr.get("themes") or []

    cls_df = semantic_helper(
        df=df_raw,
        wanted_theme_paths=wanted_theme_paths,
        model=model,
        sample_rows=sample_rows,
        max_cols_per_batch=max_cols_per_batch,
        use_cache=use_cache,
        force_refresh=force_refresh
    )

    spatial_cols = cls_df.loc[cls_df["is_spatial"] == True, "column_name"].tolist()
    temporal_cols = cls_df.loc[cls_df["is_temporal"] == True, "column_name"].tolist()
    thematic_cols = cls_df.loc[
        (cls_df["thematic_path"].notna()) &
        ((cls_df["is_indicator"] == True) | (cls_df["is_other_information"] == True)),
        "column_name"
    ].tolist()

    # 输出列：主题列 + 空间 + 时间（去重保序）
    ordered_cols = []
    for group in (thematic_cols, spatial_cols, temporal_cols):
        for c in group:
            if c not in ordered_cols and c in df_raw.columns:
                ordered_cols.append(c)

    df_cols_need = df_raw[ordered_cols].copy()

    df_selected = filter_rows_by_scopes(
        df=df_cols_need,
        sr=sr,
        spatial_cols=spatial_cols,
        temporal_cols=temporal_cols
    )

    info = {
        "spatial_cols": spatial_cols,
        "temporal_cols": temporal_cols,
        "thematic_cols": thematic_cols
    }
    return cls_df, df_selected, info


In [40]:
# === 5) 实验启动前：清空缓存（方案B的关键步骤） ===
import shutil

if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)

print(f"LLM cache reset at: {LLM_CACHE_DIR}")


LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic


In [41]:
# === 7) Batch process all files in a folder ===
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import pandas as pd
import time, random, os, re, json
from typing import List, Dict, Any, Optional

# Input folder to process (modify to your path)
INPUT_DIR = Path(RAW_DATA_ZONE)

# Output folder: classification results and filtered data will be saved per file
OUTPUT_DIR = Path(r"C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Supported extensions (from your dict_EL)
SUPPORTED_EXTS = set(dict_EL.keys())

def _safe_stem(p: Path) -> str:
    """Make a safe stem string for output filenames"""
    stem = p.stem
    stem = re.sub(r"[^\w\-]+", "_", stem)
    return stem[:80]

def _find_all_files(root: Path, exts: set) -> list[Path]:
    found = []
    for ext in exts:
        found.extend(root.rglob(f"*{ext}"))
    uniq = sorted({f.resolve() for f in found})
    return uniq

def process_folder_concurrent(
    input_dir,
    sr: dict,
    model: str = "gpt-5-mini",
    sample_rows: int = 10,
    max_cols_per_batch: int = 30,
    use_cache: bool = True,
    force_refresh: bool = False,
    save_per_file_outputs: bool = True,
    max_workers: int = 4,         # 并发线程数（建议 3~6，根据CPU/带宽/LLM配额调）
    verbose: bool = True          # 是否打印进度
) -> pd.DataFrame:
    """
    Run llm_filter_dataset for all supported files in a folder using multithreading.
    Returns a summary DataFrame (one row per file).
    """

    # --- normalize inputs ---
    input_dir = Path(input_dir)
    files = _find_all_files(input_dir, SUPPORTED_EXTS)

    if not files:
        if verbose:
            print(f"No supported files found in {input_dir}. Supported extensions: {sorted(SUPPORTED_EXTS)}")
        return pd.DataFrame(columns=[
            "file", "status", "n_rows_raw", "n_cols_raw",
            "n_cols_classified", "spatial_cols", "temporal_cols",
            "thematic_cols", "n_rows_selected",
            "classification_csv", "selected_csv"
        ])

    if verbose:
        print(f"Found {len(files)} files. Starting threaded processing with max_workers={max_workers} ...")

    # --- worker for a single file ---
    def _process_one_file(fpath: Path) -> Dict[str, Any]:
        stem = _safe_stem(fpath)
        cls_csv = OUTPUT_DIR / f"{stem}__classification.csv"
        sel_csv = OUTPUT_DIR / f"{stem}__selected.csv"

        # 可选：简单重试（防网络抖动 / LLM 限流）
        max_attempts = 3
        backoff_base = 1.5

        for attempt in range(1, max_attempts + 1):
            try:
                # 1) Read raw data
                df_raw = get_df(str(fpath))
                if df_raw is None or not isinstance(df_raw, pd.DataFrame) or df_raw.empty:
                    if verbose:
                        print(f"[skip] {fpath.name}: cannot read or empty.")
                    return {
                        "file": str(fpath), "status": "read_fail_or_empty",
                        "n_rows_raw": 0, "n_cols_raw": 0,
                        "n_cols_classified": 0, "spatial_cols": "",
                        "temporal_cols": "", "thematic_cols": "",
                        "n_rows_selected": 0,
                        "classification_csv": "", "selected_csv": ""
                    }

                # 2) Run LLM-based selection
                cls_df, df_selected, info = llm_filter_dataset(
                    df_raw=df_raw,
                    sr=sr,
                    model=model,
                    sample_rows=sample_rows,
                    max_cols_per_batch=max_cols_per_batch,
                    use_cache=use_cache,
                    force_refresh=force_refresh
                )

                # 3) Save outputs if required
                if save_per_file_outputs:
                    try:
                        if not cls_df.empty:
                            cls_df.to_csv(cls_csv, index=False, encoding="utf-8-sig")
                        if not df_selected.empty:
                            df_selected.to_csv(sel_csv, index=False, encoding="utf-8-sig")
                    except Exception as e:
                        # 保存失败不影响本文件的处理状态（记录即可）
                        if verbose:
                            print(f"[warn] Save CSV failed for {fpath.name}: {e}")

                # 4) Summary row
                return {
                    "file": str(fpath),
                    "status": "ok",
                    "n_rows_raw": int(df_raw.shape[0]),
                    "n_cols_raw": int(df_raw.shape[1]),
                    "n_cols_classified": int(cls_df.shape[0]),
                    "spatial_cols": ", ".join(info.get("spatial_cols", [])),
                    "temporal_cols": ", ".join(info.get("temporal_cols", [])),
                    "thematic_cols": ", ".join(info.get("thematic_cols", [])),
                    "n_rows_selected": int(df_selected.shape[0]),
                    "classification_csv": str(cls_csv if cls_csv.exists() else ""),
                    "selected_csv": str(sel_csv if sel_csv.exists() else "")
                }

            except Exception as e:
                # 简单指数退避后重试
                if attempt < max_attempts:
                    sleep_s = (backoff_base ** (attempt - 1)) + random.uniform(0, 0.5)
                    if verbose:
                        print(f"[retry] {fpath.name} attempt {attempt}/{max_attempts} failed: {e}. Sleep {sleep_s:.1f}s...")
                    time.sleep(sleep_s)
                else:
                    if verbose:
                        print(f"[error] {fpath.name} failed after {max_attempts} attempts: {e}")
                    return {
                        "file": str(fpath), "status": f"error: {type(e).__name__}",
                        "n_rows_raw": None, "n_cols_raw": None,
                        "n_cols_classified": None, "spatial_cols": "",
                        "temporal_cols": "", "thematic_cols": "",
                        "n_rows_selected": None,
                        "classification_csv": "", "selected_csv": ""
                    }

    # --- submit tasks ---
    summary_rows: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(_process_one_file, f): f for f in files}
        for i, future in enumerate(as_completed(future_to_file), 1):
            f = future_to_file[future]
            try:
                row = future.result()
                summary_rows.append(row)
                if verbose:
                    print(f"[{i}/{len(files)}] done: {Path(row['file']).name} -> {row['status']}")
            except Exception as e:
                # 极端情况（future result 自身抛异常）
                summary_rows.append({
                    "file": str(f), "status": f"error: {type(e).__name__}",
                    "n_rows_raw": None, "n_cols_raw": None,
                    "n_cols_classified": None, "spatial_cols": "",
                    "temporal_cols": "", "thematic_cols": "",
                    "n_rows_selected": None,
                    "classification_csv": "", "selected_csv": ""
                })
                if verbose:
                    print(f"[fatal] {f.name} future failed: {e}")

    # --- save summary ---
    summary_df = pd.DataFrame(summary_rows)
    summary_csv = OUTPUT_DIR / "batch_summary.csv"
    try:
        summary_df.to_csv(summary_csv, index=False, encoding="utf-8-sig")
        if verbose:
            print(f"\nBatch finished (threaded). Summary saved at: {summary_csv}")
    except Exception as e:
        if verbose:
            print(f"\nWarning: failed to save summary table: {e}")

    return summary_df


In [42]:
# === 8) (Optional) Reset LLM cache before each experiment (Scheme B key step) ===
import shutil, os
import random
if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)
print(f"LLM cache reset at: {LLM_CACHE_DIR}")


LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic


In [43]:
# === 9) Run batch processing (example with SR1) ===

batch_summary = process_folder_concurrent(
    input_dir=INPUT_DIR,       # str 或 Path 都行
    sr=SR1,
    model="gpt-5-mini",
    sample_rows=10,
    max_cols_per_batch=30,
    use_cache=True,            # 方案B：本轮实验内可缓存
    force_refresh=False,
    save_per_file_outputs=False,
    max_workers=8,             # 按你的机器与API限额调整
    verbose=True
)
display(batch_summary.head(30))

Found 47 files. Starting threaded processing with max_workers=8 ...
[1/47] done: DS_DEVDUR_data.csv -> ok
[2/47] done: DS_BPE_EVOLUTION_2024_data.csv -> ok
[3/47] done: DS_BTS_SAL_EQTP_SEX_PCS_2023_data.csv -> ok
[4/47] done: DS_BTS_SAL_EQTP_SEX_AGE_2023_data.csv -> ok
[5/47] done: DD_EEC_ANNUEL_2024_data.csv -> ok
[6/47] done: DS_IPCH_M_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 1/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.1s...
[7/47] done: DS_BPE_EDUCATION_2024_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 2/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.7s...
[error] fr-en-deploiemement_tedi.parquet failed after 3 attempts: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
[8/47] done: fr-en-deploie

Unnamed: 0,file,status,n_rows_raw,n_cols_raw,n_cols_classified,spatial_cols,temporal_cols,thematic_cols,n_rows_selected,classification_csv,selected_csv
0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,9076.0,15.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,589.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
1,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1175545.0,6.0,2.0,GEO,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
2,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,370710.0,9.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,10125.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
3,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,444852.0,9.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,450.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
4,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,16524.0,17.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
5,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,139024.0,10.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
6,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1611734.0,14.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
7,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: ImportError,,,,,,,,,
8,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1351761.0,19.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
9,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: TypeError,,,,,,,,,


In [44]:
if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)
print(f"LLM cache reset at: {LLM_CACHE_DIR}")

# === 9) Run batch processing (example with SR1) ===

batch_summary = process_folder_concurrent(
    input_dir=INPUT_DIR,  # str 或 Path 都行
    sr=SR2,
    model="gpt-5-mini",
    sample_rows=10,
    max_cols_per_batch=30,
    use_cache=True,  # 方案B：本轮实验内可缓存
    force_refresh=False,
    save_per_file_outputs=False,
    max_workers=8,  # 按你的机器与API限额调整
    verbose=True
)
display(batch_summary.head(30))

LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic
Found 47 files. Starting threaded processing with max_workers=8 ...
[1/47] done: DD_EEC_ANNUEL_2024_data.csv -> ok
[2/47] done: DS_BTS_SAL_EQTP_SEX_PCS_2023_data.csv -> ok
[3/47] done: DS_BTS_SAL_EQTP_SEX_AGE_2023_data.csv -> ok
[4/47] done: DS_DEVDUR_data.csv -> ok
[5/47] done: DS_BPE_EVOLUTION_2024_data.csv -> ok
[6/47] done: DS_IPCH_M_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 1/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.1s...
[retry] fr-en-deploiemement_tedi.parquet attempt 2/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.9s...
[error] fr-en-deploiemement_tedi.parquet failed after 3 attempts: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. U

Unnamed: 0,file,status,n_rows_raw,n_cols_raw,n_cols_classified,spatial_cols,temporal_cols,thematic_cols,n_rows_selected,classification_csv,selected_csv
0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,16524.0,17.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
1,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,370710.0,9.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
2,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,444852.0,9.0,4.0,"GEO, GEO_OBJECT","FREQ, TIME_PERIOD",,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
3,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,9076.0,15.0,4.0,"GEO, GEO_OBJECT",TIME_PERIOD,OBS_VALUE,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
4,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1175545.0,6.0,2.0,GEO,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
5,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,139024.0,10.0,2.0,,"FREQ, TIME_PERIOD",,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
6,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: ImportError,,,,,,,,,
7,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1611734.0,14.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
8,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1351761.0,19.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
9,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: TypeError,,,,,,,,,


In [45]:
if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)
print(f"LLM cache reset at: {LLM_CACHE_DIR}")

# === 9) Run batch processing (example with SR1) ===

batch_summary = process_folder_concurrent(
    input_dir=INPUT_DIR,  # str 或 Path 都行
    sr=SR3,
    model="gpt-5-mini",
    sample_rows=10,
    max_cols_per_batch=30,
    use_cache=True,  # 方案B：本轮实验内可缓存
    force_refresh=False,
    save_per_file_outputs=False,
    max_workers=8,  # 按你的机器与API限额调整
    verbose=True
)
display(batch_summary.head(30))

LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic
Found 47 files. Starting threaded processing with max_workers=8 ...
[1/47] done: DD_EEC_ANNUEL_2024_data.csv -> ok
[2/47] done: DS_BTS_SAL_EQTP_SEX_PCS_2023_data.csv -> ok
[3/47] done: DS_DEVDUR_data.csv -> ok
[4/47] done: DS_BTS_SAL_EQTP_SEX_AGE_2023_data.csv -> ok
[5/47] done: DS_IPCH_M_data.csv -> ok
[6/47] done: DS_BPE_EVOLUTION_2024_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 1/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.4s...
[retry] fr-en-deploiemement_tedi.parquet attempt 2/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.7s...
[error] fr-en-deploiemement_tedi.parquet failed after 3 attempts: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. U

Unnamed: 0,file,status,n_rows_raw,n_cols_raw,n_cols_classified,spatial_cols,temporal_cols,thematic_cols,n_rows_selected,classification_csv,selected_csv
0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,16524.0,17.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
1,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,370710.0,9.0,4.0,"GEO, GEO_OBJECT",TIME_PERIOD,OBS_VALUE,1260.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
2,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,9076.0,15.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
3,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,444852.0,9.0,6.0,"GEO, GEO_OBJECT","FREQ, TIME_PERIOD","DERA_MEASURE, OBS_VALUE",72.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
4,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,139024.0,10.0,2.0,,TIME_PERIOD,OBS_VALUE,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
5,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1175545.0,6.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
6,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: ImportError,,,,,,,,,
7,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1611734.0,14.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
8,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1351761.0,19.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
9,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,4725.0,93.0,0.0,,,,0.0,,


In [46]:
if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)
print(f"LLM cache reset at: {LLM_CACHE_DIR}")

# === 9) Run batch processing (example with SR1) ===

batch_summary = process_folder_concurrent(
    input_dir=INPUT_DIR,  # str 或 Path 都行
    sr=CR1,
    model="gpt-5-mini",
    sample_rows=10,
    max_cols_per_batch=30,
    use_cache=True,  # 方案B：本轮实验内可缓存
    force_refresh=False,
    save_per_file_outputs=False,
    max_workers=8,  # 按你的机器与API限额调整
    verbose=True
)
display(batch_summary.head(30))

LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic
Found 47 files. Starting threaded processing with max_workers=8 ...
[1/47] done: DS_BTS_SAL_EQTP_SEX_PCS_2023_data.csv -> ok
[2/47] done: DS_BTS_SAL_EQTP_SEX_AGE_2023_data.csv -> ok
[3/47] done: DD_EEC_ANNUEL_2024_data.csv -> ok
[4/47] done: DS_BPE_EVOLUTION_2024_data.csv -> ok
[5/47] done: DS_DEVDUR_data.csv -> ok
[6/47] done: DS_IPCH_M_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 1/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.5s...
[retry] fr-en-deploiemement_tedi.parquet attempt 2/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.7s...
[error] fr-en-deploiemement_tedi.parquet failed after 3 attempts: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. U

Unnamed: 0,file,status,n_rows_raw,n_cols_raw,n_cols_classified,spatial_cols,temporal_cols,thematic_cols,n_rows_selected,classification_csv,selected_csv
0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,370710.0,9.0,4.0,"GEO, GEO_OBJECT",TIME_PERIOD,OBS_VALUE,20250.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
1,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,444852.0,9.0,3.0,GEO,TIME_PERIOD,OBS_VALUE,900.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
2,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,16524.0,17.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
3,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1175545.0,6.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
4,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,9076.0,15.0,4.0,"GEO, GEO_OBJECT",TIME_PERIOD,OBS_VALUE,3471.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
5,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,139024.0,10.0,2.0,,TIME_PERIOD,OBS_VALUE,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
6,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: ImportError,,,,,,,,,
7,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1611734.0,14.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,64818.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
8,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1351761.0,19.0,19.0,"GEO, GEO_OBJECT",TIME_PERIOD,"FACILITY_DOM, FACILITY_SDOM, FACILITY_TYPE, IN...",66992.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
9,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: TypeError,,,,,,,,,


In [47]:
if os.path.exists(LLM_CACHE_DIR):
    shutil.rmtree(LLM_CACHE_DIR)
os.makedirs(LLM_CACHE_DIR, exist_ok=True)
print(f"LLM cache reset at: {LLM_CACHE_DIR}")

# === 9) Run batch processing (example with SR1) ===

batch_summary = process_folder_concurrent(
    input_dir=INPUT_DIR,  # str 或 Path 都行
    sr=CR2,
    model="gpt-5-mini",
    sample_rows=10,
    max_cols_per_batch=30,
    use_cache=True,  # 方案B：本轮实验内可缓存
    force_refresh=False,
    save_per_file_outputs=False,
    max_workers=8,  # 按你的机器与API限额调整
    verbose=True
)
display(batch_summary.head(30))

LLM cache reset at: C:\Users\ADMrechbay20\PycharmProjects\metadata usage\metadata\without_metadata\llm_semantic
Found 47 files. Starting threaded processing with max_workers=8 ...
[1/47] done: DS_BTS_SAL_EQTP_SEX_AGE_2023_data.csv -> ok
[2/47] done: DS_DEVDUR_data.csv -> ok
[3/47] done: DD_EEC_ANNUEL_2024_data.csv -> ok
[4/47] done: DS_BTS_SAL_EQTP_SEX_PCS_2023_data.csv -> ok
[5/47] done: DS_BPE_EVOLUTION_2024_data.csv -> ok
[6/47] done: DS_IPCH_M_data.csv -> ok
[retry] fr-en-deploiemement_tedi.parquet attempt 1/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.4s...
[retry] fr-en-deploiemement_tedi.parquet attempt 2/3 failed: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.. Sleep 1.5s...
[error] fr-en-deploiemement_tedi.parquet failed after 3 attempts: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. U

Unnamed: 0,file,status,n_rows_raw,n_cols_raw,n_cols_classified,spatial_cols,temporal_cols,thematic_cols,n_rows_selected,classification_csv,selected_csv
0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,444852.0,9.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,576.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
1,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,9076.0,15.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,1766.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
2,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,16524.0,17.0,1.0,,TIME_PERIOD,,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
3,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,370710.0,9.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,9150.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,C:\Users\ADMrechbay20\PycharmProjects\metadata...
4,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1175545.0,6.0,6.0,"GEO, GEO_OBJECT",TIME_PERIOD,"FACILITY_TYPE, BPE_MEASURE, OBS_VALUE",0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
5,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,139024.0,10.0,2.0,,"FREQ, TIME_PERIOD",,0.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
6,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: ImportError,,,,,,,,,
7,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1611734.0,14.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,28482.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
8,C:\Users\ADMrechbay20\PycharmProjects\metadata...,ok,1351761.0,19.0,3.0,"GEO, GEO_OBJECT",TIME_PERIOD,,50602.0,C:\Users\ADMrechbay20\PycharmProjects\metadata...,
9,C:\Users\ADMrechbay20\PycharmProjects\metadata...,error: TypeError,,,,,,,,,
