# 02 — Cleaning & Rules

## Zweck
Dieses Notebook bereinigt den exportierten **verbundenen Subgraphen** (bis zu **300k Tracks**), erzwingt einen **stabilen Datenvertrag (Data Contract)** und erzeugt einen **ML-fertigen Clean-Layer**.

## Input
- `../data/interim/converted_sqlite_samples/<sample_name>/*.csv`

## Output
- `../data/processed/clean_csv/<sample_name>/*.csv`
- `../data/processed/parquet/<sample_name>/*.parquet`
- `../data/reports/cleaning_and_rules/<sample_name>/cleaning_report.json`

## Erwartete Tabellen (aus dem Exporter)
| Tabelle | Beschreibung | Schlüssel / Beziehung |
|---|---|---|
| `tracks` | Track-Stammdaten | PK: `track_id` |
| `audio_features` | Audio-Features pro Track | PK: `id` |
| `albums` | Album-Stammdaten | PK: `id` |
| `artists` | Artist-Stammdaten | PK: `id` |
| `genres` | Genre-Stammdaten | PK: `id` |
| `r_albums_tracks` | Zuordnung Album ↔ Track | (`album_id`, `track_id`) |
| `r_track_artist` | Zuordnung Track ↔ Artist | (`track_id`, `artist_id`) |
| `r_artist_genre` | Zuordnung Artist ↔ Genre | (`genre_id`, `artist_id`) |
| `r_albums_artists` | Zuordnung Album ↔ Artist | (`album_id`, `artist_id`)|

## Ergebnis
Am Ende steht eine **konsistente, validierte und reproduzierbare** Datenbasis:
- bereinigte CSVs (für schnelle Inspektion),
- Parquet (für effizientes Training/Batching),
- ein JSON-Report mit Regeln, Checks und Statistiken.


## Imports

In [1]:
from __future__ import annotations

import json
import math
import re
import time
import platform
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# Reproducibility: seed NumPy's global RNG with a fixed value.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Notebook display limits for wide / long frames.
for _option, _value in (
    ("display.max_columns", 250),
    ("display.max_rows", 40),
    ("display.width", 160),
):
    pd.set_option(_option, _value)

# Opt in to pandas copy-on-write mode.
pd.options.mode.copy_on_write = True

## Config and Paths

In [2]:
# Resolve which exported sample to process from the pointer file in the export directory.
BASE_EXPORT_DIR = Path("../data/interim/converted_sqlite_samples")
CURRENT_SAMPLE_PATH = BASE_EXPORT_DIR / "current_sample.json"
# Decode explicitly as UTF-8 so parsing does not depend on the platform's default encoding.
cfg = json.loads(CURRENT_SAMPLE_PATH.read_text(encoding="utf-8"))
SAMPLE_NAME = cfg["SAMPLE_NAME"]


@dataclass(frozen=True)
class PipelinePaths:
    """Immutable bundle of the directories this notebook works with.

    Every default is derived from ``SAMPLE_NAME`` at the moment the class body
    is evaluated, so the class must be defined after the sample config is read.
    """
    # Directory holding the exported CSVs of the current sample (input).
    raw_dir: Path = BASE_EXPORT_DIR / SAMPLE_NAME
    # Per-sample target directories for the cleaned CSV and Parquet layers.
    clean_dir: Path = Path("../data/processed/clean_csv") / SAMPLE_NAME
    parquet_dir: Path = Path("../data/processed/parquet") / SAMPLE_NAME
    # NOTE: despite its name this is a directory (it is created via mkdir), not a file path.
    report_path: Path = Path("../data/reports/cleaning_and_rules") / SAMPLE_NAME

PATHS = PipelinePaths()

# Make sure every pipeline directory exists before anything is read or written.
for _directory in (PATHS.clean_dir, PATHS.report_path, PATHS.raw_dir, PATHS.parquet_dir):
    _directory.mkdir(parents=True, exist_ok=True)

@dataclass(frozen=True)
class CleaningPolicy:
    """Immutable set of thresholds and switches used by the cleaning steps."""
    # Gates the removal of bridge rows whose foreign keys have no matching entity row.
    drop_orphan_bridge_rows:bool = True
    # Checked by TracksCleaner before clipping popularity (album/artist cleaners clip unconditionally).
    clip_popularity:bool = True
    # Inclusive bounds used when clipping popularity columns.
    popularity_min:int = 0
    popularity_max:int = 100
    # Upper quantile at which TracksCleaner caps `duration`.
    duration_cap_quantile:float = 0.999
    # Upper quantile at which AudioFeaturesCleaner caps `tempo`.
    tempo_cap_quantile:float = 0.999
    # Audio-feature columns that AudioFeaturesCleaner clips to [0.0, 1.0].
    audio_01_cols:Tuple[str,...] = (
        "acousticness", "danceability", "energy", "instrumentalness",
        "liveness", "speechiness", "valence"
    )
    # Inclusive (low, high) ranges; AudioFeaturesCleaner sets values outside them to NA.
    loudness_range:Tuple[float,float] = (-60.0, 5.0)
    key_range:Tuple[int,int] = (0,11)
    mode_range: Tuple[int, int] = (0, 1)


POLICY = CleaningPolicy()

# Path objects are stored as plain strings so the metadata stays JSON-serializable.
_paths_as_text = {field: str(location) for field, location in asdict(PATHS).items()}

# Provenance of this run: timestamp, interpreter/platform/library versions, seed, paths and policy.
RUN_META = {
    "run_ts_unix": int(time.time()),
    "python": platform.python_version(),
    "platform": platform.platform(),
    "pandas": pd.__version__,
    "random_seed": RANDOM_SEED,
    "paths": _paths_as_text,
    "policy": asdict(POLICY),
}

## Helper Utilities

In [3]:
def snake_case(s:str) -> str:
    """Turn a label into a lower-case identifier with single underscores.

    Runs of non-word characters become one underscore, repeated underscores
    are collapsed, and leading/trailing underscores are removed.
    """
    underscored = re.sub(r"[^\w]+", "_", s.strip())
    collapsed = re.sub(r"__+", "_", underscored)
    return collapsed.strip("_").lower()

def norm_str(s: pd.Series) -> pd.Series:
    """Return *s* as pandas string dtype with collapsed whitespace; empty strings become NA."""
    collapsed = s.astype("string").str.replace(r"\s+", " ", regex=True)
    return collapsed.str.strip().replace("", pd.NA)
