# Categorical Generalization Playground

Use this notebook to experiment with `CategoricalGeneralizationOperation` strategies (frequency-based and merge-low-frequency) on a synthetic workforce dataset.


## How to use

1. Run the setup cell to import modules and build the demo dataset.
2. Run the helper cell to register reusable reporter utilities and the `run_categorical_generalization` wrapper.
3. Execute any of the strategy sections, tweak parameters, or swap in your own dataframe via the `source_df` argument.


In [1]:
import sys
from pathlib import Path

import numpy as np
import pandas as pd

pd.set_option("display.max_rows", 12)
pd.set_option("display.max_columns", None)
np.random.seed(42)


def find_project_root(start: Path) -> Path:
    start = start.resolve()
    for candidate in [start, *start.parents]:
        if (candidate / "pamola_core").exists():
            return candidate
    raise RuntimeError("Run this notebook inside the PAMOLA repository.")

PROJECT_ROOT = find_project_root(Path.cwd())
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

print(f"Project root: {PROJECT_ROOT}")

roles = [
    "Data Scientist", "Machine Learning Engineer", "Data Analyst",
    "Business Analyst", "Quant Researcher", "Risk Manager",
    "AI Researcher", "Data Engineer", "Statistician", "ML Ops",
    "Product Analyst", "Vision Specialist", "NLP Engineer",
]
regions = ["NA", "EU", "APAC", "LATAM"]
departments = ["R&D", "Finance", "Operations", "Marketing"]

samples = 96
role_weights = np.array([0.18, 0.12, 0.15, 0.10, 0.04, 0.05, 0.03, 0.12, 0.05, 0.05, 0.06, 0.025, 0.025])
role_weights = role_weights / role_weights.sum()

np.random.seed(7)
demo_df = pd.DataFrame({
    "employee_id": np.arange(1, samples + 1),
    "job_role": np.random.choice(roles, size=samples, p=role_weights),
    "department": np.random.choice(departments, size=samples),
    "region": np.random.choice(regions, size=samples, p=[0.4, 0.25, 0.25, 0.1]),
})

demo_df.head(12)


Project root: /root/PAMOLA


Unnamed: 0,employee_id,job_role,department,region
0,1,Data Scientist,Marketing,APAC
1,2,Data Engineer,Finance,
2,3,Data Analyst,Finance,
3,4,Data Engineer,Marketing,
4,5,NLP Engineer,Finance,
5,6,Business Analyst,R&D,
6,7,Business Analyst,Marketing,
7,8,Data Scientist,R&D,EU
8,9,Machine Learning Engineer,Finance,APAC
9,10,Business Analyst,Marketing,EU


In [7]:
# Add extra debug logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import tempfile

from pamola_core.anonymization.generalization.categorical_op import CategoricalGeneralizationOperation
from pamola_core.anonymization.commons.categorical_config import (
    GeneralizationStrategy,
    GroupRareAs,
)
from pamola_core.utils.ops.op_data_source import DataSource
from pamola_core.utils.ops.op_result import OperationStatus


@dataclass
class ReporterSession:
    """Lightweight session log for notebook runs."""

    name: str
    created_at: datetime
    operations: list[dict[str, Any]] = field(default_factory=list)
    artifacts: list[dict[str, Any]] = field(default_factory=list)
    debug_notes: list[str] = field(default_factory=list)

    def add_operation(self, description: str, details: Optional[Dict[str, Any]] = None):
        entry = {
            "description": description,
            "details": details or {},
            "timestamp": datetime.now().isoformat(),
        }
        self.operations.append(entry)
        return entry

    def add_note(self, message: str) -> str:
        note = f"{datetime.now().isoformat()} - {message}"
        self.debug_notes.append(note)
        return note

    def register_artifact(self, name: str, path: str, artifact_type: str = "file"):
        entry = {"name": name, "path": path, "type": artifact_type}
        self.artifacts.append(entry)
        return entry


