In [1]:
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from torch.autograd import grad as torch_grad
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
import xgboost as xgb
from scipy.stats import entropy
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import time
from tqdm import tqdm
from google.colab import drive


drive.mount('/content/drive/')
%cd /content/drive/MyDrive/Colab Notebooks/Katabatic/CrGAN/car/

# Suppress warnings
warnings.filterwarnings("ignore")

# Setting random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Check if CUDA is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
/content/drive/MyDrive/Colab Notebooks/Katabatic/CrGAN/car
Using device: cpu


In [2]:
def load_car_data(data_path):
    # Column names for the car dataset
    column_names = ['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'class']

    # Load data
    df = pd.read_csv(data_path, header=None, names=column_names)

    print(f"Dataset shape: {df.shape}")

    # Split into train and test (80% train, 20% test)
    df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)

    print(f"Training data shape: {df_train.shape}")
    print(f"Testing data shape: {df_test.shape}")

    # Split features and target
    X_train = df_train.drop('class', axis=1)
    y_train = df_train['class']
    X_test = df_test.drop('class', axis=1)
    y_test = df_test['class']

    # Identify categorical columns
    categorical_cols = X_train.columns.tolist()  # All columns are categorical

    # Create preprocessing pipeline for categorical data using one-hot encoding
    categorical_transformer = Pipeline(steps=[
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('cat', categorical_transformer, categorical_cols)
        ])

    # Fit and transform the data
    X_train_transformed = preprocessor.fit_transform(X_train)
    X_test_transformed = preprocessor.transform(X_test)

    print(f"Processed training data shape: {X_train_transformed.shape}")
    print(f"Processed testing data shape: {X_test_transformed.shape}")

    # Get one-hot encoding feature names for later use
    cat_encoder = preprocessor.named_transformers_['cat'].named_steps['onehot']
    feature_names = cat_encoder.get_feature_names_out(categorical_cols)

    # Create label encoder for the target classes
    unique_classes = sorted(df['class'].unique())
    label_encoder = {cls: i for i, cls in enumerate(unique_classes)}
    inverse_label_encoder = {i: cls for cls, i in label_encoder.items()}

    # Encode targets
    y_train_encoded = y_train.map(label_encoder)
    y_test_encoded = y_test.map(label_encoder)

    # Store original categorical values
    cat_values = {}
    for col in categorical_cols:
        cat_values[col] = sorted(df[col].unique())

    return (X_train_transformed, y_train_encoded, X_test_transformed, y_test_encoded,
            preprocessor, feature_names, label_encoder, inverse_label_encoder, cat_values,
            X_train, y_train)

# Custom dataset class
class CarDataset(Dataset):
    def __init__(self, features, labels=None):
        self.features = torch.tensor(features, dtype=torch.float32)
        if labels is not None:
            self.labels = torch.tensor(labels.values, dtype=torch.long)
        else:
            self.labels = None

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        if self.labels is not None:
            return self.features[idx], self.labels[idx]
        else:
            return self.features[idx]

# Generator Network
class Generator(nn.Module):
    def __init__(self, latent_dim, output_dim):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.output_dim = output_dim

        self.model = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.2),

            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2),

            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(0.2),

            nn.Linear(512, output_dim),
            nn.Sigmoid()  # Use Sigmoid for one-hot encoded data
        )

    def forward(self, z):
        return self.model(z)

# Critic Network (Discriminator)
class Critic(nn.Module):
    def __init__(self, input_dim):
        super(Critic, self).__init__()
        self.input_dim = input_dim

        self.model = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.model(x)

