In [1]:
# --- Imports
import os
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import pairwise_distances
from sklearn.neighbors import KernelDensity

# --- Config
INPUT_FILE = "bean-clean.csv"            # real data file; needs a header row containing a 'Class' column
OUTPUT_DIR = "syn_outputs"         # where to write per-class and merged CSVs
CLASSES = ["SEKER", "BARBUNYA", "BOMBAY", "CALI", "HOROZ"]  # order controls file names
N_SAMPLES_PER_CLASS = 500          # synthetic rows per class (adjust as needed)

# KDE / Adaptivity
BASE_BANDWIDTH = 0.5               # multiplier applied to each point's mean k-NN distance (normalized space)
K_NEIGHBORS = 5                    # k-NN for local bandwidth
KERNEL = "gaussian"

# Normalization: choose one
SCALER_KIND = "standard"           # "standard" for StandardScaler; "minmax" for MinMaxScaler

# Reproducibility: the sampling code draws from NumPy's global RNG
# (np.random.randint and KernelDensity.sample() called without random_state),
# so seeding it here makes "Restart Kernel -> Run All" write identical CSVs.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Notes ---
# 1) If your CSV lacks a header row, set header=None in pd.read_csv(), then
#    assign column names yourself and ensure there is a 'Class' column.
# 2) CLASSES list controls which classes are generated and file names.


In [2]:
def load_dataset(path: str) -> pd.DataFrame:
    """
    Read the CSV at `path` and verify that it carries a label column.

    Parameters
    ----------
    path : str
        Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns
    -------
    pd.DataFrame
        The parsed file, unmodified.

    Raises
    ------
    ValueError
        If the parsed frame has no column named 'Class'.
    """
    frame = pd.read_csv(path)
    # Guard-clause form: hand the frame back as soon as the label column is confirmed.
    if "Class" in frame.columns:
        return frame
    raise ValueError("Expected a 'Class' column in the dataset.")

def get_numeric_columns(df: pd.DataFrame, exclude=("Class",)) -> list:
    """
    List the names of the numeric-dtype columns of `df`, minus any excluded names.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to inspect.
    exclude : iterable of column names
        Names dropped from the result when present (one occurrence per name).

    Returns
    -------
    list
        Numeric column names in their original column order.
    """
    numeric_names = df.select_dtypes(include=[np.number]).columns.tolist()
    for unwanted in exclude:
        try:
            numeric_names.remove(unwanted)
        except ValueError:
            # The excluded name is not a numeric column; nothing to drop.
            pass
    return numeric_names

def make_scaler(kind="standard"):
    """
    Build an unfitted scaler for the requested normalization scheme.

    Parameters
    ----------
    kind : str
        "standard" for a StandardScaler, "minmax" for a MinMaxScaler.

    Returns
    -------
    A new, unfitted scaler instance.

    Raises
    ------
    ValueError
        If `kind` is neither "standard" nor "minmax".
    """
    # Reject unknown kinds up front, then choose between the two valid ones.
    if kind not in ("minmax", "standard"):
        raise ValueError("SCALER_KIND must be 'standard' or 'minmax'")
    return MinMaxScaler() if kind == "minmax" else StandardScaler()

def fit_transform_scaler(df_num: pd.DataFrame, scaler) -> np.ndarray:
    """
    Fit `scaler` on the values of `df_num` and return the scaled values.

    Parameters
    ----------
    df_num : pd.DataFrame
        Frame whose ``.values`` array is handed to the scaler.
    scaler
        Object exposing ``fit_transform``; it is fitted as a side effect.

    Returns
    -------
    np.ndarray
        Whatever ``scaler.fit_transform`` returns for the raw values.
    """
    raw_values = df_num.values
    normalized = scaler.fit_transform(raw_values)
    return normalized

def inverse_transform_scaler(arr: np.ndarray, scaler) -> np.ndarray:
    """
    Map scaled values back to the original feature scale.

    Parameters
    ----------
    arr : np.ndarray
        Values in the scaler's normalized space.
    scaler
        Already-fitted object exposing ``inverse_transform``.

    Returns
    -------
    np.ndarray
        Whatever ``scaler.inverse_transform`` returns for `arr`.
    """
    restored = scaler.inverse_transform(arr)
    return restored


