In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np

In [2]:
# Generator model definition
class Generator(nn.Module):
    """Fully connected generator that maps latent noise to synthetic feature rows.

    The network is a stack of ``Linear -> ReLU -> BatchNorm1d`` stages (one per
    entry of ``hidden_dims``) followed by a final ``Linear -> Tanh`` projection,
    so every generated value is bounded to ``[-1, 1]``.

    Args:
        latent_dim: Width of the noise vector consumed by the first linear layer.
        input_dim: Number of features produced for each generated row.
        hidden_dims: Widths of the hidden stages, in order. The default
            ``(128, 256)`` rebuilds the original fixed architecture, including
            identical ``state_dict`` keys (``model.0`` ... ``model.7``).
        bn_momentum: ``momentum`` handed to every ``BatchNorm1d`` layer. In
            PyTorch this is the weight given to the *newest* batch statistic
            when the running mean/variance are updated.
            NOTE(review): the default 0.8 is kept for backward compatibility,
            but it looks like a Keras-style value (Keras' ``momentum`` is the
            weight of the *old* running statistic, which would correspond to
            0.2 here) -- confirm which behaviour is intended.
    """

    def __init__(self, latent_dim, input_dim, hidden_dims=(128, 256), bn_momentum=0.8):
        super().__init__()

        layers = []
        in_features = latent_dim
        for width in hidden_dims:
            layers += [
                nn.Linear(in_features, width),
                nn.ReLU(True),
                nn.BatchNorm1d(width, momentum=bn_momentum),
            ]
            in_features = width

        # Tanh keeps the output bounded; training data should live on a
        # comparable scale for the discriminator's task to be meaningful.
        layers += [nn.Linear(in_features, input_dim), nn.Tanh()]

        self.model = nn.Sequential(*layers)

    def forward(self, noise):
        """Return generated rows of shape ``(len(noise), input_dim)`` for ``noise``."""
        return self.model(noise)

In [3]:
class Discriminator(nn.Module):
    """Small MLP that scores each feature row as real or generated.

    Layout: ``input_dim -> 128 -> 64 -> 1`` with ``LeakyReLU(0.2)`` after the
    two hidden linear layers and a ``Sigmoid`` on the single output unit.

    Args:
        input_dim: Number of features in each row passed to ``forward``.
    """

    def __init__(self, input_dim):
        super().__init__()

        widths = (input_dim, 128, 64)
        stages = []
        # Hidden stages: every Linear is followed by an in-place LeakyReLU.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.append(nn.Linear(fan_in, fan_out))
            stages.append(nn.LeakyReLU(0.2, inplace=True))
        # Output head: one sigmoid-squashed score per row.
        stages.append(nn.Linear(widths[-1], 1))
        stages.append(nn.Sigmoid())

        self.model = nn.Sequential(*stages)

    def forward(self, data):
        """Return the sigmoid score of shape ``(len(data), 1)`` for ``data``."""
        return self.model(data)



