In [5]:
# /colab_notebooks/week2_step1_bootstrap_clean.ipynb
# Week 2 — Step 1: initialize week_2 structure and validate prerequisites (clean)

from __future__ import annotations

import os
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict

# ----------------------------- Configuration ----------------------------- #

# Mount point handed to drive.mount() and checked with os.path.ismount().
DRIVE_MOUNT_PT: str = "/content/drive"
# Project root on the mounted Drive; every other path below is derived from it.
PROJECT_ROOT: str   = f"{DRIVE_MOUNT_PT}/MyDrive/Colab Notebooks/New_cyber_project"

DATASET_ROOT: str = os.path.join(PROJECT_ROOT, "dataset")  # shared across weeks

# Week 1 locations: only read in this cell (prerequisite checks).
W1_ROOT: str = os.path.join(PROJECT_ROOT, "week_1")
W1_OUT: str  = os.path.join(W1_ROOT, "outputs")

# Week 2 locations: W2_NB and W2_OUT are created by _ensure_structure().
W2_ROOT: str = os.path.join(PROJECT_ROOT, "week_2")
W2_NB: str   = os.path.join(W2_ROOT, "notebooks")
W2_OUT: str  = os.path.join(W2_ROOT, "outputs")

# Week 1 artifacts that must exist; _validate_prereqs() raises FileNotFoundError
# naming the keys whose files are missing.
REQUIRED_W1: Dict[str, str] = {
    "inventory": os.path.join(W1_OUT, "data_inventory_paths.csv"),
    "schema":    os.path.join(W1_OUT, "schema_preview.json"),
}
# Week 1 artifacts that are recorded in the session metadata only when present.
OPTIONAL_W1: Dict[str, str] = {
    "summary": os.path.join(W1_OUT, "summary.csv"),
    "errors":  os.path.join(W1_OUT, "schema_errors.csv"),
}


@dataclass(frozen=True)
class SessionMeta:
    """Immutable record serialized to week_2/outputs/session_meta.json.

    Built by _write_session_meta() and converted with dataclasses.asdict()
    before being dumped as JSON.
    """

    # Project root directory (PROJECT_ROOT).
    project_root: str
    # Shared dataset directory (DATASET_ROOT).
    dataset_root: str
    # Artifact key -> path for the Week 1 files returned by _validate_prereqs().
    week_1_outputs: Dict[str, str]
    # Week 2 directory map with the keys "root", "outputs" and "notebooks".
    week_2: Dict[str, str]
    # Creation timestamp as an ISO-8601 string with a trailing "Z".
    created_at: str
    # Free-text note stored alongside the paths.
    notes: str

def _ensure_drive_mounted() -> None:
    """Mount Google Drive at ``DRIVE_MOUNT_PT`` unless it is already mounted.

    The mount check runs first, so an already-mounted Drive triggers neither an
    interactive prompt nor an import of the Colab-only package.

    Raises:
        ImportError: if a mount is required but ``google.colab`` is unavailable
            (i.e. the notebook is not running inside Colab).
    """
    if os.path.ismount(DRIVE_MOUNT_PT):
        return
    # Imported lazily: the package only exists inside Colab and is only needed
    # when an actual mount has to be performed.
    from google.colab import drive  # type: ignore

    drive.mount(DRIVE_MOUNT_PT)


def _ensure_structure() -> None:
    """Make sure the week_2 notebooks and outputs folders exist.

    Existing folders are left alone; nothing outside week_2 is touched.
    """
    for folder in (W2_NB, W2_OUT):
        os.makedirs(folder, exist_ok=True)


def _validate_prereqs() -> Dict[str, str]:
    """Check the Week 1 artifacts and return a key -> path map of what exists.

    Returns:
        Every entry of ``REQUIRED_W1`` plus those entries of ``OPTIONAL_W1``
        whose file is present.

    Raises:
        FileNotFoundError: if any required artifact is missing; the message
            lists the missing keys.
    """
    absent = []
    for name, location in REQUIRED_W1.items():
        if not os.path.isfile(location):
            absent.append(name)
    if absent:
        raise FileNotFoundError(
            "Week 2 prerequisites not found: " + ", ".join(absent)
        )

    found = dict(REQUIRED_W1)
    # Optional artifacts are reported only when they are actually on disk.
    for name, location in OPTIONAL_W1.items():
        if os.path.isfile(location):
            found[name] = location
    return found


def _write_session_meta(resolved_w1: Dict[str, str]) -> str:
    """Write the session metadata JSON into week_2/outputs and return its path.

    Args:
        resolved_w1: Key -> path map of the Week 1 artifacts that were found.

    Returns:
        Path of the written ``session_meta.json``.
    """
    # datetime.utcnow() is deprecated; take an aware UTC timestamp instead and
    # format it to the same "YYYY-MM-DDTHH:MM:SSZ" shape as before.
    from datetime import timezone

    created_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    meta = SessionMeta(
        project_root=PROJECT_ROOT,
        dataset_root=DATASET_ROOT,
        week_1_outputs=resolved_w1,
        week_2={"root": W2_ROOT, "outputs": W2_OUT, "notebooks": W2_NB},
        created_at=created_at,
        notes="Week 2 initialized.",
    )
    out_path = os.path.join(W2_OUT, "session_meta.json")
    # Explicit encoding keeps the file identical regardless of platform locale.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(asdict(meta), f, indent=2)
    return out_path


