### Base config

In [None]:
from __future__ import annotations
import json

from notebooks.shared.common.config import configure_logger, load_settings, load_config
from notebooks.shared.common.utils import (
    set_global_seed,
    get_utc_now,
    normalize_location_weights,
    derive_city_mappings,
)


# 1) Logging setup
logger = configure_logger(name="dataset_generator", level="INFO")


# 2) Load central configuration (typed settings first, plain dict as fallback)
try:
    SETTINGS = load_settings("./dataset_config.yaml")     # preferred (typed)
    CONFIG = SETTINGS.to_dict() if hasattr(SETTINGS, "to_dict") else dict(SETTINGS)
except Exception as exc:
    # Deliberately broad: any failure of the typed path falls back to the plain
    # loader. The cause is logged so a broken typed config does not go unnoticed.
    logger.warning(
        "Typed settings unavailable (%s: %s); falling back to load_config()",
        type(exc).__name__,
        exc,
    )
    CONFIG = load_config("./dataset_config.yaml")         # fallback (dict)

# Use the "generation" block when present; otherwise treat the whole config as
# flat (backward compatibility). A null/empty block becomes {}.
GEN_CFG = CONFIG.get("generation", CONFIG) or {}

# 3) Deterministic seeding: generation-level "seed" wins over a top-level one (default 42)
_top_level_seed = CONFIG.get("seed", 42)
SEED = int(GEN_CFG.get("seed", _top_level_seed))
rng = set_global_seed(SEED)  # the returned object is handed to the generator later on


# 4) Reference time
REFERENCE_TIME = get_utc_now()


# 5) Location weights: fail fast when absent, then normalize
location_weights_cfg = GEN_CFG.get("location_weights") or {}
if not location_weights_cfg:
    logger.error("location_weights mancante nella config.")
    raise ValueError("location_weights non definiti")

normalized_location_weights = normalize_location_weights(location_weights_cfg)


# 6) City mappings, with optional per-city overrides taken from the config
_mapping_overrides = {
    "urban_override": GEN_CFG.get("urban_type_by_city"),
    "region_override": GEN_CFG.get("region_by_city"),
}
LOCATIONS, URBAN_TYPE_BY_CITY, REGION_BY_CITY = derive_city_mappings(GEN_CFG, **_mapping_overrides)


# 7) Other values derived from the config (missing or empty entries become {})
CITY_BASE_PRICES = GEN_CFG.get("city_base_prices") or {}
SEASONALITY = GEN_CFG.get("seasonality") or {}
_default_thresholds = {"center": 1.5, "semi_center": 5.0}
ZONE_THRESHOLDS = GEN_CFG.get("zone_thresholds_km", _default_thresholds) or {}

# Threshold validation (best-effort, in addition to the typed models):
# both keys must exist and 'center' must be strictly below 'semi_center'.
if not {"center", "semi_center"}.issubset(ZONE_THRESHOLDS.keys()):
    raise ValueError("ZONE_THRESHOLDS incomplete: servono 'center' e 'semi_center'")
_center_km = float(ZONE_THRESHOLDS["center"])
_semi_center_km = float(ZONE_THRESHOLDS["semi_center"])
if _center_km >= _semi_center_km:
    raise ValueError("'center' threshold deve essere < 'semi_center'")


# 8) Flattened config for backward compatibility: generation keys override top-level ones
FLAT_CONFIG = dict(CONFIG)
FLAT_CONFIG.update(GEN_CFG)
FLAT_CONFIG["location_weights"] = dict(location_weights_cfg)


# 9) Log a structured summary of the configuration
_banner = "=" * 60
logger.info(_banner)
logger.info("DATASET GENERATION CONFIGURATION")
logger.info(_banner)

# FLAT_CONFIG already contains every GEN_CFG key, so a single lookup is enough.
_config_summary = {
    "seed": SEED,
    "reference_time": REFERENCE_TIME.isoformat(),
    "locations_count": len(LOCATIONS),
    "rows_to_generate": int(FLAT_CONFIG.get("n_rows", 0) or 0),
    "asset_type": str(FLAT_CONFIG.get("asset_type", "property")),
    "cities_with_base_prices": len(CITY_BASE_PRICES),
}
logger.info("Config summary:\n%s", json.dumps(_config_summary, indent=2, ensure_ascii=False))