# Cramer GAN Implementation
class CramerGAN:
    def __init__(self, data_dim, latent_dim=100, critic_iterations=5, lambda_gp=10):
        self.data_dim = data_dim
        self.latent_dim = latent_dim
        self.critic_iterations = critic_iterations
        self.lambda_gp = lambda_gp

        # Initialize networks
        self.generator = Generator(latent_dim, data_dim).to(device)
        self.critic = Critic(data_dim).to(device)

        # Setup optimizers
        self.g_optimizer = optim.Adam(self.generator.parameters(), lr=0.0002, betas=(0.5, 0.9))
        self.c_optimizer = optim.Adam(self.critic.parameters(), lr=0.0002, betas=(0.5, 0.9))

        # Initialize loss tracking
        self.g_losses = []
        self.c_losses = []

    def _critic_train_iteration(self, real_data, batch_size):
        # Generate random noise
        noise = torch.randn(batch_size, self.latent_dim).to(device)

        # Generate fake data
        fake_data = self.generator(noise)

        # Get critic outputs
        critic_real = self.critic(real_data)
        critic_fake = self.critic(fake_data)

        # Calculate Cramer distance
        critic_real2 = self.critic(torch.roll(real_data, shifts=1, dims=0))
        critic_fake2 = self.critic(torch.roll(fake_data, shifts=1, dims=0))

        # Cramer GAN loss function
        c_loss = torch.mean(critic_real - critic_fake) - 0.5 * torch.mean(torch.pow(critic_real - critic_real2, 2)) + 0.5 * torch.mean(torch.pow(critic_fake - critic_fake2, 2))

        # Calculate gradient penalty
        alpha = torch.rand(batch_size, 1).to(device)
        interpolates = alpha * real_data + ((1 - alpha) * fake_data)
        interpolates.requires_grad_(True)

        critic_interpolates = self.critic(interpolates)
        gradients = torch_grad(outputs=critic_interpolates, inputs=interpolates,
                              grad_outputs=torch.ones_like(critic_interpolates).to(device),
                              create_graph=True, retain_graph=True)[0]

        gradients = gradients.view(batch_size, -1)
        gradient_penalty = self.lambda_gp * ((gradients.norm(2, dim=1) - 1) ** 2).mean()

        # Update critic
        self.c_optimizer.zero_grad()
        c_loss_total = c_loss + gradient_penalty
        c_loss_total.backward()
        self.c_optimizer.step()

        return c_loss_total.item()

    def _generator_train_iteration(self, batch_size):
        # Generate random noise
        noise = torch.randn(batch_size, self.latent_dim).to(device)

        # Generate fake data
        fake_data = self.generator(noise)

        # Calculate critic outputs
        critic_fake = self.critic(fake_data)
        critic_fake2 = self.critic(torch.roll(fake_data, shifts=1, dims=0))

        # Generator loss is negative of critic loss
        g_loss = -torch.mean(critic_fake) + 0.5 * torch.mean(torch.pow(critic_fake - critic_fake2, 2))

        # Update generator
        self.g_optimizer.zero_grad()
        g_loss.backward()
        self.g_optimizer.step()

        return g_loss.item()

    def train(self, data_loader, epochs, save_interval=10, verbose=True):
        for epoch in range(epochs):
            epoch_start_time = time.time()
            c_loss_total = 0
            g_loss_total = 0
            num_batches = 0

            for i, (real_data, _) in enumerate(data_loader):
                batch_size = real_data.size(0)
                real_data = real_data.to(device)

                # Train critic
                for _ in range(self.critic_iterations):
                    c_loss = self._critic_train_iteration(real_data, batch_size)
                c_loss_total += c_loss

                # Train generator
                g_loss = self._generator_train_iteration(batch_size)
                g_loss_total += g_loss

                num_batches += 1

            # Calculate average loss for the epoch
            c_loss_avg = c_loss_total / num_batches
            g_loss_avg = g_loss_total / num_batches

            self.c_losses.append(c_loss_avg)
            self.g_losses.append(g_loss_avg)

            epoch_time = time.time() - epoch_start_time

            if verbose and (epoch % save_interval == 0 or epoch == epochs - 1):
                print(f"Epoch [{epoch+1}/{epochs}] | Critic Loss: {c_loss_avg:.4f} | Generator Loss: {g_loss_avg:.4f} | Time: {epoch_time:.2f}s")

    def generate_samples(self, num_samples):
        self.generator.eval()
        noise = torch.randn(num_samples, self.latent_dim).to(device)
        with torch.no_grad():
            generated_data = self.generator(noise).cpu().numpy()
        self.generator.train()
        return generated_data

    def save_model(self, path):
        torch.save({
            'generator_state_dict': self.generator.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'g_optimizer_state_dict': self.g_optimizer.state_dict(),
            'c_optimizer_state_dict': self.c_optimizer.state_dict(),
        }, path)

    def load_model(self, path):
        checkpoint = torch.load(path)
        self.generator.load_state_dict(checkpoint['generator_state_dict'])
        self.critic.load_state_dict(checkpoint['critic_state_dict'])
        self.g_optimizer.load_state_dict(checkpoint['g_optimizer_state_dict'])
        self.c_optimizer.load_state_dict(checkpoint['c_optimizer_state_dict'])