def main() -> None:
    """Bootstrap Week 2: mount Drive, create folders, check Week 1, save metadata."""
    _ensure_drive_mounted()
    _ensure_structure()
    meta_path = _write_session_meta(_validate_prereqs())

    # Short confirmation of what was set up and where the metadata landed.
    summary_lines = (
        "Week 2 initialized.",
        f"week_2 root:    {W2_ROOT}",
        f"session meta:   {meta_path}",
    )
    for line in summary_lines:
        print(line)


if __name__ == "__main__":
    main()


Week 2 initialized.
week_2 root:    /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2
session meta:   /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/session_meta.json


  created_at=datetime.utcnow().isoformat(timespec="seconds") + "Z",


In [13]:
# /colab_notebooks/week2_step2_quick_eda_clean.ipynb
# Week 2 — Step 2: modular quick EDA
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import List, Optional, Dict

import pandas as pd


# ----------------------------- Configuration ----------------------------- #

# Drive mount point and project root; all paths below derive from these.
DRIVE_MOUNT_PT: str = "/content/drive"
PROJECT_ROOT: str   = f"{DRIVE_MOUNT_PT}/MyDrive/Colab Notebooks/New_cyber_project"

# Week 1 outputs are read from W1_OUT; this cell writes its results to W2_OUT.
W1_OUT: str = os.path.join(PROJECT_ROOT, "week_1", "outputs")
W2_OUT: str = os.path.join(PROJECT_ROOT, "week_2", "outputs")

# Input: Week 1 inventory (read_inventory() uses its domain/path/extension columns).
INVENTORY_CSV: str   = os.path.join(W1_OUT, "data_inventory_paths.csv")
# Outputs: per-file metrics and the per-domain aggregate written by main().
EDA_FILES_CSV: str   = os.path.join(W2_OUT, "eda_files.csv")
EDA_OVERVIEW_CSV: str= os.path.join(W2_OUT, "eda_overview.csv")

# Upper bound on the number of rows sampled from each file.
MAX_SAMPLE_ROWS: int = 10000


# ----------------------------- Data Models -------------------------------- #

@dataclass(frozen=True)
class FileEntry:
    """One row of the Week 1 inventory, with every field stored as a string."""

    # Value of the inventory's "domain" column; used as the grouping key.
    domain: str
    # Value of the inventory's "path" column; passed to the file readers.
    path: str
    # Value of the "extension" column, lower-cased by read_inventory().
    extension: str


# ------------------------------ Utilities --------------------------------- #

def ensure_inputs() -> None:
    """Fail fast when the inventory is missing; otherwise prepare the output folder.

    Raises:
        FileNotFoundError: if ``INVENTORY_CSV`` is not an existing file.
    """
    inventory_present = os.path.isfile(INVENTORY_CSV)
    if inventory_present:
        os.makedirs(W2_OUT, exist_ok=True)
        return
    raise FileNotFoundError(f"Required file not found: {INVENTORY_CSV}")


def read_inventory() -> List[FileEntry]:
    """Read the Week 1 inventory CSV into a list of ``FileEntry`` records.

    Raises:
        ValueError: if the inventory has no rows.
    """
    inventory = pd.read_csv(INVENTORY_CSV)
    if inventory.empty:
        raise ValueError("Inventory contains no entries.")

    entries: List[FileEntry] = []
    for _, record in inventory.iterrows():
        domain, path, ext = (
            str(record[column]) for column in ("domain", "path", "extension")
        )
        # Extensions are normalized to lower case so later comparisons match.
        entries.append(FileEntry(domain=domain, path=path, extension=ext.lower()))
    return entries


def read_sample(path: str, extension: str, nrows: int) -> Optional[pd.DataFrame]:
    """Read up to ``nrows`` rows of a CSV or Parquet file.

    Args:
        path: Location of the file to sample.
        extension: Lower-case extension including the dot (".csv" or ".parquet").
        nrows: Maximum number of rows to return.

    Returns:
        The sampled frame, or ``None`` when the extension is unsupported or the
        file cannot be read. Read failures stay non-fatal but are reported with
        a ``RuntimeWarning`` instead of being swallowed silently.
    """
    import warnings  # local import: keeps the cell's import block untouched

    try:
        if extension == ".csv":
            # low_memory=False infers each column's dtype from the whole sample
            # at once, avoiding chunk-wise mixed-type inference; the sample is
            # capped by nrows, so memory use stays bounded.
            return pd.read_csv(path, nrows=nrows, low_memory=False)
        if extension == ".parquet":
            return pd.read_parquet(path).head(nrows)
    except Exception as exc:
        warnings.warn(
            f"Skipping unreadable file {path!r}: {type(exc).__name__}: {exc}",
            RuntimeWarning,
            stacklevel=2,
        )
        return None
    return None