### Dataset generation and base enrichment

In [None]:
from __future__ import annotations

import json
from pathlib import Path
from datetime import datetime, timezone

import numpy as np
import pandas as pd

from notebooks.shared.n01_generate_dataset.dataset_builder import generate_dataset_df
from notebooks.shared.common.sanity_checks import validate_dataset
from notebooks.shared.common.schema import get_required_fields
from notebooks.shared.common.utils import (
    NumpyJSONEncoder,
    optimize_dtypes,
    log_basic_diagnostics,
)

# 0) Preliminary sanity check
# Raised explicitly instead of via `assert`, which is stripped under `python -O`;
# a null or non-numeric n_rows is reported with the same message.
GEN_CFG = CONFIG.get("generation", CONFIG)
try:
    _n_rows = int(GEN_CFG.get("n_rows", 0)) if isinstance(GEN_CFG, dict) else 0
except (TypeError, ValueError):
    _n_rows = 0
if _n_rows <= 0:
    raise ValueError("generation.n_rows mancante o non valido")

# 1) Dataset generation + quality_report
logger.info("Starting dataset generation...")
df, quality_report = generate_dataset_df(
    config=GEN_CFG,                 # pass the generation block, NOT the full CONFIG
    locations=LOCATIONS,
    urban_map=URBAN_TYPE_BY_CITY,
    region_map=REGION_BY_CITY,
    seasonality=SEASONALITY,
    city_base_prices=CITY_BASE_PRICES,
    rng=rng,
    reference_time=REFERENCE_TIME,
    batch_size=1000,
    show_progress=True,
    validate_each=True,
    error_budget_pct=0.01,
)
logger.info(f"✅ Generated {len(df):,} records")

# Output dir
Path("outputs").mkdir(parents=True, exist_ok=True)

# Save the quality_report for analysis in Notebook 02
with open("outputs/quality_report.json", "w", encoding="utf-8") as f:
    json.dump(quality_report, f, cls=NumpyJSONEncoder, indent=2, ensure_ascii=False)
logger.info("✅ Quality report saved to outputs/quality_report.json")

# 2) (Deliberately no enforcement of categorical domains in nb01)

# 3) Dtype optimization, logging how much memory was saved
_MB = 1024**2

mem_before_cols = df.memory_usage(deep=True, index=False)
mem_before = df.memory_usage(deep=True).sum()
dtypes_before = df.dtypes.copy()

df = optimize_dtypes(df)

dtypes_after = df.dtypes
mem_after_cols = df.memory_usage(deep=True, index=False)
mem_after = df.memory_usage(deep=True).sum()

saved_bytes = mem_before - mem_after
saved_mb = saved_bytes / _MB
if mem_before > 0:
    pct_saved = saved_bytes / mem_before * 100
else:
    pct_saved = 0.0

logger.info(
    "✅ Data types optimized: %.2f MB → %.2f MB  (−%.2f MB, %.1f%%)",
    mem_before / _MB,
    mem_after / _MB,
    saved_mb,
    pct_saved,
)

# (Optional) Columns with the largest memory saving
SHOW_TOP_SAVINGS = True
TOP_N = 8
if SHOW_TOP_SAVINGS:
    _per_column_saving = mem_before_cols - mem_after_cols
    diff = _per_column_saving.sort_values(ascending=False)
    # Only strictly positive savings are reported
    top = {
        column: round(saved / _MB, 3)
        for column, saved in diff.head(TOP_N).items()
        if saved > 0
    }
    if top:
        logger.info("🏁 Top risparmio per colonna (MB): %s", top)

# (Optional) Log which dtypes changed (columns that had no dtype before are ignored)
changed = [
    column
    for column in df.columns
    if dtypes_before.get(column) is not None
    and dtypes_before[column] != dtypes_after[column]
]
if changed:
    preview = {
        column: f"{dtypes_before[column]}→{dtypes_after[column]}"
        for column in changed[:TOP_N]
    }
    _not_shown = len(changed) - TOP_N
    more = f" (+{_not_shown} altre)" if _not_shown > 0 else ""
    logger.info("🔤 Dtype cambiati (%d): %s%s", len(changed), preview, more)

