<a href="https://colab.research.google.com/github/Dhaneshkp/DesktopAssistant/blob/main/GAN%20with%20tf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# Example data generation function
def generate_data(num_samples=1000):
    np.random.seed(0)
    age = np.random.randint(20, 60, size=num_samples)
    duration = np.random.randint(1, 20, size=num_samples)
    risk_class = np.random.randint(0, 3, size=num_samples)
    client_plan = np.random.randint(0, 4, size=num_samples)
    premium_rate = age * 0.1 + duration * 0.05 + risk_class * 1.5 + client_plan * 0.2 + np.random.normal(0, 1, size=num_samples)

    data = pd.DataFrame({
        'age': age,
        'duration': duration,
        'risk_class': risk_class,
        'client_plan': client_plan,
        'premium_rate': premium_rate
    })

    # Introduce random missing values in premium_rate
    missing_indices = np.random.choice(num_samples, size=int(0.1 * num_samples), replace=False)
    data.loc[missing_indices, 'premium_rate'] = np.nan

    return data

# Data Preparation
data = generate_data()
features = data[['age', 'duration', 'risk_class', 'client_plan']].values
premium_rates = data['premium_rate'].values

# Normalize features and target
scaler_X = StandardScaler()
scaler_y = StandardScaler()
features = scaler_X.fit_transform(features)
premium_rates = premium_rates.reshape(-1, 1)
premium_rates = scaler_y.fit_transform(premium_rates).reshape(-1)

# Create mask for missing values
mask = ~np.isnan(premium_rates)
premium_rates[~mask] = 0  # Replace NaNs with zeros for now

# Convert to tensors
features = torch.tensor(features, dtype=torch.float32)
premium_rates = torch.tensor(premium_rates, dtype=torch.float32).unsqueeze(1)
mask = torch.tensor(mask, dtype=torch.float32).unsqueeze(1)

# Train a simple linear regression model
lin_reg = LinearRegression()
lin_reg.fit(features[mask.squeeze().bool()], premium_rates[mask.squeeze().bool()])
lin_reg_predictions = lin_reg.predict(features)

class Generator(nn.Module):
    def __init__(self, feature_dim, target_dim):
        super(Generator, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(feature_dim + target_dim, 128),
            nn.ReLU(),
            nn.Linear(128, target_dim),
            nn.ReLU()  # Ensure non-negative outputs
        )

    def forward(self, x, noise):
        input_combined = torch.cat((x, noise), dim=1)
        return self.fc(input_combined)