def compute_file_metrics(
    df: Optional[pd.DataFrame], max_rows: Optional[int] = None
) -> Dict[str, Optional[float]]:
    """Compute sample_rows, ncols, and overall null % from a DataFrame.

    Args:
        df: Sampled frame, or ``None`` when the file could not be read.
        max_rows: Row cap applied before measuring; defaults to
            ``MAX_SAMPLE_ROWS`` when omitted.

    Returns:
        Dict with keys ``sample_rows``, ``ncols`` and ``null_pct_overall``.
        All three are ``pd.NA`` when ``df`` is ``None`` or empty.
    """
    if df is None or df.empty:
        return {"sample_rows": pd.NA, "ncols": pd.NA, "null_pct_overall": pd.NA}

    limit = MAX_SAMPLE_ROWS if max_rows is None else max_rows
    # Measure nulls on exactly the rows that are reported: counting nulls over
    # the whole frame while dividing by the capped row count would overstate
    # the percentage for frames longer than the cap.
    sample = df.head(limit)
    nrows, ncols = (int(n) for n in sample.shape)
    if nrows == 0 or ncols == 0:
        # Only reachable when the cap itself is 0.
        return {"sample_rows": nrows, "ncols": ncols, "null_pct_overall": 100.0}

    null_pct = float((sample.isna().sum().sum() / (nrows * ncols)) * 100.0)
    return {
        "sample_rows": nrows,
        "ncols": ncols,
        "null_pct_overall": round(null_pct, 4),
    }


def build_per_file(entries: List[FileEntry]) -> pd.DataFrame:
    """Sample every inventory entry and collect one metrics row per file."""

    def _row_for(entry: FileEntry) -> dict:
        # Identity columns first, then whatever metrics the sample yields.
        sample = read_sample(entry.path, entry.extension, MAX_SAMPLE_ROWS)
        record = {
            "domain": entry.domain,
            "path": entry.path,
            "extension": entry.extension,
        }
        record.update(compute_file_metrics(sample))
        return record

    return pd.DataFrame([_row_for(entry) for entry in entries])


def aggregate_overview(per_file: pd.DataFrame) -> pd.DataFrame:
    """Aggregate domain-level EDA metrics.

    Args:
        per_file: One row per file with the columns ``domain``, ``path``,
            ``sample_rows``, ``ncols`` and ``null_pct_overall``.

    Returns:
        One row per domain with ``files``, ``median_sample_rows``,
        ``avg_ncols`` and ``mean_null_pct``.
    """
    metric_cols = ["sample_rows", "ncols", "null_pct_overall"]
    # Unreadable files contribute pd.NA, which turns these columns into object
    # dtype; coerce them to numeric (NA -> NaN) so median/mean are well defined.
    # assign() returns a new frame, so the caller's per_file is not modified.
    numeric = per_file.assign(
        **{col: pd.to_numeric(per_file[col], errors="coerce") for col in metric_cols}
    )
    overview = (
        numeric.groupby("domain", dropna=False)
        .agg(
            files=("path", "count"),
            median_sample_rows=("sample_rows", "median"),
            avg_ncols=("ncols", "mean"),
            mean_null_pct=("null_pct_overall", "mean"),
        )
        .round({"avg_ncols": 2, "mean_null_pct": 4})
        .reset_index()
    )
    return overview


def main() -> None:
    """Run the quick EDA end to end and write both result CSVs."""
    ensure_inputs()
    per_file = build_per_file(read_inventory())
    overview = aggregate_overview(per_file)

    # Persist the per-file table first, then the per-domain roll-up.
    for frame, destination in ((per_file, EDA_FILES_CSV), (overview, EDA_OVERVIEW_CSV)):
        frame.to_csv(destination, index=False)

    # Minimal confirmations
    print(f"Per-file EDA written: {EDA_FILES_CSV}")
    print(f"Overview EDA written: {EDA_OVERVIEW_CSV}")


if __name__ == "__main__":
    main()


  return pd.read_csv(path, nrows=nrows, low_memory=True)


Per-file EDA written: /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/eda_files.csv
Overview EDA written: /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/eda_overview.csv


  return pd.read_csv(path, nrows=nrows, low_memory=True)


In [14]:
# /colab_notebooks/week2_step3_column_profile.ipynb

from __future__ import annotations

import os
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

import pandas as pd

# Drive mount point and project root; all paths below derive from these.
DRIVE_MOUNT_PT = "/content/drive"
PROJECT_ROOT   = f"{DRIVE_MOUNT_PT}/MyDrive/Colab Notebooks/New_cyber_project"
# Week 1 outputs are read from W1_OUT; this cell writes its results to W2_OUT.
W1_OUT         = os.path.join(PROJECT_ROOT, "week_1", "outputs")
W2_OUT         = os.path.join(PROJECT_ROOT, "week_2", "outputs")

# Input inventory and the two CSVs written by main().
INVENTORY_CSV       = os.path.join(W1_OUT, "data_inventory_paths.csv")
COLUMN_PROFILE_CSV  = os.path.join(W2_OUT, "column_profile.csv")
DTYPE_SUMMARY_CSV   = os.path.join(W2_OUT, "dtype_summary.csv")

# Maximum number of rows sampled from each file when inferring its schema.
MAX_SAMPLE_ROWS = 10_000  # as requested

@dataclass(frozen=True)
class FileEntry:
    """One row of the Week 1 inventory, with every field stored as a string."""

    # Value of the inventory's "domain" column; used as the grouping key.
    domain: str
    # Value of the inventory's "path" column; passed to the file readers.
    path: str
    # Value of the "extension" column, lower-cased by read_inventory().
    extension: str