def to_int(s: pd.Series) -> pd.Series:
    """Parse *s* as numbers (unparseable -> NA) and return a nullable ``Int64`` Series."""
    numeric = pd.to_numeric(s, errors="coerce")
    return numeric.astype("Int64")

def to_float(s: pd.Series) -> pd.Series:
    """Parse *s* as numbers (unparseable -> NaN) and return a ``float64`` Series."""
    numeric = pd.to_numeric(s, errors="coerce")
    return numeric.astype("float64")

def to_bool(s: pd.Series) -> pd.Series:
    """Parse common true/false spellings into a nullable ``boolean`` Series.

    Values are compared after lower-casing and trimming; anything that is not
    a recognised token stays NA.
    """
    truthy = ["1", "true", "t", "yes", "y"]
    falsy = ["0", "false", "f", "no", "n"]

    tokens = s.astype("string").str.lower().str.strip()
    parsed = pd.Series(pd.NA, index=s.index, dtype="boolean")
    parsed.loc[tokens.isin(truthy)] = True
    parsed.loc[tokens.isin(falsy)] = False
    return parsed

def memory_mb(df: pd.DataFrame) -> float:
    """Return the deep memory usage of *df* (index included) in mebibytes."""
    total_bytes = df.memory_usage(deep=True).sum()
    return float(total_bytes) / (1024 ** 2)

def keep_most_complete_row(df: pd.DataFrame, key_cols: List[str]) -> pd.DataFrame:
    """Resolve duplicate keys by keeping, per key, the row with the most non-null values.

    Rows are ordered by descending non-null count using a *stable* sort, so
    rows with equal completeness keep their original relative order. This makes
    both the surviving duplicate and the output row order deterministic.

    Args:
        df: Input frame; it is not modified.
        key_cols: Columns that identify a record.

    Returns:
        A new frame with one row per key combination and a fresh 0..n-1 index.
    """
    # Count non-null cells per row without adding a helper column to the frame
    # (a helper column could collide with a real column of the same name).
    completeness = df.notna().sum(axis=1).to_numpy()

    # Negate for descending order; kind="stable" preserves input order on ties.
    order = np.argsort(-completeness, kind="stable")

    ranked = df.iloc[order]
    deduped = ranked.drop_duplicates(subset=key_cols, keep="first")
    return deduped.reset_index(drop=True)

def clip_series(s: pd.Series, lo: float, hi: float) -> pd.Series:
    """Return *s* with values limited to the inclusive range [lo, hi]."""
    bounded = s.clip(lower=lo, upper=hi)
    return bounded

def parse_release_date_universal(s: pd.Series) -> pd.Series:
    """
    Parse a release-date column that may mix epoch numbers and date strings.

    Processing order:
      1. Values are converted to trimmed strings; the exact tokens "", "nan",
         "none" and "null" become NA (matching is case-sensitive because the
         values are not lower-cased).
      2. Numeric values are treated as epoch timestamps based on the digit
         count of their floored absolute value:
           * >= 11 digits  -> milliseconds
           * 9-10 digits   -> seconds
         A numeric 0 is never interpreted as an epoch (avoids a fake 1970-01-01).
         Numbers with other digit counts are not treated as epochs.
      3. Every non-null value that is still unparsed goes through the string
         parser; "YYYY" is expanded to "YYYY-01-01" and "YYYY-MM" to
         "YYYY-MM-01" before calling ``pd.to_datetime``.

    Args:
      s: Series of raw release-date values.

    Returns:
      datetime64[ns] Series aligned to the input index, with NaT for values
      that could not be parsed.
    """
    x = s.astype("string").str.strip()
    x = x.replace({"": pd.NA, "nan": pd.NA, "none": pd.NA, "null": pd.NA})

    # Result buffer; every position starts as NaT and is filled by the steps below.
    out = pd.Series(pd.NaT, index=x.index, dtype="datetime64[ns]")

    # ---------- numeric epoch parsing ----------
    num = pd.to_numeric(x, errors="coerce")

    # treat 0 as missing (placeholder -> avoids 1970-01-01 pollution)
    num = num.mask(num == 0)

    # integer coercion: floor the numeric values into a nullable Int64 Series
    num_int = pd.Series(pd.NA, index=x.index, dtype="Int64")
    mask_num = num.notna()
    if mask_num.any():
        num_int.loc[mask_num] = np.floor(num.loc[mask_num].astype("float64")).astype("int64")

    # digit-length heuristic on the absolute value (so the sign does not count as a digit)
    digits = num_int.abs().astype("string").str.len()
    ms_mask = num_int.notna() & (digits >= 11)          # milliseconds
    s_mask  = num_int.notna() & digits.between(9, 10)   # seconds

    out.loc[ms_mask] = pd.to_datetime(num_int.loc[ms_mask].astype("int64"), unit="ms", errors="coerce")
    out.loc[s_mask]  = pd.to_datetime(num_int.loc[s_mask].astype("int64"), unit="s", errors="coerce")

    # ---------- spotify-like string parsing ----------
    # Only rows that are still NaT but had a non-null input are handed to the string parser.
    rest = out.isna() & x.notna()

    # Pad partial dates so they become full ISO dates: year-only first, then year-month.
    txt = x.copy()
    txt = txt.where(~txt.str.fullmatch(r"\d{4}"), txt + "-01-01")
    txt = txt.where(~txt.str.fullmatch(r"\d{4}-\d{2}"), txt + "-01")

    out.loc[rest] = pd.to_datetime(txt.loc[rest], errors="coerce")

    return out

def assert_gate(condition: bool, msg: str):
    """Raise ``AssertionError`` with a quality-gate prefix when *condition* is falsy."""
    if condition:
        return
    raise AssertionError(f"QUALITY GATE FAILED: {msg}")


@dataclass
class TableProfile:
    """Summary statistics of one table, as produced by ``profile_table``."""
    # Logical table name.
    name: str
    # Number of rows and columns.
    rows: int
    cols: int
    # Deep memory usage in MiB.
    memory_mb: float
    # Number of missing values per column name.
    missing_by_col: Dict[str, int]
    # Count of fully duplicated rows; only filled when no key columns were given.
    duplicate_rows_full: Optional[int] = None
    # Count of rows duplicated on the key columns; only filled when key columns were given.
    duplicate_rows_on_keys: Optional[int] = None