In [3]:
def compute_adaptive_bandwidths(X_norm: np.ndarray, base_bandwidth: float, k: int,
                                min_bandwidth: float = 1e-6) -> np.ndarray:
    """
    Compute one bandwidth per point: base_bandwidth times the mean distance to
    the point's k nearest neighbors in normalized space.

    Parameters
    ----------
    X_norm : np.ndarray
        2-D array of normalized points, one row per point.
    base_bandwidth : float
        Multiplier applied to each point's mean k-NN distance.
    k : int
        Number of nearest neighbors to average over. Capped at n_points - 1,
        because a point has at most that many neighbors.
    min_bandwidth : float, default 1e-6
        Lower bound for the returned bandwidths. Duplicated rows can make the
        mean neighbor distance exactly 0, and a KDE cannot be built with a
        non-positive bandwidth.

    Returns
    -------
    np.ndarray
        1-D array with one strictly positive, finite bandwidth per row of X_norm.

    Raises
    ------
    ValueError
        If k < 1.
    """
    if k < 1:
        raise ValueError("k must be >= 1 to compute k-NN based bandwidths.")

    X_norm = np.asarray(X_norm)
    n_points = X_norm.shape[0]

    if n_points == 1:
        # A lone point has no neighbors; the k-NN mean would be NaN (mean of an
        # empty slice). Fall back to the base bandwidth itself.
        return np.full(1, max(float(base_bandwidth), min_bandwidth))

    k_eff = min(k, n_points - 1)

    # pairwise distances; for large datasets consider an approximate NN to speed up
    D = pairwise_distances(X_norm)
    # After sorting each row, index 0 is the smallest distance (the point to
    # itself), so columns 1..k_eff are the k_eff nearest neighbors.
    D_sorted = np.sort(D, axis=1)
    local = D_sorted[:, 1:k_eff + 1].mean(axis=1)

    # Floor the result so duplicated points never yield a zero bandwidth.
    return np.maximum(base_bandwidth * local, min_bandwidth)

def fit_adaptive_kde_models(X_norm: np.ndarray, bandwidths: np.ndarray, kernel: str = "gaussian"):
    """
    Build one single-point KDE per data point, each with that point's own bandwidth.

    Model i is fitted on row i only, so sampling from it draws around point i
    with bandwidths[i]. Picking a model uniformly at random and sampling from it
    is then a sample-point adaptive KDE.

    Each model used to be fitted on the full class. KernelDensity.sample() picks
    its centre at random from the fitted data, so a bandwidth computed for one
    point was applied around an unrelated point and the adaptivity was lost.
    That approach also built n trees over n points each.

    Parameters
    ----------
    X_norm : np.ndarray
        2-D array of normalized points, one row per point.
    bandwidths : np.ndarray
        One bandwidth per row of X_norm.
    kernel : str
        Kernel name forwarded to KernelDensity.

    Returns
    -------
    list
        Fitted KernelDensity objects, in the same order as the rows of X_norm.

    Raises
    ------
    ValueError
        If the number of bandwidths differs from the number of rows.
    """
    X_norm = np.asarray(X_norm)
    bandwidths = np.asarray(bandwidths, dtype=float).ravel()
    if bandwidths.shape[0] != X_norm.shape[0]:
        raise ValueError(
            f"Expected one bandwidth per point: got {bandwidths.shape[0]} "
            f"bandwidths for {X_norm.shape[0]} points."
        )

    kde_models = []
    for point, bw in zip(X_norm, bandwidths):
        kde = KernelDensity(bandwidth=float(bw), kernel=kernel)
        # Fit on this point alone so its local bandwidth is used at its own location.
        kde.fit(point.reshape(1, -1))
        kde_models.append(kde)
    return kde_models