@dataclass
class FileSchema:
    """Column names and dtypes observed in the sample of a single file."""

    # Domain copied from the file's inventory entry.
    domain: str
    # Path copied from the file's inventory entry.
    path: str
    # Column labels of the sampled frame, in frame order.
    columns: List[str]
    # Column label -> str() of the pandas dtype seen in the sample.
    dtypes: Dict[str, str]

def ensure_inputs() -> None:
    """Require the Week 1 inventory and make sure the output folder exists.

    Raises:
        FileNotFoundError: if ``INVENTORY_CSV`` is not an existing file.
    """
    if os.path.isfile(INVENTORY_CSV):
        os.makedirs(W2_OUT, exist_ok=True)
    else:
        raise FileNotFoundError(f"Required file not found: {INVENTORY_CSV}")

def read_inventory() -> List[FileEntry]:
    """Load the Week 1 inventory CSV as ``FileEntry`` records.

    Raises:
        ValueError: if the inventory has no rows.
    """
    table = pd.read_csv(INVENTORY_CSV)
    if table.empty:
        raise ValueError("Inventory contains no entries.")

    def _to_entry(row: pd.Series) -> FileEntry:
        # Every field is stringified; the extension is also lower-cased.
        return FileEntry(
            domain=str(row["domain"]),
            path=str(row["path"]),
            extension=str(row["extension"]).lower(),
        )

    return [_to_entry(row) for _, row in table.iterrows()]

def read_sample(path: str, extension: str, nrows: int) -> Optional[pd.DataFrame]:
    """Read up to ``nrows`` rows of a CSV or Parquet file.

    Args:
        path: Location of the file to sample.
        extension: Lower-case extension including the dot (".csv" or ".parquet").
        nrows: Maximum number of rows to return.

    Returns:
        The sampled frame, or ``None`` when the extension is unsupported or the
        file cannot be read. Read failures stay non-fatal but are reported with
        a ``RuntimeWarning`` instead of being swallowed silently.
    """
    import warnings  # local import: keeps the cell's import block untouched

    try:
        if extension == ".csv":
            # low_memory=False infers each column's dtype from the whole sample
            # at once, so the dtypes recorded in the profile are not affected
            # by chunk-wise mixed-type inference; nrows bounds memory use.
            return pd.read_csv(path, nrows=nrows, low_memory=False)
        if extension == ".parquet":
            return pd.read_parquet(path).head(nrows)
    except Exception as exc:
        warnings.warn(
            f"Skipping unreadable file {path!r}: {type(exc).__name__}: {exc}",
            RuntimeWarning,
            stacklevel=2,
        )
        return None
    return None

def infer_file_schema(e: FileEntry) -> Optional[FileSchema]:
    """Sample one file and describe its columns.

    Returns:
        A ``FileSchema`` with the sampled column names and dtype strings, or
        ``None`` when the sample is unavailable or empty.
    """
    sample = read_sample(e.path, e.extension, MAX_SAMPLE_ROWS)
    if sample is not None and not sample.empty:
        names = list(sample.columns)
        return FileSchema(
            domain=e.domain,
            path=e.path,
            columns=names,
            dtypes={name: str(sample[name].dtype) for name in names},
        )
    return None

def collect_schemas(entries: Iterable[FileEntry]) -> List[FileSchema]:
    """Infer a schema for every entry, dropping the files that yield none.

    Raises:
        ValueError: if no entry produced a schema.
    """
    readable = [
        schema for schema in map(infer_file_schema, entries) if schema is not None
    ]
    if readable:
        return readable
    raise ValueError("No readable files were found during sampling.")

def build_column_profile(schemas: List[FileSchema]) -> pd.DataFrame:
    """Summarize, per domain and column, how many files carry the column.

    Returns:
        Frame with ``domain``, ``column``, ``files_with_column``,
        ``total_files_in_domain``, ``pct_coverage`` and ``modal_dtype``, sorted
        by domain, then descending coverage, then column name.
    """
    files_per_domain = defaultdict(set)
    files_per_column = defaultdict(set)    # keyed by (domain, column)
    dtype_votes = defaultdict(Counter)     # keyed by (domain, column)

    for schema in schemas:
        files_per_domain[schema.domain].add(schema.path)
        for name in schema.columns:
            key = (schema.domain, name)
            files_per_column[key].add(schema.path)
            dtype_votes[key][schema.dtypes.get(name, "unknown")] += 1

    records: List[Dict[str, object]] = []
    for (domain, name), paths in files_per_column.items():
        n_files = len(files_per_domain[domain])
        n_with_column = len(paths)
        if n_files:
            coverage = round(n_with_column / n_files * 100.0, 2)
        else:
            coverage = 0.0
        # most_common(1) yields the most frequent dtype string for the column.
        top_dtype, _ = dtype_votes[(domain, name)].most_common(1)[0]
        records.append(
            {
                "domain": domain,
                "column": name,
                "files_with_column": n_with_column,
                "total_files_in_domain": n_files,
                "pct_coverage": coverage,
                "modal_dtype": top_dtype,
            }
        )

    profile = pd.DataFrame(records)
    if profile.empty:
        # Keep the expected header even when there is nothing to report.
        profile = pd.DataFrame(
            columns=["domain","column","files_with_column","total_files_in_domain","pct_coverage","modal_dtype"]
        )
    ordered = profile.sort_values(
        ["domain","pct_coverage","column"], ascending=[True, False, True]
    )
    return ordered.reset_index(drop=True)