def profile_table(df: pd.DataFrame, name: str, key_cols: Optional[List[str]] = None) -> TableProfile:
    """Build a ``TableProfile`` for *df*.

    Args:
        df: Table to profile.
        name: Logical table name stored in the profile.
        key_cols: If given (and non-empty), duplicates are counted on these
            columns; otherwise fully duplicated rows are counted.

    Returns:
        The populated profile; exactly one of the two duplicate counters is set.
    """
    null_counts: Dict[str, int] = {}
    for column in df.columns:
        null_counts[column] = int(df[column].isna().sum())

    size_mb = round(memory_mb(df), 2)

    dup_full: Optional[int] = None
    dup_on_keys: Optional[int] = None
    if key_cols:
        dup_on_keys = int(df.duplicated(subset=key_cols).sum())
    else:
        dup_full = int(df.duplicated().sum())

    return TableProfile(
        name=name,
        rows=int(len(df)),
        cols=int(df.shape[1]),
        memory_mb=size_mb,
        missing_by_col=null_counts,
        duplicate_rows_full=dup_full,
        duplicate_rows_on_keys=dup_on_keys,
    )

## Load CSV Exports

In [4]:

# Logical table names of the export; each one is stored as "<name>.csv" in PATHS.raw_dir.
_TABLE_NAMES = (
    "tracks",
    "audio_features",
    "albums",
    "artists",
    "genres",
    "r_albums_tracks",
    "r_track_artist",
    "r_artist_genre",
    "r_albums_artists",
)
EXPECTED_FILES = {table: f"{table}.csv" for table in _TABLE_NAMES}

# Report which expected exports are absent (this only prints, it does not stop the run).
missing_files = [
    table
    for table, filename in EXPECTED_FILES.items()
    if not (PATHS.raw_dir / filename).exists()
]
if not missing_files:
    print("All expected CSV exports found !")
else:
    print(" Missing CSV files: ", missing_files)

def load_csv(name: str) -> pd.DataFrame:
    """Read the CSV registered under *name* and normalize its column names with ``snake_case``."""
    frame = pd.read_csv(PATHS.raw_dir / EXPECTED_FILES[name], low_memory=False)
    frame.columns = frame.columns.map(snake_case)
    return frame

# Load every export that is actually present on disk; absent tables are skipped.
raw: Dict[str, pd.DataFrame] = {
    table: load_csv(table)
    for table, filename in EXPECTED_FILES.items()
    if (PATHS.raw_dir / filename).exists()
}

{table: frame.shape for table, frame in raw.items()}

All expected CSV exports found !


{'tracks': (300000, 10),
 'audio_features': (299954, 15),
 'albums': (195938, 6),
 'artists': (187440, 4),
 'genres': (5455, 1),
 'r_albums_tracks': (340898, 2),
 'r_track_artist': (407296, 2),
 'r_artist_genre': (194023, 2),
 'r_albums_artists': (224955, 2)}

## Drop useless / high-missing columns

In [5]:
# Columns removed up front, per table. errors="ignore" makes this a no-op when
# a column is absent. albums.album_group is dropped because it is 100% missing.
for _table, _unused_cols in (("tracks", ["is_playable"]), ("albums", ["album_group"])):
    if _table in raw:
        raw[_table] = raw[_table].drop(columns=_unused_cols, errors="ignore")

# OPTIONAL (recommended): keep only an indicator and drop URL column
# (URLs are high-cardinality and not useful directly as a text feature in sklearn tabular models)
if "tracks" in raw and "preview_url" in raw["tracks"].columns:
    _tracks_raw = raw["tracks"]
    _tracks_raw["has_preview"] = _tracks_raw["preview_url"].notna().astype("int8")
    raw["tracks"] = _tracks_raw.drop(columns=["preview_url"], errors="ignore")

## Pre - profiles

In [6]:

profiles_before: Dict[str, Any] = {}

# Key columns for the duplicate check of each table: entities first, then bridges.
_before_key_cols: Dict[str, List[str]] = {
    "tracks": ["track_id"],
    "audio_features": ["id"],
    "albums": ["id"],
    "artists": ["id"],
    "genres": ["id"],
    "r_albums_tracks": ["album_id", "track_id"],
    "r_track_artist": ["track_id", "artist_id"],
    "r_artist_genre": ["genre_id", "artist_id"],
    "r_albums_artists": ["album_id", "artist_id"],
}

for _table, _keys in _before_key_cols.items():
    if _table not in raw:
        continue
    # The raw tracks export may still name its primary key "id".
    if _table == "tracks" and "track_id" not in raw["tracks"].columns:
        _keys = ["id"]
    profiles_before[_table] = asdict(profile_table(raw[_table], _table, key_cols=_keys))

_summary_before = pd.DataFrame.from_dict(profiles_before, orient="index")
_summary_before[["rows","cols","memory_mb","duplicate_rows_on_keys"]]

Unnamed: 0,rows,cols,memory_mb,duplicate_rows_on_keys
tracks,300000,9,73.11,0
audio_features,299954,15,82.38,0
albums,195938,5,40.45,0
artists,187440,4,26.94,0
genres,5455,1,0.33,0
r_albums_tracks,340898,2,46.17,0
r_track_artist,407296,2,55.16,0
r_artist_genre,194023,2,24.58,0
r_albums_artists,224955,2,30.46,0


## Cleaning Classes

In [7]:
class BaseCleaner:
    """Common interface of all table cleaners; subclasses implement ``clean``."""
    # Logical name of the table the cleaner handles (set by subclasses).
    name:str

    def clean(self,df : pd.DataFrame) -> pd.DataFrame:
        """Return the cleaned version of *df*; must be overridden by subclasses."""
        raise NotImplementedError