# Post-process generated data for categorical features
def post_process_car_data(synthetic_data, preprocessor, cat_values, feature_names):
    """
    Post-process the synthetic data to convert one-hot encoded features back to categorical values
    """
    # Create a DataFrame with one-hot encoded columns
    synthetic_df = pd.DataFrame(synthetic_data, columns=feature_names)

    # Extract categorical feature groups
    result_df = pd.DataFrame()

    # Process each categorical column
    for col_name, values in cat_values.items():
        # Get one-hot columns for this feature
        col_pattern = f"{col_name}_"
        category_cols = [c for c in feature_names if c.startswith(col_pattern)]

        # Get the most likely category for each sample
        category_probs = synthetic_df[category_cols].values
        category_indices = np.argmax(category_probs, axis=1)

        # Map indices back to original categories
        # Extract the original category from the one-hot column name
        categories = [c.split('_', 1)[1] for c in category_cols]
        result_df[col_name] = [categories[idx] for idx in category_indices]

    return result_df



In [3]:
# Evaluation Metrics

# 1. Machine Learning Utility (TSTR)
def evaluate_tstr(real_data, synthetic_data, real_labels, random_state=42):
    """
    Train classifiers on synthetic data and test on real data (TSTR)
    Returns accuracy for each classifier
    """
    # Train-test split for real data
    X_train, X_test, y_train, y_test = train_test_split(
        real_data, real_labels, test_size=0.2, random_state=random_state
    )

    # Synthetic data (all used for training)
    X_synth = synthetic_data

    # Ensure proper dimensions for labels
    if isinstance(y_train, pd.Series):
        y_train = y_train.values

    # Create synthetic labels based on real distribution
    num_classes = len(np.unique(y_train))
    class_distribution = np.bincount(y_train.astype(int), minlength=num_classes) / len(y_train)
    np.random.seed(random_state)
    y_synth = np.random.choice(range(num_classes), size=len(X_synth), p=class_distribution)

    # Define classifiers
    classifiers = {
        'Logistic Regression': LogisticRegression(max_iter=1000, random_state=random_state),
        'MLP': MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=500, random_state=random_state),
        'Random Forest': RandomForestClassifier(n_estimators=100, random_state=random_state),
        'XGBoost': xgb.XGBClassifier(n_estimators=100, random_state=random_state)
    }

    results = {}

    for name, clf in classifiers.items():
        # Train on synthetic data
        clf.fit(X_synth, y_synth)

        # Test on real data
        y_pred = clf.predict(X_test)

        # Calculate metrics
        accuracy = accuracy_score(y_test, y_pred)

        # We use weighted f1 since we have multiple classes
        f1 = f1_score(y_test, y_pred, average='weighted')

        results[name] = {
            'accuracy': accuracy,
            'f1_score': f1
        }

    return results

# 2. Statistical Similarity for categorical data
def evaluate_categorical_similarity(real_df, synthetic_df):
    """
    Calculate statistical similarity for categorical features
    """
    results = {'JSD': {}}

    # For each categorical column, calculate JSD
    for col in real_df.columns:
        # Get value counts
        real_counts = real_df[col].value_counts(normalize=True).sort_index()
        synth_counts = synthetic_df[col].value_counts(normalize=True).sort_index()

        # Align the distributions
        all_categories = sorted(set(real_counts.index) | set(synth_counts.index))
        real_dist = np.array([real_counts.get(cat, 0) for cat in all_categories])
        synth_dist = np.array([synth_counts.get(cat, 0) for cat in all_categories])

        # Add small epsilon to avoid zeros
        epsilon = 1e-10
        real_dist = real_dist + epsilon
        synth_dist = synth_dist + epsilon

        # Normalize
        real_dist = real_dist / real_dist.sum()
        synth_dist = synth_dist / synth_dist.sum()

        # Calculate JSD
        jsd = jensen_shannon_divergence(real_dist, synth_dist)
        results['JSD'][col] = jsd

    # Average JSD across all features
    results['JSD_avg'] = np.mean(list(results['JSD'].values()))

    return results