def build_dtype_summary(schemas: List[FileSchema]) -> pd.DataFrame:
    """Count, per domain, how often each dtype string occurs across all files.

    Returns:
        Frame with ``domain``, ``dtype`` and ``count``, sorted by domain and
        then by descending count.
    """
    tallies = defaultdict(Counter)
    for schema in schemas:
        tally = tallies[schema.domain]
        for dtype_name in schema.dtypes.values():
            tally[dtype_name] += 1

    records = []
    for domain, tally in tallies.items():
        for dtype_name, hits in tally.most_common():
            records.append({"domain": domain, "dtype": dtype_name, "count": int(hits)})

    summary = pd.DataFrame(records)
    if summary.empty:
        # Keep the expected header even when there is nothing to report.
        summary = pd.DataFrame(columns=["domain","dtype","count"])
    ordered = summary.sort_values(["domain","count"], ascending=[True, False])
    return ordered.reset_index(drop=True)

def main() -> None:
    """Profile columns and dtypes for every inventoried file and save both CSVs."""
    ensure_inputs()
    schemas = collect_schemas(read_inventory())

    # Build both tables before writing anything.
    profile_table = build_column_profile(schemas)
    dtype_table = build_dtype_summary(schemas)

    profile_table.to_csv(COLUMN_PROFILE_CSV, index=False)
    dtype_table.to_csv(DTYPE_SUMMARY_CSV, index=False)

    print(f"column_profile: {COLUMN_PROFILE_CSV}")
    print(f"dtype_summary:  {DTYPE_SUMMARY_CSV}")

if __name__ == "__main__":
    main()


  return pd.read_csv(path, nrows=nrows, low_memory=True)


column_profile: /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/column_profile.csv
dtype_summary:  /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/dtype_summary.csv


  return pd.read_csv(path, nrows=nrows, low_memory=True)


In [15]:
# /colab_notebooks/week2_step4_target_availability.ipynb

from __future__ import annotations

import os
import json
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import pandas as pd

try:
    import pyarrow as pa
    import pyarrow.parquet as pq
except Exception:
    raise ImportError("pyarrow is required for Parquet counting. Install with: pip install pyarrow")

# --- Paths ---
# Drive mount point and project root; all paths below derive from these.
DRIVE_MOUNT_PT = "/content/drive"
PROJECT_ROOT   = f"{DRIVE_MOUNT_PT}/MyDrive/Colab Notebooks/New_cyber_project"
# Week 1 outputs are read from W1_OUT; this cell writes its result to W2_OUT.
W1_OUT         = os.path.join(PROJECT_ROOT, "week_1", "outputs")
W2_OUT         = os.path.join(PROJECT_ROOT, "week_2", "outputs")

# Inputs: file inventory and schema preview; output: per-domain target availability.
INVENTORY_CSV  = os.path.join(W1_OUT, "data_inventory_paths.csv")
SCHEMA_JSON    = os.path.join(W1_OUT, "schema_preview.json")
OUT_CSV        = os.path.join(W2_OUT, "target_availability.csv")

# Runs when the cell is executed, so the output folder exists before main() writes.
os.makedirs(W2_OUT, exist_ok=True)

# --- Models ---
@dataclass(frozen=True)
class FileEntry:
    """One row of the Week 1 inventory, with every field stored as a string."""

    # Value of the inventory's "domain" column; used as the grouping key.
    domain: str
    # Value of the inventory's "path" column; also the lookup key into the schema map.
    path: str
    # Value of the "extension" column, lower-cased by _read_inventory().
    extension: str

@dataclass
class TargetCols:
    """Names of the target columns found in a file, as resolved by _find_target_cols()."""

    # Original name of the column matching "label", "labels", "target" or
    # "class"; None when no such column was found.
    label: Optional[str]
    # Original name of the column matching "type" or "attack_cat"; None when
    # no such column was found.
    type_: Optional[str]


# --- Helpers ---
def _ensure_inputs() -> None:
    """Verify that both Week 1 inputs exist.

    Raises:
        FileNotFoundError: for the first of ``INVENTORY_CSV`` / ``SCHEMA_JSON``
            that is not an existing file.
    """
    for required in (INVENTORY_CSV, SCHEMA_JSON):
        if not os.path.isfile(required):
            raise FileNotFoundError(f"Required file not found: {required}")

def _read_inventory() -> List[FileEntry]:
    """Load the Week 1 inventory CSV as ``FileEntry`` records.

    Raises:
        ValueError: if the inventory has no rows.
    """
    inventory = pd.read_csv(INVENTORY_CSV)
    if inventory.empty:
        raise ValueError("Inventory contains no entries.")

    entries: List[FileEntry] = []
    for _, rec in inventory.iterrows():
        # Fields are stringified; the extension is also lower-cased.
        entries.append(
            FileEntry(
                domain=str(rec.domain),
                path=str(rec.path),
                extension=str(rec.extension).lower(),
            )
        )
    return entries

def _read_schema_map() -> Dict[str, List[str]]:
    data = json.load(open(SCHEMA_JSON))
    # Map file path -> original column names list
    m: Dict[str, List[str]] = {}
    for rec in data:
        if "path" in rec and "columns" in rec:
            m[str(rec["path"])] = [str(c) for c in rec["columns"]]
    return m