class TracksCleaner(BaseCleaner):
    """Cleaner for the ``tracks`` entity table."""
    name = "tracks"

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of the tracks table.

        Steps: ensure the primary key is called ``track_id``, normalize text
        columns, coerce numeric and boolean columns, invalidate non-positive
        durations and cap the upper duration tail, clip popularity, and keep
        the most complete row per ``track_id``.

        Args:
            df: Raw tracks frame; it is not modified.

        Returns:
            The cleaned frame with a fresh 0..n-1 index.
        """
        # Work on a copy so the column assignments below never write into the
        # caller's frame (the other cleaners do the same).
        df = df.copy()

        # Ensure PK column name: track_id
        if "track_id" not in df.columns and "id" in df.columns:
            df = df.rename(columns={"id": "track_id"})

        # Normalize the key and drop rows without one
        df["track_id"] = norm_str(df["track_id"])
        df = df[df["track_id"].notna()]

        # Text columns
        for c in ["name", "preview_url", "audio_feature_id"]:
            if c in df.columns:
                df[c] = norm_str(df[c])

        # Numerics
        for c in ["disc_number", "track_number", "duration", "popularity"]:
            if c in df.columns:
                df[c] = to_int(df[c])

        # Booleans
        for c in ["explicit", "is_playable"]:
            if c in df.columns:
                df[c] = to_bool(df[c])

        # Rules: duration > 0, cap extremes at the policy quantile
        if "duration" in df.columns:
            df.loc[df["duration"] <= 0, "duration"] = pd.NA
            cap = None
            if df["duration"].notna().any():
                cap = df["duration"].dropna().quantile(POLICY.duration_cap_quantile)
            if cap and not math.isnan(cap):
                df["duration"] = df["duration"].clip(upper=int(cap))

        # Rules: popularity in [popularity_min, popularity_max]
        if "popularity" in df.columns and POLICY.clip_popularity:
            df["popularity"] = clip_series(df["popularity"], POLICY.popularity_min, POLICY.popularity_max)

        # Dedupe by track_id keeping the most complete row
        return keep_most_complete_row(df, ["track_id"])

class AudioFeaturesCleaner(BaseCleaner):
    """Cleaner for the ``audio_features`` entity table."""
    name = "audio_features"

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of the audio-features table, one row per ``id``."""
        out = df.copy()
        out["id"] = norm_str(out["id"])
        out = out[out["id"].notna()]

        # URL-like text column
        if "analysis_url" in out.columns:
            out["analysis_url"] = norm_str(out["analysis_url"])

        # Features bounded to [0, 1]
        for col in POLICY.audio_01_cols:
            if col in out.columns:
                out[col] = clip_series(to_float(out[col]), 0.0, 1.0)

        # tempo: non-positive -> NA, then cap the upper tail at the policy quantile
        if "tempo" in out.columns:
            out["tempo"] = to_float(out["tempo"])
            out.loc[out["tempo"] <= 0, "tempo"] = pd.NA
            tempo_cap = None
            if out["tempo"].notna().any():
                tempo_cap = out["tempo"].dropna().quantile(POLICY.tempo_cap_quantile)
            if tempo_cap and not math.isnan(tempo_cap):
                out["tempo"] = out["tempo"].clip(upper=float(tempo_cap))

        # Type coercion for the range-checked columns
        if "loudness" in out.columns:
            out["loudness"] = to_float(out["loudness"])
        for col in ("key", "mode", "time_signature"):
            if col in out.columns:
                out[col] = to_int(out[col])

        # Values outside the policy range become NA
        range_rules = (
            ("loudness", POLICY.loudness_range),
            ("key", POLICY.key_range),
            ("mode", POLICY.mode_range),
        )
        for col, (lo, hi) in range_rules:
            if col in out.columns:
                outside = ~out[col].between(lo, hi)
                out.loc[outside, col] = pd.NA

        # duration is parsed as float here; non-positive values become NA
        if "duration" in out.columns:
            out["duration"] = to_float(out["duration"])
            out.loc[out["duration"] <= 0, "duration"] = pd.NA

        return keep_most_complete_row(out, ["id"])

class AlbumsCleaner(BaseCleaner):
    """Cleaner for the ``albums`` entity table."""
    name = "albums"

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of the albums table, one row per ``id``.

        The original ``release_date`` text is kept; the parsed value is added
        as ``release_date_parsed``.
        """
        out = df.copy()
        out["id"] = norm_str(out["id"])
        out = out[out["id"].notna()]

        # Text columns: normalize whitespace / empty strings
        for col in ("name", "album_group", "album_type", "release_date"):
            if col in out.columns:
                out[col] = norm_str(out[col])

        # Categorical text columns are lower-cased
        for col in ("album_group", "album_type"):
            if col in out.columns:
                out[col] = out[col].str.lower()

        if "popularity" in out.columns:
            popularity = to_int(out["popularity"])
            out["popularity"] = clip_series(popularity, POLICY.popularity_min, POLICY.popularity_max)

        if "release_date" in out.columns:
            out["release_date_parsed"] = parse_release_date_universal(out["release_date"])

        return keep_most_complete_row(out, ["id"])

class ArtistsCleaner(BaseCleaner):
    """Cleaner for the ``artists`` entity table."""
    name = "artists"

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of the artists table, one row per ``id``."""
        out = df.copy()
        out["id"] = norm_str(out["id"])
        out = out[out["id"].notna()]

        columns = out.columns

        if "name" in columns:
            out["name"] = norm_str(out["name"])

        if "popularity" in columns:
            popularity = to_int(out["popularity"])
            out["popularity"] = clip_series(popularity, POLICY.popularity_min, POLICY.popularity_max)

        # followers: integer, negative counts are raised to 0
        if "followers" in columns:
            out["followers"] = to_int(out["followers"]).clip(lower=0)

        return keep_most_complete_row(out, ["id"])

class GenresCleaner(BaseCleaner):
    """Cleaner for the ``genres`` table; only the ``id`` column is processed."""
    name = "genres"

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with normalized, non-null and unique ``id`` values."""
        out = df.copy()
        out["id"] = norm_str(out["id"])

        with_id = out[out["id"].notna()]
        unique = with_id.drop_duplicates(subset=["id"], keep="first")
        return unique.reset_index(drop=True)

class BridgeCleaner(BaseCleaner):
    """Generic cleaner for bridge tables identified by a composite key.

    Normalizes the key columns as strings, drops rows with a missing key part
    and removes duplicate key combinations.
    """

    def __init__(self, name: str, key_cols: List[str]):
        self.name = name
        self.key_cols = key_cols

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of *df* with a fresh 0..n-1 index."""
        out = df.copy()
        for key in self.key_cols:
            out[key] = norm_str(out[key])

        complete = out.dropna(subset=self.key_cols)
        unique = complete.drop_duplicates(subset=self.key_cols, keep="first")
        return unique.reset_index(drop=True)



