In [29]:
import numpy as np
import pandas as pd
import torch
from ast import literal_eval

# Read adfa_ld_processed.csv and, in the same chain, parse every stringified
# 'sequence' cell back into a Python object with ast.literal_eval.
data = pd.read_csv('adfa_ld_processed.csv').assign(
    sequence=lambda frame: frame['sequence'].apply(literal_eval)
)

# The longest parsed sequence fixes the width every row is padded to.
max_len = max(map(len, data['sequence']))

# Pad sequences and convert to numpy array
def pad_sequence(seq, target_len=None):
    """Right-pad ``seq`` with zeros and return it as a NumPy array.

    Args:
        seq: One-dimensional sequence of numbers.
        target_len: Desired output length. When omitted, the notebook-level
            ``max_len`` is used, which is the original behaviour.

    Returns:
        ``numpy.ndarray`` of length ``target_len``: the values of ``seq``
        followed by zeros.

    Raises:
        ValueError: If ``seq`` is longer than ``target_len``.
    """
    if target_len is None:
        # Only touch the global when the caller did not supply a width.
        target_len = max_len
    missing = target_len - len(seq)
    if missing < 0:
        # np.pad would also raise ValueError here, but with an opaque message.
        raise ValueError(
            f"sequence length {len(seq)} exceeds target length {target_len}"
        )
    return np.pad(seq, (0, missing), 'constant', constant_values=0)

# Cast every element to int, zero-pad each sequence, then stack the rows
padded_rows = []
for raw_seq in data['sequence']:
    padded_rows.append(pad_sequence(list(map(int, raw_seq))))
X = np.array(padded_rows)

# One-hot encode the labels: one column per distinct value of data['label']
y = pd.get_dummies(data['label']).to_numpy()

# Min-max scale with the extrema of the whole padded matrix
x_min, x_max = X.min(), X.max()
X = (X - x_min) / (x_max - x_min)

# Float tensors; unsqueeze(1) inserts a singleton channel axis at dim 1
# (intended for a CNN-based GAN)
X_tensor = torch.FloatTensor(X).unsqueeze(1)
y_tensor = torch.FloatTensor(y)

# Sanity-check the scaling and the final shape
print(f"Min value: {X_tensor.min()}")
print(f"Max value: {X_tensor.max()}")
print(f"Shape: {X_tensor.shape}")

Min value: 0.0
Max value: 1.0
Shape: torch.Size([1579, 1, 2948])


In [30]:
# Display the padded, min-max-scaled tensor built in the previous cell.
# PyTorch abbreviates the repr of a large tensor with "...", so only the
# corner values are shown.
X_tensor

tensor([[[0.4941, 0.4941, 0.4941,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.0176, 0.0971, 0.0176,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.4941, 0.7794, 0.0088,  ..., 0.0000, 0.0000, 0.0000]],

        ...,

        [[0.0176, 0.6500, 0.6500,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.2676, 0.5647, 0.0176,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.7059, 0.7059, 0.0971,  ..., 0.0000, 0.0000, 0.0000]]])

In [26]:
import numpy as np
import pandas as pd
import torch
from ast import literal_eval

# Read the processed CSV from disk
data = pd.read_csv('adfa_ld_processed.csv')

# Each 'sequence' cell is a stringified Python literal; parse it back
data['sequence'] = data['sequence'].apply(literal_eval)

# Width used for padding = length of the longest parsed sequence
max_len = max(map(len, data['sequence']))

def pad_sequence(seq, target_len=None):
    """Right-pad ``seq`` with zeros and return it as a NumPy array.

    Args:
        seq: One-dimensional sequence of numbers.
        target_len: Desired output length. When omitted, the notebook-level
            ``max_len`` is used, which is the original behaviour.

    Returns:
        ``numpy.ndarray`` of length ``target_len``: the values of ``seq``
        followed by zeros.

    Raises:
        ValueError: If ``seq`` is longer than ``target_len``.
    """
    if target_len is None:
        # Only touch the global when the caller did not supply a width.
        target_len = max_len
    missing = target_len - len(seq)
    if missing < 0:
        # np.pad would also raise ValueError here, but with an opaque message.
        raise ValueError(
            f"sequence length {len(seq)} exceeds target length {target_len}"
        )
    return np.pad(seq, (0, missing), 'constant', constant_values=0)

# Feature matrix: one zero-padded row of ints per sequence
X = np.array([pad_sequence(list(map(int, seq))) for seq in data['sequence']])