def _find_target_cols(cols: List[str]) -> TargetCols:
    """Pick the label-like and type-like columns out of a list of column names.

    Matching ignores case and surrounding whitespace, so a header such as
    " Label" is recognized. The returned names are the original, unmodified
    column names so they can be used to select columns from the file.

    Args:
        cols: Column names as they appear in the file.

    Returns:
        ``TargetCols`` whose ``label`` is the first match among "label",
        "labels", "target", "class" and whose ``type_`` is the first match
        among "type", "attack_cat"; either is ``None`` when nothing matches.
    """
    # Normalized name -> original name; a later duplicate overrides an earlier one.
    low = {c.strip().lower(): c for c in cols}
    label = low.get("label") or low.get("labels") or low.get("target") or low.get("class")
    type_ = low.get("type") or low.get("attack_cat")
    return TargetCols(label=label, type_=type_)

def _count_csv(path: str, target: TargetCols, chunksize: int = 100_000) -> Tuple[int, int, int]:
    usecols = [c for c in [target.label, target.type_] if c]
    total = lbl = typ = 0
    if usecols:
        for chunk in pd.read_csv(path, usecols=usecols, chunksize=chunksize, low_memory=True):
            total += len(chunk)
            if target.label and target.label in chunk:
                lbl += chunk[target.label].notna().sum()
            if target.type_ and target.type_ in chunk:
                typ += chunk[target.type_].notna().sum()
    else:
        for chunk in pd.read_csv(path, usecols=None, chunksize=chunksize, low_memory=True):
            total += len(chunk)
    return total, lbl, typ

def _count_parquet(path: str, target: TargetCols) -> Tuple[int, int, int]:
    """Count total rows and non-null target values in a Parquet file.

    Args:
        path: Parquet file to scan.
        target: Names of the label/type columns (either may be ``None``).

    Returns:
        ``(total_rows, label_non_null, type_non_null)``.
    """
    pf = pq.ParquetFile(path)
    total = lbl = typ = 0
    # Only the target columns that were actually identified are requested.
    want = [c for c in [target.label, target.type_] if c]
    # If neither target is present, count total rows via metadata only.
    if not want:
        # Sum the per-row-group row counts from the metadata; no row group is read.
        return sum(pf.metadata.row_group(i).num_rows for i in range(pf.metadata.num_row_groups)), 0, 0
    # Otherwise read one row group at a time, restricted to the target columns.
    for i in range(pf.metadata.num_row_groups):
        cols = want
        table = pf.read_row_group(i, columns=cols)
        total += table.num_rows
        for name in cols:
            arr = table.column(name)
            # Non-null = length - null_count
            non_null = arr.length() - arr.null_count
            # Add the count to whichever target(s) this column represents.
            if name == target.label:
                lbl += int(non_null)
            if name == target.type_:
                typ += int(non_null)
    return total, lbl, typ

def _count_file(e: FileEntry, tcols: TargetCols) -> Tuple[int, int, int]:
    """Dispatch row counting by file extension.

    Returns:
        The counter's ``(total_rows, label_non_null, type_non_null)`` for
        ".csv" and ".parquet" files, and ``(0, 0, 0)`` for any other extension.
    """
    counters = {".csv": _count_csv, ".parquet": _count_parquet}
    counter = counters.get(e.extension)
    if counter is None:
        return (0, 0, 0)
    return counter(e.path, tcols)


# --- Main ---
def main() -> None:
    """Measure, per domain, how many files and rows carry label/type targets.

    For every inventory entry the target columns are looked up in the Week 1
    schema preview, the file is scanned for total and non-null target rows, and
    the per-file results are aggregated by domain and written to ``OUT_CSV``.

    A file that cannot be counted still contributes zero rows (best effort), but
    the failure is now reported with a ``RuntimeWarning`` instead of being
    swallowed, so zeros caused by read errors can be told apart from real zeros.
    """
    import warnings  # local import: only needed for the per-file failure notice

    _ensure_inputs()
    entries = _read_inventory()
    schema_map = _read_schema_map()

    rows = []
    for e in entries:
        # Files missing from the schema preview are treated as having no columns.
        cols = schema_map.get(e.path, [])
        tcols = _find_target_cols(cols)
        has_label = tcols.label is not None
        has_type  = tcols.type_ is not None

        try:
            total, lbl, typ = _count_file(e, tcols)
        except Exception as exc:
            warnings.warn(
                f"Could not count rows in {e.path!r}: {type(exc).__name__}: {exc}",
                RuntimeWarning,
                stacklevel=2,
            )
            total, lbl, typ = (0, 0, 0)

        rows.append({
            "domain": e.domain,
            "path": e.path,
            "extension": e.extension,
            "has_label_col": has_label,
            "has_type_col": has_type,
            "total_rows": int(total),
            "label_nonnull_rows": int(lbl),
            "type_nonnull_rows": int(typ),
        })

    per_file = pd.DataFrame(rows)

    agg = (
        per_file.groupby("domain", dropna=False)
        .agg(
            total_files=("path", "count"),
            files_with_label_col=("has_label_col", "sum"),
            files_with_type_col=("has_type_col", "sum"),
            total_rows=("total_rows", "sum"),
            label_nonnull_rows=("label_nonnull_rows", "sum"),
            type_nonnull_rows=("type_nonnull_rows", "sum"),
        )
        .reset_index()
    )

    # Coverage percentages (row-level); domains with zero rows get 0.0 rather
    # than the NaN produced by dividing by zero.
    agg["label_row_coverage_pct"] = (
        (agg["label_nonnull_rows"] / agg["total_rows"]).where(agg["total_rows"] > 0, 0.0) * 100
    ).round(2)
    agg["type_row_coverage_pct"] = (
        (agg["type_nonnull_rows"] / agg["total_rows"]).where(agg["total_rows"] > 0, 0.0) * 100
    ).round(2)

    agg.to_csv(OUT_CSV, index=False)
    print(f"target_availability: {OUT_CSV}")