def jensen_shannon_divergence(p, q):
    """
    Calculate Jensen-Shannon Divergence between distributions p and q
    """
    # Ensure p and q are normalized
    p = p / np.sum(p)
    q = q / np.sum(q)

    m = 0.5 * (p + q)

    # Calculate JSD
    jsd = 0.5 * (entropy(p, m) + entropy(q, m))

    return jsd

def plot_loss_curves(model):
    """
    Plot the loss curves for the generator and critic
    """
    plt.figure(figsize=(10, 5))
    plt.plot(model.g_losses, label='Generator Loss')
    plt.plot(model.c_losses, label='Critic Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('CramerGAN Training Loss for Car Dataset')
    plt.legend()
    plt.grid(True)
    plt.savefig('car_crgan_loss_curves.png')
    plt.close()

def plot_categorical_distributions(real_df, synthetic_df):
    """
    Plot distributions of real vs synthetic data for categorical features
    """
    n_features = len(real_df.columns)

    plt.figure(figsize=(15, n_features * 4))

    for i, col in enumerate(real_df.columns):
        plt.subplot(n_features, 1, i+1)

        # Calculate proportions
        real_props = real_df[col].value_counts(normalize=True).sort_index()
        synth_props = synthetic_df[col].value_counts(normalize=True).sort_index()

        # Get all categories
        all_categories = sorted(set(real_props.index) | set(synth_props.index))

        # Create a DataFrame for plotting
        plot_df = pd.DataFrame({
            'Category': all_categories * 2,
            'Proportion': [real_props.get(cat, 0) for cat in all_categories] +
                         [synth_props.get(cat, 0) for cat in all_categories],
            'Type': ['Real'] * len(all_categories) + ['Synthetic'] * len(all_categories)
        })

        # Plot
        sns.barplot(x='Category', y='Proportion', hue='Type', data=plot_df)
        plt.title(f'Distribution for {col}')
        plt.xticks(rotation=45)
        plt.ylabel('Proportion')
        plt.legend()

    plt.tight_layout()
    plt.savefig('car_categorical_distributions.png')
    plt.close()

def plot_class_distribution(real_labels, synthetic_labels, label_encoder):
    """
    Plot the class distribution of real vs synthetic data
    """
    plt.figure(figsize=(12, 6))

    # Get class counts
    real_class_counts = pd.Series(real_labels).value_counts(normalize=True)
    synth_class_counts = pd.Series(synthetic_labels).value_counts(normalize=True)

    # Create inverse label encoder
    inverse_label_encoder = {v: k for k, v in label_encoder.items()}

    # Get all classes
    all_classes = sorted(set(real_class_counts.index) | set(synth_class_counts.index))

    # Create plot data
    plot_df = pd.DataFrame({
        'Class': [inverse_label_encoder.get(c, c) for c in all_classes] * 2,
        'Proportion': [real_class_counts.get(c, 0) for c in all_classes] +
                     [synth_class_counts.get(c, 0) for c in all_classes],
        'Type': ['Real'] * len(all_classes) + ['Synthetic'] * len(all_classes)
    })

    # Plot
    sns.barplot(x='Class', y='Proportion', hue='Type', data=plot_df)
    plt.title('Class Distribution: Real vs Synthetic Car Evaluations')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('car_class_distribution.png')
    plt.close()

# Main function
def main():
    # File path
    data_path = 'data/car.csv'

    # Load and preprocess data
    (X_train_transformed, y_train, X_test_transformed, y_test,
     preprocessor, feature_names, label_encoder, inverse_label_encoder,
     cat_values, X_train_original, y_train_original) = load_car_data(data_path)

    # Create dataset and dataloader
    train_dataset = CarDataset(X_train_transformed, y_train)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

    # Initialize and train the model
    data_dim = X_train_transformed.shape[1]
    latent_dim = 100

    print(f"Data dimension: {data_dim}")
    print(f"Latent dimension: {latent_dim}")

    crgan = CramerGAN(data_dim, latent_dim)

    # Train the model
    epochs = 300
    print(f"Training CramerGAN for {epochs} epochs...")
    crgan.train(train_loader, epochs, save_interval=10)

    # Save the model
    crgan.save_model('car_crgan_model.pt')

    # Plot loss curves
    plot_loss_curves(crgan)

    # Generate synthetic data
    num_samples = 1000
    print(f"Generating {num_samples} synthetic samples...")
    synthetic_data_raw = crgan.generate_samples(num_samples)

    # Post-process the synthetic data
    synthetic_df = post_process_car_data(synthetic_data_raw, preprocessor, cat_values, feature_names)

    # Generate synthetic labels using a classifier trained on real data
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train_transformed, y_train)
    synthetic_labels_raw = clf.predict(synthetic_data_raw)

    # Convert numeric labels to original class values
    synthetic_labels = [inverse_label_encoder[label] for label in synthetic_labels_raw]

    # Add class labels to the synthetic dataframe
    synthetic_df['class'] = synthetic_labels

    # Save the synthetic data
    synthetic_df.to_csv('synthetic_car_data.csv', index=False)

    # Plot distributions
    plot_categorical_distributions(X_train_original, synthetic_df.drop('class', axis=1))

    # Plot class distribution
    plot_class_distribution(y_train_original, synthetic_df['class'], label_encoder)

    # Statistical similarity evaluation
    print("Evaluating statistical similarity...")
    stat_results = evaluate_categorical_similarity(X_train_original, synthetic_df.drop('class', axis=1))

    print("\nJensen-Shannon Divergence (average):", stat_results['JSD_avg'])
    print("\nJSD per feature:")
    for feature, jsd in stat_results['JSD'].items():
        print(f"  {feature}: {jsd:.4f}")

    # Machine Learning Utility (TSTR) evaluation
    print("\nEvaluating Machine Learning Utility (TSTR)...")
    tstr_results = evaluate_tstr(X_train_transformed, synthetic_data_raw, y_train)

    print("\nTSTR Results:")
    for clf_name, metrics in tstr_results.items():
        print(f"{clf_name}: Accuracy = {metrics['accuracy']:.4f}, F1 Score = {metrics['f1_score']:.4f}")

    print("\nEvaluation complete! Check the output directory for plots and saved model.")