def sample_from_adaptive_kde(kde_models, n_samples: int, random_state=None) -> np.ndarray:
    """
    Draw n_samples points by picking a KDE uniformly at random for each draw
    and sampling one point from it.

    Parameters
    ----------
    kde_models : sequence
        Fitted models exposing ``sample(n_samples, random_state=...)``.
    n_samples : int
        Number of points to draw; must be >= 1.
    random_state : None, int or np.random.RandomState, optional
        None (default) keeps the previous behavior of drawing from NumPy's
        global RNG. An int seeds a private RandomState; an existing
        RandomState is used as-is. Either makes the draw reproducible.

    Returns
    -------
    np.ndarray
        Array with one sampled point per row.

    Raises
    ------
    ValueError
        If kde_models is empty or n_samples < 1.
    """
    if len(kde_models) == 0:
        raise ValueError("kde_models is empty; nothing to sample from.")
    if n_samples < 1:
        raise ValueError("n_samples must be >= 1.")

    if random_state is None or isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        rng = np.random.RandomState(random_state)

    # With rng None, fall back to the module-level functions (global RNG).
    chooser = np.random if rng is None else rng
    idx = chooser.randint(0, len(kde_models), size=n_samples)

    # Pass the RandomState *instance* (never the raw int seed) to each model:
    # re-seeding from the same int on every call would repeat the same noise.
    samples = [kde_models[i].sample(1, random_state=rng)[0] for i in idx]
    return np.vstack(samples)


In [4]:
def synthesize_for_class(df_class: pd.DataFrame,
                         numeric_cols: list,
                         n_samples: int,
                         base_bandwidth: float,
                         k_neighbors: int,
                         kernel: str,
                         scaler_kind: str = "standard",
                         decimals=4) -> pd.DataFrame:
    """
    Generate synthetic samples for a single class with normalization -> AKDE -> inverse transform.

    Parameters
    ----------
    df_class : pd.DataFrame
        Rows of one class; must contain `numeric_cols` and a 'Class' column.
    numeric_cols : list
        Feature columns to model.
    n_samples : int
        Number of synthetic rows to generate.
    base_bandwidth : float
        Multiplier for the per-point k-NN bandwidths.
    k_neighbors : int
        Number of neighbors used for the local bandwidth.
    kernel : str
        Kernel name forwarded to the KDE models.
    scaler_kind : str
        "standard" or "minmax"; the scaler is fitted on this class only.
    decimals : int or None, default 4
        Decimal places the synthetic values are rounded to. Pass None to keep
        full precision: a fixed 4 decimals discards most significant digits of
        any feature whose magnitude is small (e.g. around 1e-3).

    Returns
    -------
    pd.DataFrame
        `n_samples` rows with `numeric_cols` plus a 'Class' column holding the
        label taken from the first row of `df_class`.

    Raises
    ------
    ValueError
        If `df_class` has no rows.
    """
    if df_class.empty:
        # Fail with a clear message instead of an opaque error further down.
        raise ValueError("df_class is empty; cannot synthesize samples for a class with no rows.")

    class_label = df_class["Class"].iloc[0]

    # 1-2) Class-specific scaler, fitted directly on the numeric slice.
    scaler = make_scaler(scaler_kind)
    X_norm = fit_transform_scaler(df_class[numeric_cols], scaler)

    # 3) Adaptive bandwidths in normalized space
    bw = compute_adaptive_bandwidths(X_norm, base_bandwidth, k_neighbors)

    # 4) Fit adaptive KDEs
    kde_models = fit_adaptive_kde_models(X_norm, bw, kernel=kernel)

    # 5) Sample n_samples in normalized space
    X_syn_norm = sample_from_adaptive_kde(kde_models, n_samples)

    # 6) Inverse transform to original scale
    X_syn = inverse_transform_scaler(X_syn_norm, scaler)

    # 7) Optional rounding, then build DataFrame with numeric columns + Class
    if decimals is not None:
        X_syn = np.round(X_syn, decimals)
    df_syn = pd.DataFrame(X_syn, columns=numeric_cols)
    df_syn["Class"] = class_label
    return df_syn


