In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import ADASYN
import joblib
from sklearn.preprocessing import LabelEncoder
from sdv.single_table import TVAESynthesizer
from sdv.metadata import SingleTableMetadata
from sdv.single_table import CTGANSynthesizer
from sdv.single_table import CopulaGANSynthesizer

: 

In [None]:
# ==========================
# Load and prepare the data
# ==========================

# Load the cleaned CICIDS2017 dataset; the class label lives in the "target" column.
# (The file name suggests all original labels are kept rather than grouped — TODO confirm.)
data = pd.read_csv("./dataset/cicids2017_clean_all_labels.csv")
print("Original shape:", data.shape)

# Separate features (X) and labels (y)
X = data.drop(columns=["target"])
y = data["target"]

# Split into training and testing sets (70/30). `stratify=y` keeps the class
# proportions the same in both splits; the fixed seed makes the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

# Load the pre-fitted feature transformer. It is used below through `transform`
# and, in the generation cells, through `inverse_transform`.
# SECURITY: joblib.load unpickles arbitrary Python objects — only load this file
# from a trusted source.
# NOTE(review): confirm the transformer was fitted on training rows only; if it was
# fitted on the whole dataset, test-set statistics leak into the encoded features.
tdt = joblib.load("./typed_cicids2017_all_features.pkl")

# Encode only the training features here; X_test is left untouched in this cell.
X_train_encoded = tdt.transform(X_train)

# Encode the class labels as integers. The encoder is fitted on the training labels
# and is reused later to map synthetic labels back to their original names.
encoder_label = LabelEncoder()
y_train_encoded = encoder_label.fit_transform(y_train)


Original shape: (2827876, 79)


In [None]:
# ==========================
# Custom 1:1 SMOTE Generation
# ==========================
def custom_smote_pointwise(X_cls: np.ndarray, random_state=42, max_attempts: int = 100):
    """Create one synthetic point per input row by SMOTE-style interpolation.

    For every row ``x_i`` a partner ``x_j`` is drawn uniformly from the distinct
    rows of ``X_cls`` (redrawn while it is numerically equal to ``x_i``), and the
    synthetic point is ``x_i + lam * (x_j - x_i)`` with ``lam`` drawn uniformly
    from [0, 1).

    Parameters
    ----------
    X_cls : array-like
        Rows of a single class, already numerically encoded.
    random_state : int or seed accepted by ``np.random.default_rng``
        Seed for the partner and interpolation draws.
    max_attempts : int, default 100
        Number of random partner draws tried per row before falling back to an
        exhaustive scan of the distinct rows.

    Returns
    -------
    np.ndarray
        Array with the same number of rows as ``X_cls``; row ``i`` is the
        synthetic point derived from input row ``i``.

    Raises
    ------
    ValueError
        If there are fewer than two distinct rows, or if some row has no
        partner that differs from it beyond ``np.allclose`` tolerance.
    """
    X_cls = np.asarray(X_cls)
    rng = np.random.default_rng(random_state)
    X_unique = np.unique(X_cls, axis=0)
    if len(X_unique) < 2:
        raise ValueError("Not enough unique points to interpolate.")

    n_unique = len(X_unique)
    synth = []

    for x_i in X_cls:
        x_j = None

        # Cheap path: rejection sampling, bounded so that near-duplicate data
        # cannot make the loop spin forever.
        for _ in range(max_attempts):
            candidate = X_unique[rng.integers(0, n_unique)]
            if not np.allclose(x_i, candidate):
                x_j = candidate
                break

        if x_j is None:
            # Exhaustive fallback: list every distinct row that is not close to
            # x_i (same tolerance test as above) and pick one of them.
            is_close = np.isclose(x_i, X_unique).reshape(n_unique, -1).all(axis=1)
            distinct = np.flatnonzero(~is_close)
            if distinct.size == 0:
                raise ValueError("Not enough unique points to interpolate.")
            x_j = X_unique[rng.choice(distinct)]

        lam = rng.uniform(0, 1)
        synth.append(x_i + lam * (x_j - x_i))

    return np.array(synth)

# Synthetic generation class by class.
# Each class is interpolated on its own, so synthetic rows never mix classes.
X_synth_list = []
y_synth_list = []

for label in np.unique(y_train_encoded):
    X_cls = X_train_encoded[y_train_encoded == label]
    try:
        # Seeding with the class code gives every class its own reproducible stream.
        X_synth = custom_smote_pointwise(X_cls, random_state=label)
    except ValueError as e:
        # Raised when the class has too few distinct rows to interpolate between.
        print(f"[!] Class {label} skipped: {e}")
        continue

    X_synth_list.append(X_synth)
    y_synth_list.append(np.full(len(X_synth), label))

# Fail with a clear message instead of an opaque NumPy error on an empty list.
if not X_synth_list:
    raise ValueError("No class produced synthetic rows; nothing to merge or save.")

# Merge
X_synth_encoded = np.vstack(X_synth_list)
y_synth_encoded = np.concatenate(y_synth_list)

# ==========================
# Back to original space
# ==========================
X_synth_original = tdt.inverse_transform(X_synth_encoded)
df_synth = pd.DataFrame(X_synth_original)
df_synth["target"] = encoder_label.inverse_transform(y_synth_encoded)

# Save
# NOTE(review): this cell writes to ./generations_cicids/ while the other
# generation cells write to ./generations/ — confirm which directory is intended.
df_synth.to_csv("./generations_cicids/synthetic_df_smote.csv", index=False)
print(f"✅ 1:1 synthetic dataset generated: {df_synth.shape[0]} rows (from training set)")


✅ 1:1 synthetic dataset generated: 103961 rows (from training set)


In [10]:

# ==========================
# ADASYN generation class by class (1:1)
# ==========================
# ADASYN needs at least two classes, so each real class is paired with two
# placeholder rows labelled 0 and the real rows (labelled 1) are oversampled.
# NOTE(review): ADASYN weights generation by how many other-class neighbours a
# point has; with only two placeholder rows the synthetic points may concentrate
# around those two rows — confirm this is the intended sampling behaviour.
X_synth_list = []
y_synth_list = []

for label in tqdm(np.unique(y_train_encoded), desc="Class"):
    X_cls = X_train_encoded[y_train_encoded == label]
    n_real = len(X_cls)

    if n_real < 2:
        print(f"[!] Class {label} skipped (fewer than 2 points)")
        continue

    # Create artificial dataset: class 1 (real), class 0 (fake)
    X_cls_df = pd.DataFrame(X_cls)
    X_fake = pd.concat([X_cls_df, X_cls_df.sample(2, random_state=42)], ignore_index=True)
    y_fake = np.array([1] * n_real + [0, 0])

    # A point cannot have more neighbours inside its class than the class has
    # other rows, so cap the neighbour count for very small classes (the fixed
    # value 5 makes the neighbour queries fail when n_real <= 5).
    n_neighbors = min(5, n_real - 1)

    # ADASYN: generates ~n_real synthetic points for class "1"
    adasyn = ADASYN(
        sampling_strategy={1: 2 * n_real},
        random_state=label,
        n_neighbors=n_neighbors,
    )
    X_res, y_res = adasyn.fit_resample(X_fake, y_fake)

    # Retrieve only synthetic points (everything after the original rows)
    X_synth = X_res[len(X_fake):]
    X_synth = X_synth[:n_real]  # limit to n_real points
    y_synth = [label] * len(X_synth)

    X_synth_list.append(pd.DataFrame(X_synth))
    y_synth_list.append(pd.Series(y_synth))

# Fail with a clear message instead of an opaque pandas error on an empty list.
if not X_synth_list:
    raise ValueError("No class produced synthetic rows; nothing to merge or save.")

# Merge
X_synth_encoded = pd.concat(X_synth_list).to_numpy()
y_synth_encoded = pd.concat(y_synth_list).to_numpy()

# ==========================
# Back to original space
# ==========================
X_synth_original = tdt.inverse_transform(X_synth_encoded)
df_synth = pd.DataFrame(X_synth_original)
df_synth["target"] = encoder_label.inverse_transform(y_synth_encoded)

# Save
df_synth.to_csv("./generations/synthetic_df_adasyn.csv", index=False)
print(f"\n✅ 1:1 synthetic ADASYN dataset generated: {df_synth.shape[0]} rows")


Class: 100%|██████████| 6/6 [00:34<00:00,  5.82s/it]



✅ 1:1 synthetic ADASYN dataset generated: 103955 rows


In [None]:
# TVAE

# Fit on the training rows only, like the SMOTE and ADASYN cells: `data` also
# contains the held-out test rows, and a generator fitted on them would leak
# test information into the synthetic set. Selecting by index keeps the
# original column order, including "target".
train_data = data.loc[X_train.index]

# Automatically detect data types
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(train_data)

# Train TVAE on the training split
tvae = TVAESynthesizer(metadata)
tvae.fit(train_data)

# Generate as many rows as the training split (70% of the dataset)
n_samples = len(train_data)
synthetic_data = tvae.sample(n_samples)

# Save
synthetic_data.to_csv("./generations/synthetic_df_tvae.csv", index=False)
print(f"✅ Synthetic data generated: {synthetic_data.shape[0]} rows")


In [None]:
# ==========================
# CTGAN
# ==========================
# Fit on the training rows only, like the SMOTE and ADASYN cells: `data` also
# contains the held-out test rows, and a generator fitted on them would leak
# test information into the synthetic set. Selecting by index keeps the
# original column order, including "target".
train_data = data.loc[X_train.index]

metadata = SingleTableMetadata()
metadata.detect_from_dataframe(train_data)

# ==========================
# Train CTGAN on the training split
# ==========================
ctgan = CTGANSynthesizer(metadata)
ctgan.fit(train_data)

# ==========================
# Generate as many rows as the training split (70% of the dataset)
# ==========================
n_samples = len(train_data)
synthetic_data = ctgan.sample(n_samples)

# ==========================
# Save
# ==========================
# The copy keeps `df_ctgan` independent of `synthetic_data`.
df_ctgan = synthetic_data.copy()
df_ctgan.to_csv("./generations/synthetic_df_ctgan.csv", index=False)

print(f"✅ CTGAN synthetic data generated: {df_ctgan.shape[0]} rows")
df_ctgan.head()


In [None]:
# ==========================
# CopulaGAN
# ==========================
# Fit on the training rows only, like the SMOTE and ADASYN cells: `data` also
# contains the held-out test rows, and a generator fitted on them would leak
# test information into the synthetic set. Selecting by index keeps the
# original column order, including "target".
train_data = data.loc[X_train.index]

metadata = SingleTableMetadata()
metadata.detect_from_dataframe(train_data)

# ==========================
# Train CopulaGAN on the training split
# ==========================
copulagan = CopulaGANSynthesizer(metadata)
copulagan.fit(train_data)

# ==========================
# Generate as many rows as the training split (70% of the dataset)
# ==========================
n_samples = len(train_data)
data_copula_gan = copulagan.sample(n_samples)

# ==========================
# Save
# ==========================
data_copula_gan.to_csv("./generations/synthetic_df_copulagan.csv", index=False)
print(f"✅ CopulaGAN synthetic data generated: {data_copula_gan.shape[0]} rows")
data_copula_gan.head()