if __name__ == "__main__":
    main()


target_availability: /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/target_availability.csv


In [16]:
# /colab_notebooks/week2_step5_validation_and_report.ipynb
# Week 2 — Step 5: validations + concise Markdown report

from __future__ import annotations

import os
import json
from dataclasses import dataclass, asdict
from typing import Dict, List

import pandas as pd

# ----------------------------- Paths ----------------------------- #
# Drive mount point and project root; all paths below derive from these.
DRIVE_MOUNT_PT = "/content/drive"
PROJECT_ROOT   = f"{DRIVE_MOUNT_PT}/MyDrive/Colab Notebooks/New_cyber_project"
W2_OUT         = os.path.join(PROJECT_ROOT, "week_2", "outputs")

# Inputs: the four CSVs this cell requires to exist before it loads them.
EDA_OVERVIEW_CSV = os.path.join(W2_OUT, "eda_overview.csv")
COLUMN_PROFILE_CSV = os.path.join(W2_OUT, "column_profile.csv")
DTYPE_SUMMARY_CSV = os.path.join(W2_OUT, "dtype_summary.csv")
TARGET_AVAIL_CSV = os.path.join(W2_OUT, "target_availability.csv")

# Outputs: machine-readable validation result and the Markdown report.
VALIDATION_JSON = os.path.join(W2_OUT, "validation_report.json")
REPORT_MD       = os.path.join(W2_OUT, "week2_report.md")

# Runs when the cell is executed, so the output folder exists before writing.
os.makedirs(W2_OUT, exist_ok=True)

# --------------------------- Parameters --------------------------- #
# A domain passes only when all of its checks hold; these thresholds drive them.
NULL_OK_THRESHOLD = 95.0         # mean null % must be <= this
COVERAGE_COL_PCT  = 50.0         # "well-covered" column threshold
MIN_WELL_COVERED  = 5            # require at least this many columns per domain

# ----------------------------- Models ----------------------------- #
@dataclass
class DomainValidation:
    """Outcome of the validation checks for a single domain."""

    # Domain name taken from the merged summary row.
    domain: str
    # True when the domain's total_rows is greater than 0.
    has_any_rows: bool
    # True when at least one file has a label column or a type column.
    has_any_targets: bool
    # True when mean_null_pct is <= NULL_OK_THRESHOLD.
    mean_null_pct_ok: bool
    # Number of columns whose pct_coverage is >= COVERAGE_COL_PCT.
    well_covered_columns: int
    # True when well_covered_columns is >= MIN_WELL_COVERED.
    well_covered_ok: bool
    # Raw figures behind the checks (file counts, averages, coverage percentages).
    details: Dict[str, float | int]

@dataclass
class ValidationResult:
    """Overall validation verdict plus the per-domain breakdown."""

    # True only when every domain passes all four checks.
    overall_pass: bool
    # One DomainValidation per row of the merged summary.
    per_domain: List[DomainValidation]

# --------------------------- Load Inputs -------------------------- #
# Fail fast, naming the first missing input, before any CSV is parsed.
for p in [EDA_OVERVIEW_CSV, COLUMN_PROFILE_CSV, DTYPE_SUMMARY_CSV, TARGET_AVAIL_CSV]:
    if not os.path.isfile(p):
        raise FileNotFoundError(f"Required file not found: {p}")

# Trailing comments list the columns each file is expected to contain.
eda = pd.read_csv(EDA_OVERVIEW_CSV)               # domain, files, median_sample_rows, avg_ncols, mean_null_pct
colprof = pd.read_csv(COLUMN_PROFILE_CSV)         # domain, column, files_with_column, total_files_in_domain, pct_coverage, modal_dtype
dtype_sum = pd.read_csv(DTYPE_SUMMARY_CSV)        # domain, dtype, count
tgt = pd.read_csv(TARGET_AVAIL_CSV)               # domain, totals + target availability

# -------------------------- Derive Metrics ------------------------ #
# Well-covered columns per domain (>= COVERAGE_COL_PCT)
# Count, per domain, the distinct columns whose coverage meets the threshold.
# Domains with no such column are absent from this frame.
wc = (
    colprof[colprof["pct_coverage"] >= COVERAGE_COL_PCT]
    .groupby("domain")["column"].nunique()
    .rename("well_covered_columns")
    .to_frame()
)

# Merge EDA + target availability + well-covered counts
# Both merges are left joins on "domain", so every domain in `eda` is kept;
# a domain missing from `tgt` ends up with NaN in the target columns.
summary = (
    eda.merge(
        tgt[
            [
                "domain",
                "total_files",
                "files_with_label_col",
                "files_with_type_col",
                "total_rows",
                "label_nonnull_rows",
                "type_nonnull_rows",
                "label_row_coverage_pct",
                "type_row_coverage_pct",
            ]
        ],
        on="domain",
        how="left",
    )
    .merge(wc, on="domain", how="left")
    # Only the well-covered count is defaulted to 0; other NaNs are left as is.
    .fillna({"well_covered_columns": 0})
)