# 4) Schema validation (the JSON dump of the report follows below)
try:
    validation_report = validate_dataset(
        df,
        asset_type=GEN_CFG.get("asset_type", "property"),  # read from GEN_CFG
        raise_on_failure=True,
    )
except RuntimeError as e:
    # Keep going with a minimal failure report so it can still be saved and printed
    logger.error(f"❌ Validation failed: {e}")
    validation_report = dict(
        overall_passed=False,
        timestamp=datetime.now(timezone.utc).isoformat(),
        n_rows=len(df),
        n_cols=df.shape[1],
        error=str(e),
    )
else:
    logger.info("✅ Dataset validation passed")

_rule = "=" * 60
print("\n" + _rule)
print("VALIDATION SUMMARY")
print(_rule)
if "schema" in validation_report:
    missing = validation_report["schema"].get("missing", [])
    print(f"⚠️ Missing fields: {missing}" if missing else "✅ All required fields present")

# Fall back to the DataFrame's own shape when the report does not carry it
_reported_rows = validation_report.get("n_rows", len(df))
_reported_cols = validation_report.get("n_cols", df.shape[1])
_verdict = "PASSED" if validation_report.get("overall_passed") else "FAILED"
print(f"📊 Rows: {_reported_rows:,}")
print(f"📊 Cols: {_reported_cols}")
print(f"✅ Validation: {_verdict}")

