In [30]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from torch.nn import functional as F
from torch.utils.data import Dataset, random_split, DataLoader
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
import time 
from datetime import datetime, timedelta


In [7]:
# Prefer the first CUDA device when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
if device.type == 'cuda':
    # Report which GPU was picked and its total memory in decimal gigabytes.
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# The CSV is semicolon-separated; split it into numeric and non-numeric column views.
data = pd.read_csv('data/Physical experiments.csv', sep=';')
num = data.select_dtypes(include=np.number)
cat = data.select_dtypes(exclude=np.number)
data.head()

GPU Name: NVIDIA GeForce RTX 3060 Laptop GPU
GPU Memory: 6.44 GB


Unnamed: 0,X Axis,Y Axis,Z Axis,speed,std speed,flashes,height,flash time,number of flashes,time,direction
0,0,0,0,7732100079,2127411752,836535186,893909107,375,12.0,1940939338,V
1,0,1,0,893297619,2414391967,997566682,981108211,4178899083,327.0,1999695986,U
2,0,2,0,14209136,2514055993,992084814,964873731,4241145833,576.0,2002562567,U
3,0,3,0,-7024316029,2348523655,989346321,951449147,4436641221,655.0,1995115159,U
4,0,4,0,-1405511477,2516917632,987477067,930151443,4610802139,935.0,1996782501,U


## CTGAN

In [16]:
class TabularDataset(Dataset):
    """Dataset that serves the entries of an array-like as float32 tensors.

    Args:
        data: Anything ``torch.tensor`` accepts (nested list, NumPy array, ...).
            It is converted once, at construction time, to a float32 tensor
            stored in ``self.data``.
    """

    def __init__(self, data):
        as_float32 = torch.tensor(data, dtype=torch.float32)
        self.data = as_float32

    def __len__(self):
        """Return the length of the stored tensor (its first dimension)."""
        n_rows = len(self.data)
        return n_rows

    def __getitem__(self, idx):
        """Return whatever ``idx`` selects from the stored tensor."""
        selected = self.data[idx]
        return selected
        