# --------------------------- Validation --------------------------- #

def _num(row: pd.Series, key: str, default: float = 0.0) -> float:
    """Return ``row[key]`` as a float, using ``default`` when missing or NaN.

    The left merges above leave NaN for domains that have no match in the
    target-availability table; ``int(NaN)`` raises and NaN is not valid JSON,
    so such values are replaced by an explicit default here.
    """
    value = row.get(key, default)
    return float(default) if pd.isna(value) else float(value)


per_domain_results: List[DomainValidation] = []
for _, r in summary.iterrows():
    has_rows = _num(r, "total_rows") > 0
    has_targets = _num(r, "files_with_label_col") > 0 or _num(r, "files_with_type_col") > 0
    # A missing null percentage is treated pessimistically (100 %), so the
    # check fails and the reported figure agrees with the failed check.
    mean_null_pct = _num(r, "mean_null_pct", 100.0)
    null_ok = mean_null_pct <= NULL_OK_THRESHOLD
    wc_count = int(_num(r, "well_covered_columns"))
    wc_ok = wc_count >= MIN_WELL_COVERED

    per_domain_results.append(
        DomainValidation(
            domain=str(r["domain"]),
            has_any_rows=bool(has_rows),
            has_any_targets=bool(has_targets),
            mean_null_pct_ok=bool(null_ok),
            well_covered_columns=wc_count,
            well_covered_ok=bool(wc_ok),
            details={
                "files": int(_num(r, "files")),
                "total_files": int(_num(r, "total_files")),
                "avg_ncols": _num(r, "avg_ncols"),
                "median_sample_rows": _num(r, "median_sample_rows"),
                "mean_null_pct": mean_null_pct,
                "label_row_coverage_pct": _num(r, "label_row_coverage_pct"),
                "type_row_coverage_pct": _num(r, "type_row_coverage_pct"),
            },
        )
    )

# The run passes only when every domain passes every check.
overall_pass = all(
    d.has_any_rows and d.has_any_targets and d.mean_null_pct_ok and d.well_covered_ok
    for d in per_domain_results
)

# --------------------------- Save JSON ---------------------------- #
val_result = ValidationResult(overall_pass=overall_pass, per_domain=per_domain_results)

# asdict() expands the nested dataclasses into plain dicts in field order
# (overall_pass, per_domain -> domain, has_any_rows, ..., details).
payload = asdict(val_result)
# The thresholds are appended last so the report records what it was judged against.
payload["thresholds"] = {
    "NULL_OK_THRESHOLD": NULL_OK_THRESHOLD,
    "COVERAGE_COL_PCT": COVERAGE_COL_PCT,
    "MIN_WELL_COVERED": MIN_WELL_COVERED,
}
with open(VALIDATION_JSON, "w") as f:
    json.dump(payload, f, indent=2)

# --------------------------- Write Report ------------------------- #
# Build the Markdown report: a title, the overall verdict, then one section
# per domain with its figures and the outcome of each check.
lines = []
lines.append("# Week 2 — Validation Report")
lines.append("")
lines.append(f"**Overall status:** {'PASS' if overall_pass else 'FAIL'}")
lines.append("")
for d in per_domain_results:
    status = "PASS" if (d.has_any_rows and d.has_any_targets and d.mean_null_pct_ok and d.well_covered_ok) else "FAIL"
    # Show whole-number medians without a decimal part. is_integer() is False
    # for NaN/inf, so such values are printed as-is instead of raising in int().
    median_rows = float(d.details['median_sample_rows'])
    median_text = int(median_rows) if median_rows.is_integer() else d.details['median_sample_rows']
    lines.append(f"## {d.domain} — {status}")
    lines.append(f"- files (sampled): {int(d.details['files'])}")
    lines.append(f"- total_files (domain): {int(d.details['total_files'])}")
    lines.append(f"- avg_ncols: {d.details['avg_ncols']:.2f}")
    lines.append(f"- median_sample_rows: {median_text}")
    lines.append(f"- mean_null_pct: {d.details['mean_null_pct']:.2f}%")
    lines.append(f"- label_row_coverage_pct: {d.details['label_row_coverage_pct']:.2f}%")
    lines.append(f"- type_row_coverage_pct: {d.details['type_row_coverage_pct']:.2f}%")
    lines.append(f"- well_covered_columns (≥{COVERAGE_COL_PCT:.0f}%): {d.well_covered_columns}")
    lines.append(f"- checks: rows={d.has_any_rows}, targets={d.has_any_targets}, nulls_ok={d.mean_null_pct_ok}, coverage_ok={d.well_covered_ok}")
    lines.append("")

# The report contains non-ASCII characters ("—", "≥"); an explicit encoding
# keeps the write from depending on the platform's default locale.
with open(REPORT_MD, "w", encoding="utf-8") as f:
    f.write("\n".join(lines))

print(f"validation_report: {VALIDATION_JSON}")
print(f"markdown_report:  {REPORT_MD}")


validation_report: /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/validation_report.json
markdown_report:  /content/drive/MyDrive/Colab Notebooks/New_cyber_project/week_2/outputs/week2_report.md