In [4]:
class ACGAN():
    """Adversarial trainer that pairs a ``Generator`` with a ``Discriminator``.

    Despite the class name, no auxiliary classifier head is involved: the
    discriminator only predicts real vs. generated, and both networks are
    optimized with binary cross-entropy using Adam
    (``lr=0.0002``, ``betas=(0.5, 0.999)``).

    Args:
        input_dim: Number of features per data row.
        latent_dim: Size of the noise vector sampled for the generator.
        device: Device (string or ``torch.device``) that holds both networks
            and every batch. Defaults to ``'cpu'``.
    """

    def __init__(self, input_dim, latent_dim, device='cpu'):
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.device = torch.device(device)

        self.generator = Generator(latent_dim, input_dim).to(self.device)
        self.discriminator = Discriminator(input_dim).to(self.device)

        self.adversarial_loss = nn.BCELoss()

        self.optimizer_G = optim.Adam(self.generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
        self.optimizer_D = optim.Adam(self.discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

    def train(self, X_train, epochs, batch_size, sample_interval):
        """Run the adversarial training loop and print the mean losses per epoch.

        Each iteration first updates the generator (so that the discriminator
        labels its samples as real), then updates the discriminator on the real
        batch and on the same generated batch, detached from the generator.

        Autograd anomaly detection is not switched on here because it slows
        every backward pass and is a process-wide setting; call
        ``torch.autograd.set_detect_anomaly(True)`` before training when
        hunting for NaNs.

        Args:
            X_train: 2-D array-like of training rows; converted to float32.
            epochs: Number of passes over ``X_train``.
            batch_size: Rows per mini-batch; must be at least 2.
            sample_interval: Accepted for interface compatibility; not used.

        Raises:
            ValueError: If ``batch_size`` or the number of rows is below 2.
        """
        features = torch.as_tensor(X_train, dtype=torch.float32)
        n_rows = features.size(0)
        if batch_size < 2 or n_rows < 2:
            raise ValueError(
                "ACGAN.train needs batch_size >= 2 and at least 2 training rows "
                f"(got batch_size={batch_size}, rows={n_rows})"
            )

        train_dataset = TensorDataset(features)
        # The generator draws as many noise rows as the real batch holds, and
        # BatchNorm1d in training mode rejects a single-row batch. Drop the
        # trailing batch only when it would contain exactly one row; because of
        # shuffling, a different row is left out each epoch.
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            drop_last=(n_rows % batch_size == 1),
        )

        for epoch in range(epochs):
            train_g_loss, train_d_loss = 0.0, 0.0

            self.generator.train()
            self.discriminator.train()

            for (real_data,) in train_loader:
                real_data = real_data.to(self.device)
                n_batch = real_data.size(0)  # the last batch may be smaller
                valid = torch.ones(n_batch, 1, device=self.device)
                fake = torch.zeros(n_batch, 1, device=self.device)

                # Train Generator
                self.optimizer_G.zero_grad()
                z = torch.randn(n_batch, self.latent_dim, device=self.device)
                gen_data = self.generator(z)

                validity = self.discriminator(gen_data)
                g_loss = self.adversarial_loss(validity, valid)
                g_loss.backward()
                self.optimizer_G.step()

                # Train Discriminator
                self.optimizer_D.zero_grad()
                validity_real = self.discriminator(real_data)
                d_real_loss = self.adversarial_loss(validity_real, valid)

                # detach() keeps discriminator gradients out of the generator.
                validity_fake = self.discriminator(gen_data.detach())
                d_fake_loss = self.adversarial_loss(validity_fake, fake)

                d_loss = (d_real_loss + d_fake_loss) / 2
                d_loss.backward()
                self.optimizer_D.step()

                train_d_loss += d_loss.item()
                train_g_loss += g_loss.item()

            train_d_loss /= len(train_loader)
            train_g_loss /= len(train_loader)

            print(f"Epoch {epoch+1}/{epochs} | D Loss: {train_d_loss:.4f} | G Loss: {train_g_loss:.4f}")

    def generate_data(self, num_samples):
        """Sample ``num_samples`` rows from the generator as a NumPy array.

        The generator is switched to eval mode (and left there) and sampling
        runs without gradient tracking.

        Args:
            num_samples: Number of rows to generate.

        Returns:
            ``numpy.ndarray`` with ``num_samples`` rows of generator output.
        """
        self.generator.eval()
        with torch.no_grad():
            z = torch.randn(num_samples, self.latent_dim, device=self.device)
            generated_data = self.generator(z)
        return generated_data.cpu().numpy()

In [None]:
def main_worker(
    data_path,
    label_col='BankRupt',
    minority_label=None,
    epochs=500,
    batch_size=32,
    num_minority_samples=10000,
    output_path="generated_minority_data.xlsx",
):
    """Train a GAN on the minority-class rows of a spreadsheet and save synthetic rows.

    Steps: read the Excel file, drop its first two columns, coerce everything
    to numeric, keep only rows of the minority class, standardize the features,
    train ``ACGAN`` on them, then write generated rows to ``output_path``.

    Args:
        data_path: Path of the Excel workbook to read.
        label_col: Name of the class-label column (default ``'BankRupt'``).
        minority_label: Label value whose rows are used for training. When
            ``None``, the least frequent value of ``label_col`` is used.
        epochs: Training epochs passed to ``ACGAN.train``.
        batch_size: Mini-batch size passed to ``ACGAN.train``.
        num_minority_samples: Number of synthetic rows to generate.
        output_path: Destination Excel file for the generated rows.

    Raises:
        KeyError: If ``label_col`` is missing after the first two columns are dropped.
        ValueError: If fewer than two minority-class rows are available.
    """
    # Load and preprocess the dataset
    df_raw = pd.read_excel(data_path)

    # Remove the first two columns (assumed to be non-numeric identifiers) and
    # coerce whatever is left to numbers; unparsable cells become NaN.
    df_numeric = df_raw.iloc[:, 2:].apply(pd.to_numeric, errors='coerce')

    # A row without a usable label cannot be assigned to a class, and
    # mean-imputing the label would invent a fractional class value.
    df_labeled = df_numeric.dropna(subset=[label_col])
    labels = df_labeled[label_col]

    # Handle missing feature values by filling NaN with each column's mean.
    features = df_labeled.drop(columns=[label_col])
    features = features.fillna(features.mean())

    # Train only on the minority class: the generated rows are meant to be
    # additional minority samples, so majority rows must not shape the model.
    if minority_label is None:
        minority_label = labels.value_counts().idxmin()
    minority_features = features.loc[labels == minority_label]
    if len(minority_features) < 2:
        raise ValueError(
            f"Need at least 2 rows with {label_col} == {minority_label!r} to train, "
            f"found {len(minority_features)}"
        )

    # Normalize data
    scaler = StandardScaler()
    X = scaler.fit_transform(minority_features.values)

    # Parameters
    input_dim = X.shape[1]  # Number of features in the dataset
    latent_dim = 100

    # Initialize ACGAN
    acgan = ACGAN(input_dim=input_dim, latent_dim=latent_dim)

    # Train the model
    acgan.train(X_train=X, epochs=epochs, batch_size=batch_size, sample_interval=200)

    # Generate synthetic data
    generated_data = acgan.generate_data(num_samples=num_minority_samples)
    print(f"Generated Data Shape: {generated_data.shape}")

    # NOTE(review): the rows are written exactly as the generator emits them,
    # i.e. in standardized units (not mapped back with scaler.inverse_transform)
    # and under generic Feature_i names. If the Generator still ends in Tanh,
    # values are also bounded to [-1, 1] while standardized real data is not --
    # confirm that downstream consumers expect this.
    generated_data_df = pd.DataFrame(
        generated_data,
        columns=[f"Feature_{i+1}" for i in range(generated_data.shape[1])],
    )

    # Save the generated data to Excel
    generated_data_df.to_excel(output_path, index=False)
    print(f"Generated minority data saved to '{output_path}'")