## Applying the cleaning

In [8]:
cleaned: Dict[str, pd.DataFrame] = {}

# Entity cleaners first, then bridge cleaners; each runs only if its table was loaded.
_cleaners = [
    TracksCleaner(),
    AudioFeaturesCleaner(),
    AlbumsCleaner(),
    ArtistsCleaner(),
    GenresCleaner(),
    BridgeCleaner("r_albums_tracks", ["album_id","track_id"]),
    BridgeCleaner("r_track_artist", ["track_id","artist_id"]),
    BridgeCleaner("r_artist_genre", ["genre_id","artist_id"]),
    BridgeCleaner("r_albums_artists", ["album_id","artist_id"]),
]

for _cleaner in _cleaners:
    if _cleaner.name in raw:
        cleaned[_cleaner.name] = _cleaner.clean(raw[_cleaner.name])

{table: frame.shape for table, frame in cleaned.items()}


  out.loc[rest] = pd.to_datetime(txt.loc[rest], errors="coerce")


{'tracks': (300000, 9),
 'audio_features': (299954, 15),
 'albums': (195938, 6),
 'artists': (187440, 4),
 'genres': (5455, 1),
 'r_albums_tracks': (340898, 2),
 'r_track_artist': (407296, 2),
 'r_artist_genre': (194023, 2),
 'r_albums_artists': (224955, 2)}


## Referential Integrity Enforcement

In [9]:
def _unique_ids(table: str, column: str) -> set:
    """Return the distinct values of ``cleaned[table][column]``, or an empty set if the table is absent."""
    if table not in cleaned:
        return set()
    return set(cleaned[table][column].unique())


# Primary-key sets of the entity tables, used for the foreign-key checks.
track_ids = _unique_ids("tracks", "track_id")
album_ids = _unique_ids("albums", "id")
artist_ids = _unique_ids("artists", "id")
genre_ids = _unique_ids("genres", "id")
af_ids = _unique_ids("audio_features", "id")

def filter_fk(df: pd.DataFrame, col: str, allowed: set) -> pd.DataFrame:
    """Return a copy of the rows of *df* whose *col* value is contained in *allowed*."""
    is_known = df[col].isin(allowed)
    return df.loc[is_known].copy()

# Bridge FK cleanup: keep only rows whose both key parts exist in the entity tables.
if POLICY.drop_orphan_bridge_rows:
    _bridge_constraints = {
        "r_albums_tracks": (("album_id", album_ids), ("track_id", track_ids)),
        "r_track_artist": (("track_id", track_ids), ("artist_id", artist_ids)),
        "r_artist_genre": (("genre_id", genre_ids), ("artist_id", artist_ids)),
        "r_albums_artists": (("album_id", album_ids), ("artist_id", artist_ids)),
    }
    for _bridge, _constraints in _bridge_constraints.items():
        if _bridge not in cleaned:
            continue
        _kept = cleaned[_bridge]
        for _fk_col, _allowed in _constraints:
            _kept = filter_fk(_kept, _fk_col, _allowed)
        cleaned[_bridge] = _kept.reset_index(drop=True)

# Track -> audio_feature_id FK policy: set invalid to NA (do NOT drop track)
if "tracks" in cleaned and "audio_feature_id" in cleaned["tracks"].columns and af_ids:
    _af_ref = cleaned["tracks"]["audio_feature_id"]
    bad = _af_ref.notna() & ~_af_ref.isin(af_ids)
    cleaned["tracks"].loc[bad, "audio_feature_id"] = pd.NA

{table: frame.shape for table, frame in cleaned.items()}

{'tracks': (300000, 9),
 'audio_features': (299954, 15),
 'albums': (195938, 6),
 'artists': (187440, 4),
 'genres': (5455, 1),
 'r_albums_tracks': (340898, 2),
 'r_track_artist': (407296, 2),
 'r_artist_genre': (194023, 2),
 'r_albums_artists': (218032, 2)}

## Post Cleaning

In [10]:
profiles_after: Dict[str, Any] = {}

# Entity tables are accessed unconditionally here (a missing one raises KeyError).
for _table, _keys in (
    ("tracks", ["track_id"]),
    ("audio_features", ["id"]),
    ("albums", ["id"]),
    ("artists", ["id"]),
    ("genres", ["id"]),
):
    profiles_after[_table] = asdict(profile_table(cleaned[_table], _table, key_cols=_keys))

# Bridge tables are profiled only when present.
for _table, _keys in (
    ("r_albums_tracks", ["album_id","track_id"]),
    ("r_track_artist", ["track_id","artist_id"]),
    ("r_artist_genre", ["genre_id","artist_id"]),
    ("r_albums_artists", ["album_id","artist_id"]),
):
    if _table in cleaned:
        profiles_after[_table] = asdict(profile_table(cleaned[_table], _table, key_cols=_keys))

_summary_after = pd.DataFrame.from_dict(profiles_after, orient="index")
_summary_after[["rows","cols","memory_mb","duplicate_rows_on_keys"]]


Unnamed: 0,rows,cols,memory_mb,duplicate_rows_on_keys
tracks,300000,9,72.54,0
audio_features,299954,15,83.24,0
albums,195938,6,52.19,0
artists,187440,4,27.3,0
genres,5455,1,0.33,0
r_albums_tracks,340898,2,46.17,0
r_track_artist,407296,2,55.16,0
r_artist_genre,194023,2,24.58,0
r_albums_artists,218032,2,29.53,0


## Outlier & rule-based validation + flags


In diesem Schritt wurden **keine Outlier pauschal gelöscht** (z. B. via IQR), weil viele Spalten in den Spotify-Daten **diskret**, **zero-inflated** oder **heavy-tail** verteilt sind. IQR wäre hier oft **zu aggressiv** und würde viele **gültige Spezialfälle** (z. B. Multi-Disc, Live-Tracks, Rap/Speech) fälschlich als Outlier markieren.
Stattdessen nutzen wir ein **Contract-/Rule-based Cleaning**:

1. **Ungültige Werte (Domain-Verletzung)** → auf `NaN` setzen oder clippen
2. **Extreme, aber plausible Werte (Long-Tail)** → **Quantile-Capping** (z. B. 0.1% / 99.9%) statt Dropping
3. **Signal behalten** → zusätzliche **Flag-Features** (`is_*`) für Extremfälle