# Min-max scale with the global extrema of the padded matrix
lowest = X.min()
value_span = X.max() - lowest
X = (X - lowest) / value_span

# One-hot encode the labels: one column per distinct value of data['label']
y = pd.get_dummies(data['label']).values

# Float tensors; unsqueeze(1) inserts a singleton channel axis at dim 1
# (intended for a CNN-GAN)
X_tensor = torch.FloatTensor(X).unsqueeze(1)
y_tensor = torch.FloatTensor(y)

print(f"Data shape: {X_tensor.shape}")
print(f"Value range: [{X_tensor.min():.3f}, {X_tensor.max():.3f}]")

Data shape: torch.Size([1579, 1, 2948])
Value range: [0.000, 1.000]


In [27]:
import torch
import torch.nn as nn
import torch.utils.data as data

# Reshape tensor - drop the singleton channel axis so the fully connected
# models below receive flat vectors: [num_samples, 1, seq_len] -> [num_samples, seq_len].
# squeeze(1) leaves the tensor unchanged when dim 1 is not of size 1, so
# re-running this cell is harmless.
X_tensor = X_tensor.squeeze(1)

# Constants
# Derived from the data instead of hard-coding 2948, so the models keep
# matching the padded sequence length if the CSV changes.
input_dim = X_tensor.shape[1]
latent_dim = 100   # size of the noise vector fed to the generator
batch_size = 64    # mini-batch size used by the DataLoader
hidden_dim = 512   # base width of the hidden layers

# Generator
class Generator(nn.Module):
    """Fully connected generator: latent noise -> synthetic sequence vector.

    Layout: two ``Linear -> BatchNorm1d -> LeakyReLU(0.2)`` stages (widths
    ``width`` and ``2 * width``), then a ``Linear`` projection to
    ``output_dim`` and a ``Sigmoid``, so every output element lies in [0, 1].

    Args:
        noise_dim: Size of the latent input vector. Defaults to the
            notebook-level ``latent_dim``.
        width: Width of the first hidden layer. Defaults to the
            notebook-level ``hidden_dim``.
        output_dim: Length of each generated vector. Defaults to the
            notebook-level ``input_dim``.

    Calling ``Generator()`` with no arguments builds exactly the network the
    notebook used before these parameters existed.
    """

    def __init__(self, noise_dim=None, width=None, output_dim=None):
        super().__init__()
        # Globals are only read for arguments the caller left out, so the
        # class can be built (and tested) without the notebook constants.
        noise_dim = latent_dim if noise_dim is None else noise_dim
        width = hidden_dim if width is None else width
        output_dim = input_dim if output_dim is None else output_dim

        self.model = nn.Sequential(
            nn.Linear(noise_dim, width),
            nn.BatchNorm1d(width),
            nn.LeakyReLU(0.2),
            nn.Linear(width, width * 2),
            nn.BatchNorm1d(width * 2),
            nn.LeakyReLU(0.2),
            nn.Linear(width * 2, output_dim),
            nn.Sigmoid()
        )

    def forward(self, z):
        """Map a batch of noise vectors ``z`` (batch, noise_dim) to samples.

        In training mode the BatchNorm1d layers need more than one sample
        per batch.
        """
        return self.model(z)

