In [3]:
import torch
import torch.nn as nn
from torch.optim import Adam
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler

# Load data
# url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv'
data = pd.read_csv('Month_Value_1.csv')

# Standardize the data (important for GANs)
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)
num_features = scaled_data.shape[1]

# Retain column names
column_names = data.columns

class MyDataset(Dataset):
    def __init__(self, data):
        self.data = torch.tensor(data, dtype=torch.float32)
    
    def __getitem__(self, index):
        return self.data[index]
    
    def __len__(self):
        return len(self.data)

dataset = MyDataset(scaled_data)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)


# Define the generator loss
def generator_loss(D_G_z):
    return -torch.mean(D_G_z)

# Define the discriminator loss
def discriminator_loss(D_x, D_G_z):
    return -(torch.mean(D_x) - torch.mean(D_G_z))

# hyperparameters
lr = 0.0001
betas = (0.5, 0.9)
batch_size = 64
n_epochs = 50
threshold = 0.5
lambda_reg = 0.1

class AugmentedFilterLayer(nn.Module):
    def __init__(self, threshold):
        super(AugmentedFilterLayer, self).__init__()
        self.threshold = threshold

    def forward(self, x, Gz):
        distance = torch.sqrt(torch.sum((x - Gz)**2, dim=1))
        return torch.where(distance.unsqueeze(-1) < self.threshold, Gz, torch.zeros_like(Gz))


# Define the generator
class Generator(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Generator, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 128), 
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(128, 256), 
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, output_dim),
            nn.Tanh(),  
        )
       
    def forward(self, x):
        return self.net(x)
    
    
# Define the discriminator
class Discriminator(nn.Module):
    def __init__(self, input_dim):
        super(Discriminator, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 1)
        )
        
    def forward(self, x):
        return self.net(x)


# Create the Generator and the Discriminator
input_dim = 100
output_dim = num_features
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
generator = Generator(input_dim, output_dim).to(device)
discriminator = Discriminator(output_dim).to(device)

# optimizers
optimizerG = torch.optim.Adam(generator.parameters(), lr=lr, betas=betas)
optimizerD = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=betas)

# Augmented Filter Layer
augmented_filter_layer = AugmentedFilterLayer(threshold)

# Define the TRH distance calculation function
def calculate_TRH_distance(XT, YT, lambda_reg):
    # Step 1: Generate the matrix D_lambda
    k, m = XT.size(0), YT.size(0)
    D_lambda = torch.empty(k, m)
    for i in range(k):
        for j in range(m):
            d = torch.norm(XT[i] - YT[j]) + lambda_reg * abs(i - j)
            D_lambda[i, j] = d

    # Step 2: Calculate dx and dy
    dx = torch.min(D_lambda, dim=1)[0]
    dy = torch.min(D_lambda, dim=0)[0]

    # Step 3: Calculate dx_max and dy_max
    dx_max = torch.max(dx)
    dy_max = torch.max(dy)

    # Step 4: Calculate d_trh
    d_trh = max(dx_max, dy_max)

    return d_trh


# Training loop
for epoch in range(n_epochs):
    for i, data in enumerate(dataloader):
        real_data = data.to(device)
        
        # ====== Train Discriminator ====== #
        optimizerD.zero_grad()

        # Generate fake data and detach (so gradients are not calculated for generator)
        noise = torch.randn(batch_size, input_dim, device=device)
        fake_data = generator(noise)
        fake_data = augmented_filter_layer(real_data, fake_data.detach())
        
        # Calculate TRH distance
        trh_distance = calculate_TRH_distance(real_data, fake_data, lambda_reg)
        
        # Forward pass real and fake batches through Discriminator
        D_x = discriminator(real_data)
        D_G_z = discriminator(fake_data)
        
        # Calculate Discriminator loss
        d_loss = discriminator_loss(D_x, D_G_z)
        d_loss.backward()
        optimizerD.step()
        
        # ====== Train Generator ====== #
        optimizerG.zero_grad()
        
        # Generate fake data
        fake_data = generator(noise)
        fake_data = augmented_filter_layer(real_data, fake_data)

        # Calculate TRH distance
        trh_distance = calculate_TRH_distance(real_data, fake_data, lambda_reg)

        # Forward pass fake batch through Discriminator
        D_G_z = discriminator(fake_data)
        
        # Calculate Generator loss
        g_loss = generator_loss(D_G_z)
        g_loss.backward()
        optimizerG.step()

# Generate some data
z = torch.randn((batch_size, input_dim)).to(device)
generated_data = generator(z).detach().cpu().numpy()

# Apply inverse transform to recover original scale
generated_data = scaler.inverse_transform(generated_data)


from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Assume 'y' is your target variable
y = data['The_average_annual_payroll_of_the_region']

# Split the real data
X_train_real, X_test_real, y_train_real, y_test_real = train_test_split(data, y, test_size=0.2, random_state=42)

# Split the generated data
X_train_gen, X_test_gen, y_train_gen, y_test_gen = train_test_split(generated_data, y, test_size=0.2, random_state=42)

# Weighted Logistic Regression Model for real data
model_real = LogisticRegression(class_weight='balanced')
model_real.fit(X_train_real, y_train_real)

# Weighted Logistic Regression Model for generated data
model_gen = LogisticRegression(class_weight='balanced')
model_gen.fit(X_train_gen, y_train_gen)

# Evaluate the models
score_real = model_real.score(X_test_real, y_test_real)
score_gen = model_gen.score(X_test_gen, y_test_gen)

print("Real data score:", score_real)
print("Generated data score:", score_gen)


TypeError: new(): invalid data type 'str'