---

#### Tracks – Regeln & Outlier-Handling

- **popularity**: auf **[0,100]** clippen (Spotify-Definition), keine Zeilen löschen
- **duration**: `<= 0` → `NaN`, außerdem **99.9%-Quantile-Cap** (Upper Tail); Flag **`is_long_track`**
- **track_number**: diskret/albumabhängig → **Rule-based**: `<=0` und `>200` → `NaN`; Flag **`is_tracknum_extreme`**
- **disc_number**: meist 1 (IQR wäre falsch) → Flag **`is_multidisc`** (`>1`), extreme Werte `>10` → `NaN`; Flag **`is_disc_extreme`**

**Warum?**
IQR markiert bei fast konstanten/diskreten Spalten (disc_number, track_number) zu viele gültige Fälle. Quantile-Cap ist robuster für Long-Tail-Variablen (duration).

---

#### Audio Features – Regeln & Outlier-Handling

- **time_signature**: als **kategorial** behandeln; nur `{3,4,5}` gültig, sonst `NaN`; Flag **`is_time_signature_rare`**
- **tempo**: `<=0` → `NaN`; **99.9%-Quantile-Cap**; Flag **`is_tempo_extreme`**
- **loudness**: außerhalb **[-60, 5]** → `NaN` (Domain); Flag **`is_loudness_very_low`** (`<-40`)
- **speechiness**: [0,1] clippen; Flag **`is_high_speech`** (>= 90%-Quantil)
- **instrumentalness**: [0,1] clippen; Flag **`is_instrumental`** (>= 0.5)
- **key/mode**: streng validieren (`key` 0–11, `mode` 0–1), sonst `NaN`

**Warum?**
Viele Audio-Features sind **bounded** (0..1) oder **kategorial** (time_signature, key/mode). Bei skew/zero-inflation (speechiness, instrumentalness) ist IQR ungeeignet → Flags + ggf. log/bins später.

---

#### Artists – Regeln & Outlier-Handling

- **followers**: negative → `NaN`; **log1p-Feature** (`followers_log1p`) + Flag **`is_followers_extreme`** (99.9%-Quantil)
- **artist popularity**: auf **[0,100]** clippen

**Warum?**
Followers sind stark **heavy-tail** (Superstars vs. Long Tail). Log-Transform ist Standard und stabilisiert Modelle.

---

#### Albums – Regeln & Outlier-Handling

- **release_date**: in `datetime` konvertieren und **Year-Range** validieren (z. B. 1900–2035); invalid → `NaT`; Flag **`is_release_year_invalid`**
- **album popularity**: auf **[0,100]** clippen

**Warum?**
Release-Daten können vereinzelt kaputt sein; wir **löschen keine Alben**, sondern korrigieren/flaggen.

---

### Ergebnis / Vorteil
- **Stabile Features** für ML (weniger extreme Hebelwerte)
- **Keine Join-/Graph-Schäden** durch massives Dropping
- **Wichtige Rare-Cases bleiben erhalten** (über `is_*` Flags)
- Die anschließenden **Quality Gates** prüfen dann die finalen, regel-konformen Daten.


In [11]:
# %%
# ============================================================
# OUTLIERS & RULE-BASED VALIDATION (based on your observations)
# - NO IQR dropping (too aggressive for skew/discrete/zero-inflated cols)
# - Use rule-based invalid -> NA
# - Use quantile caps for heavy tails
# - Add flags to preserve signal (rare cases)
# ============================================================

def quantile_cap(series: pd.Series, q_low: float = 0.001, q_high: float = 0.999) -> tuple[float, float]:
    """Return the (q_low, q_high) quantiles of the numeric, non-missing values of *series*.

    Non-numeric entries are coerced to missing and ignored. If nothing numeric
    remains, ``(nan, nan)`` is returned.
    """
    numeric = pd.to_numeric(series, errors="coerce").dropna()
    if numeric.empty:
        return (np.nan, np.nan)
    lower = float(numeric.quantile(q_low))
    upper = float(numeric.quantile(q_high))
    return (lower, upper)

# -------------------------
# TRACKS rules
# -------------------------
if "tracks" in cleaned:
    t = cleaned["tracks"].copy()

    # popularity: domain range [0,100]
    if "popularity" in t.columns:
        t["popularity"] = pd.to_numeric(t["popularity"], errors="coerce").clip(0, 100)

        # optional: binning helper (nice for Notebook 3)
        # t["popularity_bin"] = pd.cut(t["popularity"], bins=[-1,0,20,40,60,80,100],
        #                              labels=["0","1-20","21-40","41-60","61-80","81-100"])

    # duration: invalid -> NA, quantile cap, long-track flag
    if "duration" in t.columns:
        t["duration"] = pd.to_numeric(t["duration"], errors="coerce")
        t.loc[t["duration"] <= 0, "duration"] = pd.NA

        # cap only the upper tail; the lower tail is usually valid if > 0
        _, hi = quantile_cap(t["duration"], q_low=0.001, q_high=0.999)
        if np.isnan(hi):
            # No usable duration at all: keep the flag column, typed like the real flag.
            t["is_long_track"] = pd.Series(0, index=t.index, dtype="int8")
        else:
            # Comparisons on nullable integer columns yield NA for missing rows;
            # those must become False before casting, otherwise astype("int8") raises.
            t["is_long_track"] = (t["duration"] > hi).fillna(False).astype("int8")
            t["duration"] = t["duration"].clip(upper=int(hi))

    # track_number: discrete + album-structural -> rule-based, NOT IQR
    if "track_number" in t.columns:
        t["track_number"] = pd.to_numeric(t["track_number"], errors="coerce")
        # rule: <=0 invalid
        t.loc[t["track_number"] <= 0, "track_number"] = pd.NA
        # rule: extremely high track numbers are probably broken (NA-safe mask)
        _tracknum_extreme = (t["track_number"] > 200).fillna(False)
        t["is_tracknum_extreme"] = _tracknum_extreme.astype("int8")
        t.loc[_tracknum_extreme, "track_number"] = pd.NA

    # disc_number: mostly 1; create is_multidisc, rule extreme
    if "disc_number" in t.columns:
        t["disc_number"] = pd.to_numeric(t["disc_number"], errors="coerce")
        t.loc[t["disc_number"] <= 0, "disc_number"] = pd.NA
        # Both flags are computed before extreme values are invalidated, so an
        # extreme disc number still counts as multi-disc.
        _disc_extreme = (t["disc_number"] > 10).fillna(False)
        t["is_multidisc"] = (t["disc_number"] > 1).fillna(False).astype("int8")
        t["is_disc_extreme"] = _disc_extreme.astype("int8")
        t.loc[_disc_extreme, "disc_number"] = pd.NA

    cleaned["tracks"] = t