class NotebookReporter:
    """Debug-friendly reporter grouping events by session."""

    def __init__(self):
        self.sessions: Dict[str, ReporterSession] = {}
        self._session_counter = 0
        self.active_session: ReporterSession = self.start_session()
        self._refresh_views()

    def start_session(self, name: Optional[str] = None) -> ReporterSession:
        if name and name in self.sessions:
            raise ValueError(f"Session '{name}' already exists")
        if not name:
            self._session_counter += 1
            name = f"session_{self._session_counter}"
        session = ReporterSession(name=name, created_at=datetime.now())
        self.sessions[name] = session
        self.active_session = session
        self._refresh_views()
        return session

    def use_session(self, name: str) -> ReporterSession:
        if name in self.sessions:
            self.active_session = self.sessions[name]
        else:
            self.active_session = self.start_session(name)
        self._refresh_views()
        return self.active_session

    def _refresh_views(self) -> None:
        self.operations = self.active_session.operations
        self.artifacts = self.active_session.artifacts

    def add_operation(self, description: str, details: Optional[Dict[str, Any]] = None):
        return self.active_session.add_operation(description, details)

    def add_debug_note(self, message: str) -> str:
        return self.active_session.add_note(message)

    def register_artifact(self, name: str, path: str, artifact_type: str = "file"):
        return self.active_session.register_artifact(name, path, artifact_type)

    def summary(self) -> Dict[str, Dict[str, Any]]:
        return {
            name: {
                "operations": len(session.operations),
                "notes": session.debug_notes,
            }
            for name, session in self.sessions.items()
        }


def _build_preview(operation: CategoricalGeneralizationOperation, df: pd.DataFrame) -> Dict[str, Any]:
    preview_batch, category_mapping, hierarchy_info, hierarchy_cache, fuzzy_matches, unknown_values = (
        CategoricalGeneralizationOperation.process_batch(
            df.copy(deep=True),
            strategy=operation.strategy,
            field_name=operation.field_name,
            mode=operation.mode,
            **operation.process_kwargs
        )
    )
    return {
        "data": preview_batch,
        "category_mapping": category_mapping,
        "hierarchy_info": hierarchy_info,
        "hierarchy_cache": hierarchy_cache,
        "fuzzy_matches": fuzzy_matches,
        "unknown_values": sorted(unknown_values),
    }


def run_categorical_generalization(
    strategy: str,
    *,
    field_name: str = "job_role",
    mode: str = "ENRICH",
    source_df: Optional[pd.DataFrame] = None,
    session_label: Optional[str] = None,
    **operation_kwargs,
) -> Dict[str, Any]:
    reporter = NotebookReporter()
    session_name = session_label or f"{strategy}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    reporter.use_session(session_name)
    reporter.add_debug_note(f"Running {strategy} with params: {operation_kwargs}")

    working_df = (source_df or demo_df).copy(deep=True)
    data_source = DataSource(dataframes={"main": working_df})

    # Add debug prints
    print("[Debug] Strategy type:", type(strategy))
    print("[Debug] Strategy value:", strategy)
    print("[Debug] Operation kwargs:", operation_kwargs)

    # Only include strategy-specific parameters
    op_params = {
        "field_name": field_name,
        "strategy": strategy,
        "mode": mode,
        "generate_visualization": False,
        "use_cache": False,
        "save_output": False,
    }

    # Add frequency-based strategy parameters if needed
    if strategy == GeneralizationStrategy.FREQUENCY_BASED.value:
        op_params.update({
            "max_categories": operation_kwargs.pop("max_categories", None),
            "freq_threshold": operation_kwargs.pop("freq_threshold", None),
        })
    
    # Add any remaining parameters
    op_params.update(operation_kwargs)

    operation = CategoricalGeneralizationOperation(**op_params)

    with tempfile.TemporaryDirectory(prefix=f"categorical_generalization_{strategy}_") as tmp_dir:
        result = operation.execute(
            data_source=data_source,
            task_dir=Path(tmp_dir),
            reporter=reporter,
        )

        if result.status != OperationStatus.SUCCESS:
            raise RuntimeError(f"Operation failed: {result.error_message}")

        preview = _build_preview(operation, working_df)

    operations_logged = getattr(reporter, "operations", None)
    if operations_logged:
        print(f"[{session_name}] status={result.status.value}; operations_logged={len(operations_logged)}")
    else:
        print(f"[{session_name}] status={result.status.value}; no operations logged")

    return {
        "result": result,
        "preview_df": preview["data"],
        "category_mapping": preview["category_mapping"],
        "unknown_values": preview["unknown_values"],
        "reporter": reporter,
        "operation": operation,
    }