In [5]:
def generate_all_classes(input_file: str,
                         classes: list,
                         out_dir: str,
                         n_samples_per_class: int,
                         numeric_cols: list,
                         base_bandwidth: float,
                         k_neighbors: int,
                         kernel: str,
                         scaler_kind: str = "standard"):
    """
    Generate synthetic rows for every requested class and write them to CSV.

    For each class in `classes` that has rows in the input file, one file
    ``syn_bean_<position>.csv`` is written to `out_dir`, where <position> is the
    1-based position of the class in `classes`. Skipped classes leave a gap in
    the numbering. All generated frames are also concatenated into
    ``syn_bean_merged.csv``.

    Parameters
    ----------
    input_file : str
        CSV with a 'Class' column and the numeric feature columns.
    classes : list
        Class labels to generate, in file-numbering order.
    out_dir : str
        Output directory; created if it does not exist.
    n_samples_per_class : int
        Synthetic rows per class.
    numeric_cols : list
        Feature columns to model.
    base_bandwidth, k_neighbors, kernel, scaler_kind
        Forwarded unchanged to synthesize_for_class.

    Returns
    -------
    None
        Results are written to disk and progress is printed.
    """
    df = load_dataset(input_file)
    # sanity filter to only known classes if needed
    df = df[df["Class"].isin(classes)].copy()

    # Do not rely on another cell having created the directory already.
    os.makedirs(out_dir, exist_ok=True)

    # Synthesize per class
    outputs = []
    for idx, cls in enumerate(classes, start=1):
        subset = df[df["Class"] == cls].copy()
        if subset.empty:
            print(f"[WARN] No rows found for class '{cls}'. Skipping.")
            continue

        print(f"-> Generating {n_samples_per_class} samples for class '{cls}' ...")
        df_syn = synthesize_for_class(
            df_class=subset,
            numeric_cols=numeric_cols,
            n_samples=n_samples_per_class,
            base_bandwidth=base_bandwidth,
            k_neighbors=k_neighbors,
            kernel=kernel,
            scaler_kind=scaler_kind
        )

        # Save per class
        out_path = os.path.join(out_dir, f"syn_bean_{idx}.csv")
        df_syn.to_csv(out_path, index=False)
        print(f"   Saved: {out_path} ({len(df_syn)} rows)")
        outputs.append(df_syn)

    # Merge and save
    if outputs:
        merged = pd.concat(outputs, ignore_index=True)
        merged_path = os.path.join(out_dir, "syn_bean_merged.csv")
        merged.to_csv(merged_path, index=False)
        print(f"\nMerged all classes -> {merged_path} ({len(merged)} rows)")
    else:
        print("No outputs produced. Check classes and input file.")


In [6]:
# Load the real data up front, only to discover which columns are numeric features.
# NOTE: generate_all_classes() calls load_dataset(INPUT_FILE) again internally,
# so the CSV is parsed twice in this cell.
df_real = load_dataset(INPUT_FILE)
numeric_cols = get_numeric_columns(df_real, exclude=("Class",))
print("Numeric columns:", numeric_cols)

# Generate N_SAMPLES_PER_CLASS synthetic rows for each class in CLASSES; writes one
# CSV per class plus a merged CSV into OUTPUT_DIR and prints progress.
generate_all_classes(
    input_file=INPUT_FILE,
    classes=CLASSES,
    out_dir=OUTPUT_DIR,
    n_samples_per_class=N_SAMPLES_PER_CLASS,
    numeric_cols=numeric_cols,
    base_bandwidth=BASE_BANDWIDTH,
    k_neighbors=K_NEIGHBORS,
    kernel=KERNEL,
    scaler_kind=SCALER_KIND
)


Numeric columns: ['Area', 'Perimeter', 'MajorAxisLength', 'MinorAxisLength', 'AspectRation', 'Eccentricity', 'ConvexArea', 'EquivDiameter', 'Extent', 'Solidity', 'roundness', 'Compactness', 'ShapeFactor1', 'ShapeFactor2', 'ShapeFactor3', 'ShapeFactor4']
-> Generating 500 samples for class 'SEKER' ...
   Saved: syn_outputs/syn_bean_1.csv (500 rows)
-> Generating 500 samples for class 'BARBUNYA' ...
   Saved: syn_outputs/syn_bean_2.csv (500 rows)
-> Generating 500 samples for class 'BOMBAY' ...
   Saved: syn_outputs/syn_bean_3.csv (500 rows)
-> Generating 500 samples for class 'CALI' ...
   Saved: syn_outputs/syn_bean_4.csv (500 rows)
-> Generating 500 samples for class 'HOROZ' ...
   Saved: syn_outputs/syn_bean_5.csv (500 rows)

Merged all classes -> syn_outputs/syn_bean_merged.csv (2500 rows)