# -------------------------
# AUDIO FEATURES rules
# -------------------------
if "audio_features" in cleaned:
    a = cleaned["audio_features"].copy()

    # time_signature: categorical; anything outside the valid set is flagged and set to NA.
    # Missing values are flagged as well, because isin() is False for them.
    if "time_signature" in a.columns:
        a["time_signature"] = pd.to_numeric(a["time_signature"], errors="coerce")
        valid_ts = {3, 4, 5}
        _ts_invalid = ~a["time_signature"].isin(list(valid_ts))
        a["is_time_signature_rare"] = _ts_invalid.astype("int8")
        a.loc[_ts_invalid, "time_signature"] = pd.NA

    # tempo: non-positive -> NA; flag both tails, cap only the upper one
    if "tempo" in a.columns:
        a["tempo"] = pd.to_numeric(a["tempo"], errors="coerce")
        a.loc[a["tempo"] <= 0, "tempo"] = pd.NA
        lo, hi = quantile_cap(a["tempo"], q_low=0.001, q_high=0.999)
        if np.isnan(hi):
            a["is_tempo_extreme"] = 0
        else:
            a["is_tempo_extreme"] = ((a["tempo"] < lo) | (a["tempo"] > hi)).astype("int8")
            a["tempo"] = a["tempo"].clip(upper=hi)

    # loudness: hard domain guard, plus a flag for suspiciously low values
    if "loudness" in a.columns:
        a["loudness"] = pd.to_numeric(a["loudness"], errors="coerce")
        a.loc[~a["loudness"].between(-60, 5), "loudness"] = pd.NA
        a["is_loudness_very_low"] = (a["loudness"] < -40).astype("int8")

    # duration in audio_features: non-positive -> NA, flag and cap the upper tail
    if "duration" in a.columns:
        a["duration"] = pd.to_numeric(a["duration"], errors="coerce")
        a.loc[a["duration"] <= 0, "duration"] = pd.NA
        _, hi = quantile_cap(a["duration"], q_low=0.001, q_high=0.999)
        if np.isnan(hi):
            a["is_af_long"] = 0
        else:
            a["is_af_long"] = (a["duration"] > hi).astype("int8")
            a["duration"] = a["duration"].clip(upper=hi)

    # speechiness: skewed -> clip to [0, 1] and flag the top decile
    if "speechiness" in a.columns:
        a["speechiness"] = pd.to_numeric(a["speechiness"], errors="coerce").clip(0, 1)
        if a["speechiness"].notna().any():
            _speech_q90 = a["speechiness"].dropna().quantile(0.90)
            a["is_high_speech"] = (a["speechiness"] >= _speech_q90).astype("int8")
        else:
            a["is_high_speech"] = 0

    # instrumentalness: clip to [0, 1]; fixed 0.5 threshold for the flag
    if "instrumentalness" in a.columns:
        a["instrumentalness"] = pd.to_numeric(a["instrumentalness"], errors="coerce").clip(0, 1)
        a["is_instrumental"] = (a["instrumentalness"] >= 0.5).astype("int8")

    # key/mode: categorical sanity, out-of-range values become NA
    for _col, _low, _high in (("key", 0, 11), ("mode", 0, 1)):
        if _col in a.columns:
            a[_col] = pd.to_numeric(a[_col], errors="coerce")
            a.loc[~a[_col].between(_low, _high), _col] = pd.NA

    cleaned["audio_features"] = a


# -------------------------
# ARTISTS rules
# -------------------------
if "artists" in cleaned:
    ar = cleaned["artists"].copy()

    # followers: heavy tail -> do not drop, add log feature and extreme flag
    if "followers" in ar.columns:
        ar["followers"] = pd.to_numeric(ar["followers"], errors="coerce")
        # NOTE: ArtistsCleaner already raises negative counts to 0, so this
        # guard only takes effect for frames that did not pass through it.
        ar.loc[ar["followers"] < 0, "followers"] = pd.NA
        _, hi = quantile_cap(ar["followers"], q_low=0.001, q_high=0.999)
        if np.isnan(hi):
            # No usable follower count at all: keep the flag column, typed like the real flag.
            ar["is_followers_extreme"] = pd.Series(0, index=ar.index, dtype="int8")
        else:
            # Missing follower counts compare to NA on nullable integers; treat
            # them as "not extreme" so the int8 cast cannot fail.
            ar["is_followers_extreme"] = (ar["followers"] > hi).fillna(False).astype("int8")
        # Missing follower counts are treated as 0 for the log feature.
        ar["followers_log1p"] = np.log1p(ar["followers"].fillna(0)).astype("float64")

    # popularity: bounded [0,100] and can be binned later
    if "popularity" in ar.columns:
        ar["popularity"] = pd.to_numeric(ar["popularity"], errors="coerce").clip(0, 100)

    cleaned["artists"] = ar


# -------------------------
# ALBUMS rules
# -------------------------
if "albums" in cleaned:
    # Operate on a copy and write it back into `cleaned` once all rules ran.
    al = cleaned["albums"].copy()

    # release_date_parsed: rows whose year falls outside the accepted window
    # are flagged and blanked rather than dropped.
    if "release_date_parsed" in al.columns:
        parsed_years = pd.to_datetime(al["release_date_parsed"], errors="coerce").dt.year
        # Conservative sanity window; rows without a parseable date compare
        # False on both sides and are therefore not flagged.
        year_out_of_range = parsed_years.lt(1900) | parsed_years.gt(2035)
        al["is_release_year_invalid"] = year_out_of_range.astype("int8")
        al.loc[year_out_of_range, "release_date_parsed"] = pd.NaT

        # Derived year feature, taken from the column after invalid dates
        # were blanked (nullable integer so missing years stay missing).
        reparsed_dates = pd.to_datetime(al["release_date_parsed"], errors="coerce")
        al["release_year"] = reparsed_dates.dt.year.astype("Int64")

    # album popularity: coerce to numeric, then bound to [0, 100]
    if "popularity" in al.columns:
        album_popularity = pd.to_numeric(al["popularity"], errors="coerce")
        al["popularity"] = album_popularity.clip(lower=0, upper=100)

    cleaned["albums"] = al