## Frequency-based strategy

Keep the four most frequent job roles and collapse the rest using the frequency-based strategy.


In [3]:
frequency_run = run_categorical_generalization(
    strategy=GeneralizationStrategy.FREQUENCY_BASED.value,
    max_categories=4,
    freq_threshold=0.03,
)
freq_cols = ["job_role", "generalized"]
frequency_run["preview_df"][freq_cols].head(12)

[Debug] Strategy type: <class 'str'>
[Debug] Strategy value: frequency_based
[Debug] Operation kwargs: {'max_categories': 4, 'freq_threshold': 0.03}
[frequency_based_20251109_111409] status=success; operations_logged=3


Unnamed: 0,job_role,generalized
0,Data Scientist,Data Scientist
1,Data Engineer,Data Engineer
2,Data Analyst,Data Analyst
3,Data Engineer,Data Engineer
4,NLP Engineer,NLP Engineer
5,Business Analyst,Business Analyst
6,Business Analyst,Business Analyst
7,Data Scientist,Data Scientist
8,Machine Learning Engineer,Machine Learning Engineer
9,Business Analyst,Business Analyst


## Merge low-frequency strategy

Ensure at least six records per category, grouping rare roles with a numbered template.


In [9]:
merge_run = run_categorical_generalization(
    strategy=GeneralizationStrategy.MERGE_LOW_FREQ.value,
    min_group_size=6,
    group_rare_as=GroupRareAs.OTHER.value,
    rare_value_template="OTHER_{n}",
)
merge_cols = ["job_role", "generalized"]
merge_run["preview_df"][merge_cols].head(12)

[Debug] Strategy type: <class 'str'>
[Debug] Strategy value: merge_low_freq
[Debug] Operation kwargs: {'min_group_size': 6, 'group_rare_as': 'OTHER', 'rare_value_template': 'OTHER_{n}'}
[merge_low_freq_20251109_111833] status=success; operations_logged=3
[merge_low_freq_20251109_111833] status=success; operations_logged=3


Unnamed: 0,job_role,generalized
0,Data Scientist,Data Scientist
1,Data Engineer,Data Engineer
2,Data Analyst,Data Analyst
3,Data Engineer,Data Engineer
4,NLP Engineer,NLP Engineer
5,Business Analyst,Business Analyst
6,Business Analyst,Business Analyst
7,Data Scientist,Data Scientist
8,Machine Learning Engineer,Machine Learning Engineer
9,Business Analyst,Business Analyst


## Regional frequency cap

Apply frequency-based generalization to the `region` field, forcing less-common regions into a shared bucket.


In [10]:
region_run = run_categorical_generalization(
    strategy=GeneralizationStrategy.FREQUENCY_BASED.value,
    field_name="region",
    max_categories=2,
    freq_threshold=0.2,
    allow_unknown=False,
    unknown_value="OTHER_REGION",
)
region_cols = ["region", "generalized"]
region_run["preview_df"][region_cols].head(12)

[Debug] Strategy type: <class 'str'>
[Debug] Strategy value: frequency_based
[Debug] Operation kwargs: {'max_categories': 2, 'freq_threshold': 0.2, 'allow_unknown': False, 'unknown_value': 'OTHER_REGION'}
[frequency_based_20251109_112028] status=success; operations_logged=3
[frequency_based_20251109_112028] status=success; operations_logged=3


Unnamed: 0,region,generalized
0,APAC,APAC
1,,
2,,
3,,
4,,
5,,
6,,
7,EU,EU
8,APAC,APAC
9,EU,EU