In [4]:
 if __name__ == "__main__":
    main()

Dataset shape: (1729, 7)
Training data shape: (1383, 7)
Testing data shape: (346, 7)
Processed training data shape: (1383, 27)
Processed testing data shape: (346, 27)
Data dimension: 27
Latent dimension: 100
Training CramerGAN for 300 epochs...
Epoch [1/300] | Critic Loss: 3.8554 | Generator Loss: -0.5898 | Time: 2.20s
Epoch [11/300] | Critic Loss: -4.8336 | Generator Loss: -1.8212 | Time: 1.20s
Epoch [21/300] | Critic Loss: -418.5198 | Generator Loss: 89.9413 | Time: 1.11s
Epoch [31/300] | Critic Loss: -17773.0257 | Generator Loss: 4861.4480 | Time: 1.74s
Epoch [41/300] | Critic Loss: -247343.7216 | Generator Loss: 76637.0178 | Time: 1.07s
Epoch [51/300] | Critic Loss: -1657627.8864 | Generator Loss: 597947.8068 | Time: 1.15s
Epoch [61/300] | Critic Loss: -9442491.1364 | Generator Loss: 3085252.8864 | Time: 1.44s
Epoch [71/300] | Critic Loss: -34292301.0909 | Generator Loss: 11778900.5455 | Time: 1.79s
Epoch [81/300] | Critic Loss: -112259998.5455 | Generator Loss: 41866527.4545 | Tim