print("Outlier rules applied: rule-based invalidation + quantile caps + flags (no IQR dropping).")


Outlier rules applied: rule-based invalidation + quantile caps + flags (no IQR dropping).


## Quality Gates

In [12]:
def _gate_bridge_fk(bridge_name, fk_col, valid_ids):
    """Gate: every value in `fk_col` of bridge table `bridge_name` must be in `valid_ids`."""
    fk_values = cleaned[bridge_name][fk_col]
    assert_gate(fk_values.isin(valid_ids).all(), f"{bridge_name} has invalid {fk_col}")


# Primary keys: each core table's key column must be fully populated and unique
for table_name, pk_col in (
    ("tracks", "track_id"),
    ("audio_features", "id"),
    ("albums", "id"),
    ("artists", "id"),
    ("genres", "id"),
):
    pk_values = cleaned[table_name][pk_col]
    assert_gate(pk_values.notna().all(), f"{table_name}.{pk_col} contains NA")
    assert_gate(pk_values.is_unique, f"{table_name}.{pk_col} not unique")

# Range sanity on tracks (missing values are ignored by the checks)
tracks_df = cleaned["tracks"]
if "popularity" in tracks_df.columns:
    popularity_in_range = tracks_df["popularity"].dropna().between(0, 100).all()
    assert_gate(popularity_in_range, "tracks.popularity out of [0,100]")

if "duration" in tracks_df.columns:
    duration_positive = tracks_df["duration"].dropna().gt(0).all()
    assert_gate(duration_positive, "tracks.duration has non-positive values")

# Range sanity on the audio feature columns listed in POLICY.audio_01_cols
for c in POLICY.audio_01_cols:
    if c not in cleaned["audio_features"].columns:
        continue
    unit_interval_ok = cleaned["audio_features"][c].dropna().between(0.0, 1.0).all()
    assert_gate(unit_interval_ok, f"audio_features.{c} out of [0,1]")

# Bridge referential integrity: each present bridge table may only reference known ids
if "r_albums_tracks" in cleaned:
    _gate_bridge_fk("r_albums_tracks", "album_id", album_ids)
    _gate_bridge_fk("r_albums_tracks", "track_id", track_ids)

if "r_track_artist" in cleaned:
    _gate_bridge_fk("r_track_artist", "track_id", track_ids)
    _gate_bridge_fk("r_track_artist", "artist_id", artist_ids)

if "r_artist_genre" in cleaned:
    _gate_bridge_fk("r_artist_genre", "genre_id", genre_ids)
    _gate_bridge_fk("r_artist_genre", "artist_id", artist_ids)

if "r_albums_artists" in cleaned:
    _gate_bridge_fk("r_albums_artists", "album_id", album_ids)
    _gate_bridge_fk("r_albums_artists", "artist_id", artist_ids)

print(" All quality gates passed.")


# %%
# ============================================================
# Save Clean Layer (CSV + Parquet)
# ============================================================

def save_table(df: pd.DataFrame, name: str) -> None:
    """Write one cleaned table to the clean layer as CSV and as Parquet.

    Args:
        df: Table to persist.
        name: File stem; the table is written to ``<name>.csv`` under
            ``PATHS.clean_dir`` and ``<name>.parquet`` under
            ``PATHS.parquet_dir``.
    """
    csv_path = PATHS.clean_dir / f"{name}.csv"
    pq_path = PATHS.parquet_dir / f"{name}.parquet"

    # Create the target folders on demand so the export does not depend on
    # an earlier cell having prepared them.
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    pq_path.parent.mkdir(parents=True, exist_ok=True)

    # The row index is not written; utf-8-sig prefixes the CSV with a BOM.
    df.to_csv(csv_path, index=False, encoding="utf-8-sig")
    # Parquet is the preferred format for Notebook 3 performance
    df.to_parquet(pq_path, index=False)

# Persist every cleaned table under its dictionary key, in both formats.
for table_name, table_df in cleaned.items():
    save_table(table_df, table_name)

# Report where the two copies of the clean layer were written.
print(" Clean layer saved to:")
for output_dir in (PATHS.clean_dir, PATHS.parquet_dir):
    print(" -", output_dir)

 All quality gates passed.
 Clean layer saved to:
 - ..\data\processed\clean_csv\slice_001
 - ..\data\processed\parquet\slice_001


## Cleaning Report (JSON) - audit + reproducibility

In [13]:
# Row counts per cleaned table; a table with no entry in `raw` counts as
# 0 rows before cleaning.
rows_after = {name: int(table.shape[0]) for name, table in cleaned.items()}
rows_before = {
    name: int(raw[name].shape[0]) if name in raw else 0
    for name in rows_after
}
rowcount_delta: Dict[str, Dict[str, int]] = {
    name: {
        "before": rows_before[name],
        "after": n_after,
        "delta": n_after - rows_before[name],
    }
    for name, n_after in rows_after.items()
}

# Free-text summary of the policies recorded alongside the statistics.
policy_notes = {
    "bridge_policy": "drop orphan rows (referential integrity enforced)",
    "track_audio_feature_fk_policy": "invalid audio_feature_id set to NA (tracks not dropped)",
    "popularity_policy": "clipped to [0,100]",
    "audio_features_policy": "0..1 scalars clipped; loudness/key/mode validated",
    "duplicate_policy": "kept the most complete row per PK",
    "export_formats": "CSV + Parquet",
}

report = {
    "run_meta": RUN_META,
    "profiles_before": profiles_before,
    "profiles_after": profiles_after,
    "rowcount_delta": rowcount_delta,
    "notes": policy_notes,
}

# Serialize with a 2-space indent and write the report as UTF-8.
report_path = PATHS.report_path / "cleaning_report.json"
report_json = json.dumps(report, indent=2)
report_path.write_text(report_json, encoding="utf-8")

print(" Cleaning report written:", report_path)

 Cleaning report written: ..\data\reports\cleaning_and_rules\slice_001\cleaning_report.json