def convert_to_serializable(obj):
    """Recursively convert NumPy/pandas values into plain JSON-friendly Python objects.

    Arrays become lists, NumPy booleans/integers/floats become ``bool``/``int``/
    ``float`` (integers are no longer turned into floats), ``pd.Timestamp``
    becomes an ISO-8601 string, and dicts, lists and tuples are converted
    element by element. Anything else is returned unchanged.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_to_serializable(item) for item in obj]
    return obj


report_path = Path("outputs/validation_report.json")
report_path.parent.mkdir(parents=True, exist_ok=True)

# Always build the plain-Python copy: the export cell reads `clean_report`
# unconditionally, so it must exist even when the primary dump succeeds.
clean_report = convert_to_serializable(validation_report)

try:
    with open(report_path, "w", encoding="utf-8") as fp:
        json.dump(validation_report, fp, indent=2, cls=NumpyJSONEncoder)
    logger.info(f"✅ Validation report saved to {report_path}")
except (TypeError, ValueError) as e:
    # The custom encoder could not handle the report: rewrite the file from the
    # pre-converted copy (opening with "w" truncates any partial output).
    logger.warning(f"JSON encoder issue ({e}), trying without custom encoder)")
    with open(report_path, "w", encoding="utf-8") as fp:
        json.dump(clean_report, fp, indent=2)
    logger.info(f"✅ Validation report (fallback) saved to {report_path}")

# Record the validation outcome on the DataFrame itself
df.attrs["validation_timestamp"] = validation_report.get("timestamp")
df.attrs["validation_passed"] = bool(validation_report.get("overall_passed", False))

# 5) Column reordering according to the schema: required fields first, the rest after
required = get_required_fields(GEN_CFG.get("asset_type", "property"))  # read from GEN_CFG
missing = [field for field in required if field not in df.columns]
if missing:
    raise ValueError(f"Campi richiesti mancanti: {missing}")

others = [column for column in df.columns if column not in required]
df = df[required + others]
logger.info("✅ Columns reordered: %d required + %d optional", len(required), len(others))

# 6) Quick diagnostics
log_basic_diagnostics(df, logger)

# Final console recap of the generated dataset
_rule = "=" * 60
_memory_mb = df.memory_usage(deep=True).sum() / 1024**2
_status = "PASSED" if df.attrs.get("validation_passed") else "FAILED"
for _line in (
    "\n" + _rule,
    "DATASET GENERATION COMPLETED",
    _rule,
    f"✅ Records: {len(df):,}",
    f"✅ Features: {df.shape[1]}",
    f"✅ Memory: {_memory_mb:.2f} MB",
    f"✅ Validation: {_status}",
):
    print(_line)

### Profiling & optimizations

In [None]:
from __future__ import annotations

import json
from pathlib import Path

import pandas as pd

from notebooks.shared.common.performance_utils import DatasetProfiler, DtypeOptimizer
from notebooks.shared.common.reports import run_sanity_checks
from notebooks.shared.common.utils import NumpyJSONEncoder

# 1. Pre-profiling casts & parsing (best effort)
for col, dtype in {
    "price_per_sqm_capped": "float32",
    "listing_quarter": "category",
    "location_premium": "float32",
}.items():
    if col not in df.columns:
        continue
    try:
        df[col] = df[col].astype(dtype)
    except Exception as exc:
        # Still best-effort: the column keeps its current dtype, but the failed
        # cast is now logged instead of being silently discarded.
        logger.warning(
            "Pre-profiling cast skipped for %r -> %s (%s: %s)",
            col,
            dtype,
            type(exc).__name__,
            exc,
        )

# Parse timestamps as UTC; values that cannot be parsed become NaT (errors="coerce")
if "last_verified_ts" in df.columns:
    df["last_verified_ts"] = pd.to_datetime(df["last_verified_ts"], utc=True, errors="coerce")

# 2. Initialise the profiler with explicit float-downcast tolerances
_profiler_options = {"float_downcast_atol": 1e-6, "float_downcast_rtol": 1e-3}
profiler = DatasetProfiler(**_profiler_options)

# 3. Run the full profiling pass
print("🔍 Profiling dataset performance...")
_groupby_observed = GEN_CFG.get("groupby_observed", True)
profile_results = profiler.profile(df, groupby_observed=_groupby_observed)

# 4. Memory report
mem = profile_results.get("memory") or {}
print("\n=== MEMORY PROFILE ===")
print(f"Total memory: {mem.get('total_mb', 0.0):.2f} MB")
print(f"Memory per row: {mem.get('per_row_kb', 0.0):.3f} KB")
print("\nMemory by dtype:")
for _dtype_name, _stats in (mem.get("by_dtype") or {}).items():
    _mb = _stats.get("mb", 0.0)
    _pct = _stats.get("pct", 0.0)
    _n_columns = _stats.get("n_columns", 0)
    print(f"  {_dtype_name}: {_mb:.2f} MB ({_pct:.1f}%) - {_n_columns} cols")

# 5. Performance benchmarks (each section is printed only when it carries a timing)
perf = profile_results.get("performance") or {}
print("\n=== PERFORMANCE BENCHMARKS ===")
if "groupby" in perf and "time_ms" in perf["groupby"]:
    _groupby_bench = perf["groupby"]
    _groupby_cols = _groupby_bench.get("columns") or []
    _groupby_label = ", ".join(_groupby_cols) if _groupby_cols else "?"
    print(f"GroupBy ({_groupby_label}): {_groupby_bench['time_ms']:.2f} ms")
    _rows_per_sec = _groupby_bench.get("rows_per_sec")
    if _rows_per_sec is not None:
        print(f"  → {_rows_per_sec:,} rows/sec")
if "sort" in perf and "time_ms" in perf["sort"]:
    _sort_bench = perf["sort"]
    _sort_column = _sort_bench.get("column", "?")
    print(f"Sort by {_sort_column}: {_sort_bench['time_ms']:.2f} ms")

# 6. Dtype suggestions (only the first 10 are printed)
dtype_sugg = profile_results.get("dtype_optimization") or {}
print("\n=== DTYPE OPTIMIZATION SUGGESTIONS ===")
if not dtype_sugg:
    print("  No optimizations suggested")
else:
    for _column, _suggestion in list(dtype_sugg.items())[:10]:
        _current = _suggestion.get("current", "?")
        _target = _suggestion.get("target", "?")
        _reason = _suggestion.get("reason", "")
        print(f"  {_column}: {_current} → {_target} ({_reason})")
        if "memory_reduction_pct" in _suggestion:
            print(f"    → Mem saving: {_suggestion['memory_reduction_pct']}%")

# 7. Index suggestions
print("\n=== INDEX SUGGESTIONS ===")
for _suggested_index in profile_results.get("index_suggestions") or []:
    _utility = _suggested_index.get("utility")
    _mark = "✅" if _utility in {"excellent", "good"} else "❌"
    _index_column = _suggested_index.get("column", "?")
    _utility_label = _suggested_index.get("utility", "?")
    _index_reason = _suggested_index.get("reason", "")
    print(f"  {_mark} {_index_column}: {_utility_label} ({_index_reason})")

# 8. Apply the suggested dtype optimizations automatically (optional)
AUTO_APPLY_OPTIMIZATIONS = True
if AUTO_APPLY_OPTIMIZATIONS and dtype_sugg:
    print("\n🔧 Applying dtype optimizations...")
    optimizer = DtypeOptimizer()
    try:
        # Preferred call: 'inplace' passed by keyword
        df, opt_report = optimizer.apply(df, dtype_sugg, inplace=False)
    except TypeError:
        # Fallback for older versions (positional only)
        df, opt_report = optimizer.apply(df, dtype_sugg, False)

    summary = opt_report.get("summary") or {}
    _outcome_counts = ", ".join(
        f"{_label}: {summary.get(_key, 0)}"
        for _label, _key in (("Applied", "applied"), ("Skipped", "skipped"), ("Failed", "failed"))
    )
    print(_outcome_counts)

    # Memory delta against the pre-optimization profile; printed only when positive
    total_before = float(mem.get("total_mb", 0.0) or 0.0)
    new_mem_mb = df.memory_usage(deep=True).sum() / 1024**2
    saved = total_before - new_mem_mb
    if saved > 0:
        if total_before > 0:
            base = total_before
        else:
            base = 1e-9  # guards the division below
        _saved_pct = (saved / base) * 100
        print(f"✅ Memory saved: {saved:.2f} MB ({_saved_pct:.1f}%)")

# 9. Save the profiling report
SAVE_PROFILING_REPORT = True
if SAVE_PROFILING_REPORT:
    # `paths` may be present but null in the config; treat that like a missing block
    log_dir = (GEN_CFG.get("paths") or {}).get("log_dir", "./logs")
    report_path = Path(log_dir) / "profiling_report.json"
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(profile_results, f, indent=2, cls=NumpyJSONEncoder)
    logger.info("Profiling report saved to: %s", report_path)

# =========================
# SANITY BENCHMARKS & DRIFT
# =========================
# run_sanity_checks returns the report together with a DataFrame, which replaces `df`
sanity_report, df = run_sanity_checks(df, GEN_CFG)

logs_dir = Path("./outputs")
logs_dir.mkdir(parents=True, exist_ok=True)
sanity_path = logs_dir / "sanity_report.json"

# The report is written before the pass/fail check so it is available on failure too
with sanity_path.open("w", encoding="utf-8") as fp:
    json.dump(sanity_report, fp, indent=2, ensure_ascii=False, cls=NumpyJSONEncoder)

# A report without an "all_passed" key is treated as passed
if not sanity_report.get("all_passed", True):
    raise RuntimeError(f"Sanity checks failed – vedi dettagli in {sanity_path}")

logger.info("✅ Sanity checks passed; report salvato in %s", sanity_path)

### Export

In [None]:
from __future__ import annotations

import json
from pathlib import Path

from notebooks.shared.n01_generate_dataset.exporter import export_dataset
from notebooks.shared.common.utils import NumpyJSONEncoder

# === Output path setup ===
# `paths` may be present but null in the config; treat that like a missing block
_paths_cfg = GEN_CFG.get("paths") or {}
OUT_DIR = Path(_paths_cfg.get("out_dir", "./outputs"))
OUT_DIR.mkdir(parents=True, exist_ok=True)

FILE_FORMAT = str(GEN_CFG.get("export_format", "csv")).lower()
# Raised explicitly instead of via `assert`, which is stripped under `python -O`
if FILE_FORMAT not in {"csv", "parquet"}:
    raise ValueError(f"export_format non supportato: {FILE_FORMAT}")
COMPRESSION = GEN_CFG.get("compression", None if FILE_FORMAT == "csv" else "snappy")
NO_OVERWRITE = bool(GEN_CFG.get("no_overwrite", False))

# FILENAME_PREFIX is not assigned in any of the notebook cells shown here. Keep a
# value defined elsewhere (earlier cell, injected parameter); otherwise derive one.
# NOTE(review): the "filename_prefix" key and the "dataset" default are
# introduced by this fix — confirm the intended naming.
try:
    FILENAME_PREFIX
except NameError:
    FILENAME_PREFIX = str(GEN_CFG.get("filename_prefix") or "dataset")

# FILE_FORMAT is already lower-cased and validated above
ext = "parquet" if FILE_FORMAT == "parquet" else "csv"
output_path = OUT_DIR / f"{FILENAME_PREFIX}.{ext}"
snapshot_dir = OUT_DIR / "snapshots"
log_dir = OUT_DIR / "logs"

# Build the config handed to the exporter: generation settings plus resolved paths
EXPORT_CFG = {
    **GEN_CFG,
    "paths": {
        **_paths_cfg,
        "output_path": str(output_path),
        "snapshot_dir": str(snapshot_dir),
        "log_dir": str(log_dir),
    },
}

# === Export dataset ===
# NOTE(review): `clean_report` comes from the validation-report cell — confirm it
# is bound on every path before this cell runs.
manifest = export_dataset(
    df=df,
    config=EXPORT_CFG,
    report=clean_report,
    logger=logger,
    format=FILE_FORMAT,
    compression=COMPRESSION,
    index=False,
    no_overwrite=NO_OVERWRITE,
)

# Best-effort size log of the written artifact: a failure here must not abort
# the export, but it is reported instead of being silently swallowed.
try:
    ds_path = Path(manifest.get("dataset_path", output_path))
    size_mb = ds_path.stat().st_size / 1024**2
except Exception as exc:
    logger.warning(
        "Could not inspect the exported artifact (%s: %s)",
        type(exc).__name__,
        exc,
    )
else:
    logger.info("Artefatto scritto: %s (%.2f MB) – rows=%s, cols=%s",
                ds_path, size_mb, f"{len(df):,}", df.shape[1])

# Descriptive snapshot (optional): first 20 rows of the transposed describe() table
df.describe(include="all").T.head(20).to_csv(OUT_DIR / "describe_snapshot.csv", encoding="utf-8")

# ------ 3) Drift check ------
# Source of the drift data, in order of preference: the report, then df.attrs, then {}
drift_info = clean_report.get("sanity_benchmarks", {}).get("location_drift")
if not drift_info:
    drift_info = df.attrs.get("location_drift_report", {})
if not drift_info:
    drift_info = {}

with (OUT_DIR / "location_drift.json").open("w", encoding="utf-8") as fp:
    json.dump(drift_info, fp, cls=NumpyJSONEncoder, indent=2, ensure_ascii=False)

# Tolerance comes from expected_profile when that entry is a dict, else 0.05
_expected_profile = GEN_CFG.get("expected_profile", {})
if isinstance(_expected_profile, dict):
    tolerance = _expected_profile.get("location_distribution_tolerance", 0.05)
else:
    tolerance = 0.05


def _exceeds_tolerance(entry):
    """Return True when the entry's 'difference' is beyond ±tolerance; malformed entries count as False."""
    try:
        return bool(abs(float(entry.get("difference", 0.0))) > tolerance)
    except Exception:
        return False


if isinstance(drift_info, dict):
    violating = [loc for loc, entry in drift_info.items() if _exceeds_tolerance(entry)]
else:
    violating = []

if violating:
    raise ValueError(f"Location drift eccessivo su: {violating} (tolleranza ±{tolerance})")

# ------ 4) Manifest summary log ------
wanted = ["generated_at", "dataset_path", "quality_report_path", "manifest_path", "manifest_hash"]
if isinstance(manifest, dict):
    manifest_summary = {field: manifest.get(field) for field in wanted if field in manifest}
else:
    manifest_summary = {}
logger.info("Export completato con manifest:")
# Log the full manifest when none of the wanted fields are present
_summary_payload = manifest_summary if manifest_summary else manifest
logger.info(json.dumps(_summary_payload, indent=2, ensure_ascii=False))