class Discriminator(nn.Module):
    def __init__(self, feature_dim, target_dim):
        super(Discriminator, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(feature_dim + target_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x, y):
        input_combined = torch.cat((x, y), dim=1)
        return self.fc(input_combined)

# Initialize models
feature_dim = features.shape[1]
target_dim = 1  # Premium rate is a single value
generator = Generator(feature_dim, target_dim)
discriminator = Discriminator(feature_dim, target_dim)

# Optimizers
g_optimizer = optim.Adam(generator.parameters(), lr=0.001, betas=(0.5, 0.999))
d_optimizer = optim.Adam(discriminator.parameters(), lr=0.001, betas=(0.5, 0.999))

# Loss functions
reconstruction_loss = nn.MSELoss()
regression_loss = nn.MSELoss()

# Gradient penalty
def compute_gradient_penalty(discriminator, real_samples, fake_samples, real_features):
    alpha = torch.rand(real_samples.size(0), 1)
    alpha = alpha.expand(real_samples.size()).to(real_samples.device)

    interpolates = (alpha * real_samples + ((1 - alpha) * fake_samples)).requires_grad_(True)
    d_interpolates = discriminator(real_features, interpolates)

    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones(d_interpolates.size()).to(real_samples.device),
        create_graph=True,
        retain_graph=True,
        only_inputs=True
    )[0]

    gradients = gradients.view(gradients.size(0), -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

# Training loop
num_epochs = 1000
batch_size = 64
lambda_gp = 10  # Gradient penalty coefficient
critic_iterations = 5  # Number of discriminator updates per generator update

for epoch in range(num_epochs):
    for _ in range(critic_iterations):
        # Generate fake data
        noise = torch.randn(batch_size, target_dim)

        # Create batches (replace with your data loading logic)
        batch_indices = np.random.choice(len(features), batch_size, replace=False)
        real_features = features[batch_indices]
        real_premium_rates = premium_rates[batch_indices]
        batch_mask = mask[batch_indices]
        lin_reg_preds = torch.tensor(lin_reg_predictions[batch_indices], dtype=torch.float32).unsqueeze(1)

        # Generate fake targets using the generator
        fake_premium_rates = generator(real_features, noise)

        # Combine real and fake targets using the mask
        combined_premium_rates = batch_mask * real_premium_rates + (1 - batch_mask) * fake_premium_rates

        # Train Discriminator
        d_optimizer.zero_grad()

        real_validity = discriminator(real_features, real_premium_rates)
        fake_validity = discriminator(real_features, combined_premium_rates.detach())

        gradient_penalty = compute_gradient_penalty(discriminator, real_premium_rates, combined_premium_rates, real_features)
        d_loss = -torch.mean(real_validity) + torch.mean(fake_validity) + lambda_gp * gradient_penalty

        d_loss.backward()
        d_optimizer.step()

    # Train Generator
    g_optimizer.zero_grad()

    noise = torch.randn(batch_size, target_dim)
    fake_premium_rates = generator(real_features, noise)
    combined_premium_rates = batch_mask * real_premium_rates + (1 - batch_mask) * fake_premium_rates

    g_loss_adv = -torch.mean(discriminator(real_features, combined_premium_rates))
    g_loss_recon = reconstruction_loss(combined_premium_rates, real_premium_rates)
    g_loss_reg = regression_loss(combined_premium_rates, lin_reg_preds)
    g_loss = g_loss_adv + g_loss_recon + g_loss_reg

    g_loss.backward()
    g_optimizer.step()

    if epoch % 100 == 0:
        print(f'Epoch [{epoch}/{num_epochs}], d_loss: {d_loss.item()}, g_loss: {g_loss.item()}')

# Use the trained generator for imputation
noise = torch.randn(len(features), target_dim)
imputed_premium_rates = generator(features, noise).detach().numpy()
imputed_premium_rates = mask.numpy() * premium_rates.numpy() + (1 - mask.numpy()) * imputed_premium_rates

# Denormalize the imputed values
imputed_premium_rates = scaler_y.inverse_transform(imputed_premium_rates)

# Replace the original missing values with the imputed values
data['imputed_remium_rate'] = imputed_premium_rates
print(data)


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [0/1000], d_loss: 7.523110389709473, g_loss: 1.493195652961731
Epoch [100/1000], d_loss: 0.03502834215760231, g_loss: 1.0470221042633057
Epoch [200/1000], d_loss: 0.030421800911426544, g_loss: 1.1214067935943604
Epoch [300/1000], d_loss: 0.019457165151834488, g_loss: 1.5163860321044922
Epoch [400/1000], d_loss: 0.018131723627448082, g_loss: 1.1728957891464233
Epoch [500/1000], d_loss: 0.017252637073397636, g_loss: 1.4371570348739624
Epoch [600/1000], d_loss: 0.013920199126005173, g_loss: 1.1134288311004639
Epoch [700/1000], d_loss: 0.013138755224645138, g_loss: 1.176929235458374
Epoch [800/1000], d_loss: 0.010633702389895916, g_loss: 1.4720985889434814
Epoch [900/1000], d_loss: 0.013439767062664032, g_loss: 1.284769058227539
     age  duration  risk_class  client_plan  premium_rate  imputed_remium_rate
0     20         5           2            0      6.120132             6.120131
1     23         2           2            2      6.453851             6.453850
2     23        18    