# Discriminator
class Discriminator(nn.Module):
    """Fully connected discriminator: sequence vector -> probability.

    Layout: two ``Linear -> LeakyReLU(0.2) -> Dropout(0.3)`` stages (widths
    ``2 * width`` and ``width``), then a ``Linear`` to one unit and a
    ``Sigmoid``, so the output is a value in [0, 1] per sample.

    Args:
        in_features: Length of each input vector. Defaults to the
            notebook-level ``input_dim``.
        width: Width of the second hidden layer (the first is twice as wide).
            Defaults to the notebook-level ``hidden_dim``.

    Calling ``Discriminator()`` with no arguments builds exactly the network
    the notebook used before these parameters existed.
    """

    def __init__(self, in_features=None, width=None):
        super().__init__()
        # Globals are only read for arguments the caller left out.
        in_features = input_dim if in_features is None else in_features
        width = hidden_dim if width is None else width

        self.model = nn.Sequential(
            nn.Linear(in_features, width * 2),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(width * 2, width),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(width, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Score a batch ``x`` (batch, in_features); returns shape (batch, 1)."""
        return self.model(x)

# Setup training
# Seed PyTorch's global RNG so weight initialisation, DataLoader shuffling
# and noise sampling repeat on a fresh "Restart & Run All".
SEED = 42
torch.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# Create dataset of malicious samples only
# NOTE(review): assumes column 1 of the one-hot labels is the malicious
# class - confirm against the column order of pd.get_dummies(data['label']).
malicious_mask = y_tensor[:, 1] == 1
malicious_data = X_tensor[malicious_mask]
if len(malicious_data) == 0:
    raise ValueError("No malicious samples selected; check the label column used for the mask")
dataset = data.TensorDataset(malicious_data)
dataloader = data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Loss and optimizers actually used by the training loop.
# A second set of optimizers with betas=(0.5, 0.999) used to be created here
# as well but was never stepped; only one set is kept so that each network
# has a single optimizer state.
adversarial_loss = nn.BCELoss()
optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002)
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002)

# Backward-compatible aliases for the old names, so any cell that still
# refers to them resolves to the objects that are really being trained with.
criterion = adversarial_loss
g_optimizer = optimizer_G
d_optimizer = optimizer_D

seq_length = X_tensor.shape[1]

# Training loop
# `latent_dim` and `batch_size` are intentionally NOT reassigned here: the
# DataLoader above is already bound to `batch_size`, so changing it afterwards
# (it used to be reset to 32) had no effect on training and only made the
# sample-generation batch disagree with the training batch.
num_epochs = 10000

# Alternating GAN training: one discriminator step, then one generator step,
# per mini-batch. Every 10th epoch prints the mean losses and the first ten
# values of one generated sample.
try:
    for epoch in range(num_epochs):
        generator.train()
        discriminator.train()

        total_d_loss = 0.0
        total_g_loss = 0.0
        batches = 0

        for (real_samples,) in dataloader:
            current_batch_size = real_samples.size(0)
            if current_batch_size < 2:
                # The generator's BatchNorm1d layers cannot compute batch
                # statistics from a single sample in train mode and would
                # raise; skip a trailing batch of size 1.
                continue

            real_samples = real_samples.to(device)
            # Build targets and noise directly on the target device
            real_labels = torch.ones(current_batch_size, 1, device=device)
            fake_labels = torch.zeros(current_batch_size, 1, device=device)

            # Train Discriminator
            optimizer_D.zero_grad()
            z = torch.randn(current_batch_size, latent_dim, device=device)
            fake_samples = generator(z)

            d_real = discriminator(real_samples)
            # detach(): the discriminator step must not backpropagate into the generator
            d_fake = discriminator(fake_samples.detach())
            d_loss = (adversarial_loss(d_real, real_labels) +
                      adversarial_loss(d_fake, fake_labels)) / 2

            d_loss.backward()
            optimizer_D.step()

            # Train Generator: reward fakes that the discriminator labels as real
            optimizer_G.zero_grad()
            g_output = discriminator(fake_samples)
            g_loss = adversarial_loss(g_output, real_labels)
            g_loss.backward()
            optimizer_G.step()

            total_d_loss += d_loss.item()
            total_g_loss += g_loss.item()
            batches += 1

        # Generate samples
        if epoch % 10 == 0:
            generator.eval()  # Set to eval mode
            with torch.no_grad():
                z = torch.randn(batch_size, latent_dim, device=device)
                fake = generator(z)
            # Avoid ZeroDivisionError when no batch was processed this epoch
            processed = max(batches, 1)
            print(f"\nEpoch {epoch} Summary:")
            print(f"Average D loss: {total_d_loss/processed:.4f}")
            print(f"Average G loss: {total_g_loss/processed:.4f}")
            print(f"Sample output: {fake[0][:10].cpu().numpy()}\n")
            generator.train()  # Back to train mode

except Exception as e:
    # Report, then re-raise so "Run All" stops here instead of continuing
    # with half-trained models; Jupyter renders the full traceback.
    print(f"Error during training: {str(e)}")
    raise


Epoch 0 Summary:
Average D loss: 0.3614
Average G loss: 3.5739
Sample output: [0.62108827 0.4661329  0.3779124  0.53988224 0.51353925 0.59681416
 0.48363647 0.5659444  0.55977416 0.5640016 ]


Epoch 10 Summary:
Average D loss: 0.0181
Average G loss: 7.3737
Sample output: [0.9052442  0.89422715 0.9676059  0.9875892  0.74723434 0.9737951
 0.6233567  0.8873492  0.88111496 0.95715576]



KeyboardInterrupt: 