class Conditioner(nn.Module):
    """Three-layer MLP that re-encodes a condition vector into values in (0, 1).

    Args:
        input_dim: Width of the incoming condition vector; the output has the
            same width.
        hidden_dim: Width of the two hidden layers.
    """

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        # Layers are instantiated in input-to-output order and stored under
        # `net`, so parameter names remain `net.0`, `net.2` and `net.4`.
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run the MLP on ``x`` and squash every output with a sigmoid."""
        logits = self.net(x)
        return logits.sigmoid()

class CTGANGenerator(nn.Module):
    """Conditional generator: noise plus an encoded condition in, one sample out.

    Args:
        latent_dim: Width of the noise vector ``z``.
        output_dim: Number of generated features. The condition passed to
            ``forward`` must have this width too, because the conditioner is
            built with ``output_dim`` as its input/output size.
        hidden_dim: Width of the hidden layers of both the main network and
            the conditioner.
    """

    def __init__(self, latent_dim, output_dim, hidden_dim=256):
        super().__init__()
        # Main network first, conditioner second: keeps the parameter
        # registration order of the module unchanged.
        stack = [
            nn.Linear(latent_dim + output_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            nn.Tanh(),  # bounds every generated feature to (-1, 1)
        ]
        self.net = nn.Sequential(*stack)
        self.conditioner = Conditioner(output_dim, hidden_dim)

    def forward(self, z, c):
        """Encode ``c``, append it to ``z`` along dim 1 and run the main network."""
        encoded = self.conditioner(c)
        return self.net(torch.cat((z, encoded), dim=1))

class CTGANDiscriminator(nn.Module):
    """Conditional discriminator that scores a (sample, condition) pair in (0, 1).

    Args:
        input_dim: Width of one data row. The first linear layer takes
            ``input_dim * 2`` inputs, so the condition passed to ``forward``
            must have the same width as the sample.
        hidden_dim: Width of the two hidden layers.
    """

    def __init__(self, input_dim, hidden_dim=256):
        super().__init__()
        blocks = [
            nn.Linear(input_dim * 2, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid(),
        ]
        self.net = nn.Sequential(*blocks)

    def forward(self, x, c):
        """Score ``x`` (real or generated rows) given the condition ``c``."""
        # The sample and its condition are joined feature-wise before scoring.
        paired = torch.cat((x, c), dim=1)
        return self.net(paired)

class CTGAN:
    """Trainer for a conditional GAN: owns the generator, the discriminator and their optimizers.

    Args:
        input_dim: Number of features per data row. It is also passed to the
            generator as the condition width.
        latent_dim: Width of the Gaussian noise vector fed to the generator.
        device: Device on which both networks are placed and to which every
            batch is moved.
    """

    # Added inside every log() so a saturated discriminator output never gives log(0).
    _LOG_EPS = 1e-8

    def __init__(self, input_dim, latent_dim=100, device='cpu'):
        self.latent_dim = latent_dim
        self.device = device
        self.generator = CTGANGenerator(latent_dim, input_dim).to(device)
        self.discriminator = CTGANDiscriminator(input_dim).to(device)
        self.g_optimizer = torch.optim.Adam(self.generator.parameters(), lr=2e-4, betas=(0.5, 0.999))
        self.d_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=2e-4, betas=(0.5, 0.999))

    @staticmethod
    def _as_float_tensor(values):
        """Return ``values`` as a float32 tensor.

        Tensors are cast as-is; anything else goes through ``np.asarray`` first,
        so NumPy arrays of any numeric dtype, nested lists and DataFrames work.
        """
        if isinstance(values, torch.Tensor):
            return values.detach().to(dtype=torch.float32)
        return torch.tensor(np.asarray(values, dtype=np.float32))

    def train_step(self, real_data, conditions):
        """Run one discriminator update followed by one generator update.

        Args:
            real_data: Batch of real rows.
            conditions: Batch of condition rows, one per row of ``real_data``.

        Returns:
            Tuple ``(d_loss, g_loss)`` of Python floats for this batch.
        """
        batch_size = real_data.shape[0]
        real_data = real_data.to(self.device)
        conditions = conditions.to(self.device)
        eps = self._LOG_EPS

        # ---- Discriminator update ----
        self.d_optimizer.zero_grad()

        z = torch.randn(batch_size, self.latent_dim, device=self.device)
        # The generator is not updated in this half, so skip building its graph.
        with torch.no_grad():
            fake_data = self.generator(z, conditions)

        real_pred = self.discriminator(real_data, conditions)
        fake_pred = self.discriminator(fake_data, conditions)

        d_loss_real = -torch.mean(torch.log(real_pred + eps))
        d_loss_fake = -torch.mean(torch.log(1 - fake_pred + eps))
        d_loss = d_loss_real + d_loss_fake

        d_loss.backward()
        self.d_optimizer.step()

        # ---- Generator update (fresh noise, non-saturating loss) ----
        self.g_optimizer.zero_grad()

        z = torch.randn(batch_size, self.latent_dim, device=self.device)
        fake_data = self.generator(z, conditions)
        fake_pred = self.discriminator(fake_data, conditions)

        g_loss = -torch.mean(torch.log(fake_pred + eps))

        g_loss.backward()
        self.g_optimizer.step()

        return d_loss.item(), g_loss.item()

    def train(self, data, conditions, epochs=200, batch_size=500):
        """Train both networks on ``data`` / ``conditions`` for ``epochs`` passes.

        Rows are reshuffled at the start of every epoch and consumed in slices
        of ``batch_size``; mean losses are printed every 10th epoch.

        Args:
            data: Array-like of real rows.
            conditions: Array-like of condition rows, aligned with ``data``.
            epochs: Number of passes over the data.
            batch_size: Maximum number of rows per training step.

        Raises:
            ValueError: If ``data`` and ``conditions`` have different lengths.
        """
        data = self._as_float_tensor(data)
        conditions = self._as_float_tensor(conditions)
        if len(data) != len(conditions):
            raise ValueError(
                f"data and conditions must have the same number of rows, "
                f"got {len(data)} and {len(conditions)}"
            )

        # generate() may have been called before; make sure dropout etc. are active.
        self.generator.train()
        self.discriminator.train()

        for epoch in range(epochs):
            d_losses = []
            g_losses = []

            # Apply the same permutation to both tensors so rows stay paired.
            idx = torch.randperm(len(data))
            data = data[idx]
            conditions = conditions[idx]

            for i in range(0, len(data), batch_size):
                batch_data = data[i:i+batch_size]
                batch_conditions = conditions[i:i+batch_size]
                d_loss, g_loss = self.train_step(batch_data, batch_conditions)
                d_losses.append(d_loss)
                g_losses.append(g_loss)

            if epoch % 10 == 0:
                print(f'Epoch {epoch}: D_loss={np.mean(d_losses):.4f}, G_loss={np.mean(g_losses):.4f}')

    def generate(self, num_samples, conditions):
        """Sample ``num_samples`` rows from the generator.

        Args:
            num_samples: Number of rows to generate.
            conditions: Array-like with either ``num_samples`` condition rows
                or a single row (1-D, or 2-D with one row) that is reused for
                every sample.

        Returns:
            NumPy array with one generated row per sample.

        Raises:
            ValueError: If the number of condition rows is neither 1 nor
                ``num_samples``.
        """
        conditions = self._as_float_tensor(conditions).to(self.device)
        if conditions.dim() == 1:
            conditions = conditions.unsqueeze(0)
        if conditions.shape[0] != num_samples:
            if conditions.shape[0] != 1:
                raise ValueError(
                    f"conditions has {conditions.shape[0]} rows but "
                    f"num_samples is {num_samples}"
                )
            conditions = conditions.expand(num_samples, *conditions.shape[1:])

        # Sample in eval mode, then put the generator back in whatever mode it was in.
        was_training = self.generator.training
        self.generator.eval()
        try:
            with torch.no_grad():
                z = torch.randn(num_samples, self.latent_dim, device=self.device)
                samples = self.generator(z, conditions)
        finally:
            self.generator.train(was_training)
        return samples.cpu().numpy()

## Entraînement et test

In [34]:
def format_time(seconds):
    """Render a duration in seconds as ``H:MM:SS`` (with a day prefix past 24 h).

    The value is truncated to a whole number of seconds before formatting.
    """
    whole_seconds = int(seconds)
    duration = timedelta(seconds=whole_seconds)
    return str(duration)
    
# Fixed seed for the train/validation split so a Restart & Run All yields the same partition.
SPLIT_SEED = 42

# NOTE(review): `num` holds every numeric column of the CSV. The preview of
# `data.head()` lists an "Unnamed: 0" column that looks like a saved row
# index; confirm it is really meant to be a model feature.
original_data = num
n_features = num.shape[1]

# The generator's last layer is Tanh, so generated features are confined to
# (-1, 1). Scale the real data into that same interval; with standardised
# (unbounded) data the generator could never reproduce values beyond +/-1.
scaler = MinMaxScaler(feature_range=(-1, 1))
scaled_data = scaler.fit_transform(original_data)

# 80/20 train/validation split over the scaled rows.
dataset = TabularDataset(scaled_data)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

model = CTGAN(input_dim=n_features, latent_dim=8, device=device)