<a href="https://colab.research.google.com/github/Amirgh8080/Anlyzer/blob/main/OPT_Paper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Configuration

In [None]:
! pip install optuna
! pip install botorch

Collecting botorch
  Downloading botorch-0.13.0-py3-none-any.whl.metadata (10 kB)
Collecting pyre_extensions (from botorch)
  Downloading pyre_extensions-0.0.32-py3-none-any.whl.metadata (4.0 kB)
Collecting gpytorch==1.14 (from botorch)
  Downloading gpytorch-1.14-py3-none-any.whl.metadata (8.0 kB)
Collecting linear_operator==0.6 (from botorch)
  Downloading linear_operator-0.6-py3-none-any.whl.metadata (15 kB)
Collecting pyro-ppl>=1.8.4 (from botorch)
  Downloading pyro_ppl-1.9.1-py3-none-any.whl.metadata (7.8 kB)
Collecting jaxtyping (from gpytorch==1.14->botorch)
  Downloading jaxtyping-0.2.37-py3-none-any.whl.metadata (6.6 kB)
Collecting pyro-api>=0.1.1 (from pyro-ppl>=1.8.4->botorch)
  Downloading pyro_api-0.1.2-py3-none-any.whl.metadata (2.5 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.0.1->botorch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.0.1->b

# First Implementation

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import optuna

# Device Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Generator
class Generator(nn.Module):
    """Fully connected generator mapping a latent vector to an image tensor.

    Args:
        latent_dim: Size of the input noise vector.
        img_shape: Shape of one generated sample (without the batch axis);
            the final linear layer emits ``np.prod(img_shape)`` values.
    """

    def __init__(self, latent_dim, img_shape):
        super().__init__()
        n_outputs = np.prod(img_shape)
        stack = [
            nn.Linear(latent_dim, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, n_outputs),
            nn.Tanh(),  # squashes every output value into [-1, 1]
        ]
        self.model = nn.Sequential(*stack)
        self.img_shape = img_shape

    def forward(self, z):
        """Return generated samples shaped ``(batch, *img_shape)``."""
        flat = self.model(z)
        return flat.view(flat.size(0), *self.img_shape)

# Discriminator (critic). NOTE: mixup regularization is not implemented in this class.
class Discriminator(nn.Module):
    """Fully connected critic that scores flattened images.

    Args:
        img_shape: Shape of one input sample (without the batch axis); the
            first linear layer expects ``np.prod(img_shape)`` features.
    """

    def __init__(self, img_shape):
        super().__init__()
        n_inputs = np.prod(img_shape)
        stack = [
            nn.Linear(n_inputs, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 1),  # raw score, no sigmoid
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, img):
        """Return one unbounded score per sample, shaped ``(batch, 1)``."""
        batch = img.size(0)
        return self.model(img.view(batch, -1))

# Encoder for Feature Matching Loss
class Encoder(nn.Module):
    """Fully connected encoder mapping an image to a latent vector.

    Args:
        img_shape: Shape of one input sample (without the batch axis).
        latent_dim: Size of the produced latent vector.
    """

    def __init__(self, img_shape, latent_dim):
        super().__init__()
        n_inputs = np.prod(img_shape)
        stack = [
            nn.Linear(n_inputs, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, latent_dim),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, img):
        """Return latent codes shaped ``(batch, latent_dim)``."""
        batch = img.size(0)
        return self.model(img.view(batch, -1))

# Gradient Penalty
def compute_gradient_penalty(D, real_samples, fake_samples):
    """Compute the WGAN-GP gradient penalty for critic ``D``.

    The penalty is the batch mean of ``(||grad_x D(x)||_2 - 1) ** 2`` where
    ``x`` is a random per-sample interpolation between real and fake inputs.

    Args:
        D: Callable critic mapping a batch of samples to a batch of scores.
        real_samples: Batch of real samples, shape ``(batch, ...)``.
        fake_samples: Batch of generated samples with the same shape.

    Returns:
        Scalar tensor holding the penalty. It stays attached to the graph of
        ``D`` (``create_graph=True``) so it can be part of the critic loss.
    """
    # The penalty constrains the critic only, so cut any graph that leads
    # back into the generator (or into whatever produced the inputs).
    real = real_samples.detach()
    fake = fake_samples.detach()
    batch_size = real.size(0)

    # One mixing coefficient per sample, broadcast over all remaining axes
    # regardless of the input rank.
    alpha_shape = (batch_size,) + (1,) * (real.dim() - 1)
    alpha = torch.rand(alpha_shape, device=real.device)
    interpolates = (alpha * real + (1 - alpha) * fake).requires_grad_(True)

    d_interpolates = D(interpolates)
    gradients = autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        create_graph=True,
        retain_graph=True,
    )[0]

    # Flatten so the L2 norm is taken over ALL features of each sample;
    # norm(dim=1) on an (N, C, H, W) tensor would only reduce the channel axis.
    gradients = gradients.reshape(batch_size, -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

# Anomaly Score Calculation
def compute_anomaly_score(x, generator, encoder, discriminator):
    """Score a batch by reconstruction error plus critic-output discrepancy.

    Args:
        x: Input batch.
        generator: Callable mapping latent codes to samples shaped like ``x``.
        encoder: Callable mapping ``x`` to latent codes.
        discriminator: Callable producing a score for a batch of samples.

    Returns:
        Scalar tensor: mean squared difference between ``x`` and its
        reconstruction, plus the mean squared difference of the critic
        outputs on ``x`` and on the reconstruction (both averaged over the
        whole batch).
    """
    latent = encoder(x)
    recon = generator(latent)

    pixel_term = (x - recon).pow(2).mean()

    score_real = discriminator(x)
    score_recon = discriminator(recon)
    critic_term = (score_real - score_recon).pow(2).mean()

    return pixel_term + critic_term

# Dataset and DataLoader
def get_dataloader(batch_size=32):
    """Build a shuffled DataLoader over the MNIST training split.

    The dataset is downloaded to ``./data`` if it is not already there.
    Images are converted to tensors and normalised with mean 0.5 / std 0.5.

    Args:
        batch_size: Number of samples per mini-batch.

    Returns:
        A ``DataLoader`` yielding ``(images, labels)`` batches.
    """
    preprocessing = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5]),
        ]
    )
    mnist_train = datasets.MNIST(
        root="./data",
        train=True,
        transform=preprocessing,
        download=True,
    )
    loader = DataLoader(mnist_train, batch_size=batch_size, shuffle=True)
    return loader

# Training Setup
def train_wgangp(generator, discriminator, encoder, dataloader, latent_dim, epochs, lambda_gp=10, lr=0.0002):
    """Train a generator/critic pair with the WGAN-GP objective.

    The critic is updated on every mini-batch; the generator is updated on
    every fifth mini-batch (batch indices 0, 5, 10, ...). One progress line
    with the most recent losses is printed after each epoch.

    Args:
        generator: Module mapping ``(batch, latent_dim)`` noise to samples.
        discriminator: Critic module scoring real and generated samples.
        encoder: Accepted for interface compatibility only. This function
            does not train or otherwise use it.
        dataloader: Iterable of ``(images, labels)`` batches; labels are ignored.
        latent_dim: Size of the noise vector fed to the generator.
        epochs: Number of passes over ``dataloader``.
        lambda_gp: Weight of the gradient-penalty term in the critic loss.
        lr: Adam learning rate used for both optimizers.
    """
    optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.9))
    optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.9))

    # Most recent losses; stay None if the loader never yields a batch, so the
    # progress print below cannot fail on an unbound name.
    d_loss = None
    g_loss = None

    for epoch in range(epochs):
        for i, (imgs, _) in enumerate(dataloader):
            real_imgs = imgs.to(device)
            z = torch.randn(real_imgs.size(0), latent_dim, device=device)

            # Only build the generator graph on iterations that update it.
            update_generator = i % 5 == 0
            with torch.set_grad_enabled(update_generator):
                fake_imgs = generator(z)
            # The critic step must never backpropagate into the generator.
            fake_for_critic = fake_imgs.detach()

            # ---- critic update ----
            optimizer_D.zero_grad()
            real_validity = discriminator(real_imgs)
            fake_validity = discriminator(fake_for_critic)
            gradient_penalty = compute_gradient_penalty(discriminator, real_imgs, fake_for_critic)
            d_loss = fake_validity.mean() - real_validity.mean() + lambda_gp * gradient_penalty
            # The generator graph is untouched here, so retain_graph is not needed.
            d_loss.backward()
            optimizer_D.step()

            # ---- generator update (every 5th batch) ----
            if update_generator:
                optimizer_G.zero_grad()
                g_loss = -discriminator(fake_imgs).mean()
                g_loss.backward()
                optimizer_G.step()

        d_report = d_loss.item() if d_loss is not None else "n/a"
        g_report = g_loss.item() if g_loss is not None else "n/a"
        print(f"Epoch {epoch}/{epochs} | D Loss: {d_report} | G Loss: {g_report}")

# Running Training
# Build the three MNIST-sized (1x28x28) networks with a 100-dim latent space,
# then run a short 5-epoch WGAN-GP training session.
generator = Generator(latent_dim=100, img_shape=(1, 28, 28)).to(device)
discriminator = Discriminator(img_shape=(1, 28, 28)).to(device)
encoder = Encoder(img_shape=(1, 28, 28), latent_dim=100).to(device)
dataloader = get_dataloader(32)
train_wgangp(
    generator,
    discriminator,
    encoder,
    dataloader,
    latent_dim=100,
    epochs=5,
)


Epoch 0/5 | D Loss: -47.589019775390625 | G Loss: -367.225341796875
Epoch 1/5 | D Loss: -94.1392593383789 | G Loss: -275.6246337890625
Epoch 2/5 | D Loss: -218.41970825195312 | G Loss: -278.0072021484375
Epoch 3/5 | D Loss: -175.2352752685547 | G Loss: -386.7821960449219
Epoch 4/5 | D Loss: -271.40692138671875 | G Loss: -360.72027587890625


In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, ConcatDataset
from torchvision import transforms, datasets
import torchvision.utils as vutils
import optuna
from optuna.samplers import TPESampler
from optuna.integration import BoTorchSampler  # Replacing SkoptSampler
from sklearn.metrics import roc_auc_score, precision_score, f1_score

# Ensure BoTorch is installed
# Best-effort check that BoTorch is importable. On failure this only prints an
# install hint and lets execution continue.
# NOTE(review): the `from optuna.integration import BoTorchSampler` import
# above runs before this check and may already have failed if BoTorch is
# missing — confirm whether this guard is still reachable in that case.
try:
    import botorch
except ImportError:
    print("BoTorch not found. Install it using: pip install botorch")

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

#############################################
# 1. Model Definitions
#############################################

class View(nn.Module):
    """Reshape layer so that ``Tensor.view`` can be used inside ``nn.Sequential``.

    Args:
        *shape: Target shape forwarded to ``Tensor.view`` on every call.
    """

    def __init__(self, *shape):
        super().__init__()
        self.shape = shape

    def forward(self, x):
        """Return ``x`` viewed with the shape given at construction time."""
        target_shape = self.shape
        return x.view(*target_shape)

class Generator(nn.Module):
    """DCGAN-style generator: latent vector -> image via transposed convolutions.

    A linear layer projects the latent vector to a ``(feature_maps * 8, 4, 4)``
    feature map, which four stride-2 transposed convolutions then upsample.

    Args:
        latent_dim: Size of the input noise vector.
        feature_maps: Base channel width; deeper stages use multiples of it.
        channels: Number of channels in the generated image.
    """

    def __init__(self, latent_dim=100, feature_maps=64, channels=1):
        super().__init__()
        # Channel widths from the projected 4x4 map down to the last hidden stage.
        widths = [feature_maps * 8, feature_maps * 4, feature_maps * 2, feature_maps]
        projected = widths[0] * 4 * 4

        layers = [
            nn.Linear(latent_dim, projected),
            nn.BatchNorm1d(projected),
            nn.ReLU(True),
            View(-1, widths[0], 4, 4),
        ]
        # Three upsampling stages, each: transposed conv -> batch norm -> ReLU.
        for c_in, c_out in zip(widths, widths[1:]):
            layers += [
                nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(True),
            ]
        # Output stage: final upsampling to `channels`, squashed by tanh.
        layers += [
            nn.ConvTranspose2d(widths[-1], channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, z):
        """Return the generated image batch for latent batch ``z``."""
        generated = self.net(z)
        return generated

class Discriminator(nn.Module):
    """DCGAN-style convolutional critic.

    Four stride-2 convolutions downsample the input, followed by a 4x4
    convolution that reduces the feature map to a single-channel score.

    Args:
        feature_maps: Base channel width; deeper stages use multiples of it.
        channels: Number of channels in the input image.
    """

    def __init__(self, feature_maps=64, channels=1):
        super().__init__()
        # First stage has no batch norm.
        layers = [
            nn.Conv2d(channels, feature_maps, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        ]
        # Remaining downsampling stages: conv -> batch norm -> LeakyReLU.
        c_in = feature_maps
        for multiplier in (2, 4, 8):
            c_out = feature_maps * multiplier
            layers += [
                nn.Conv2d(c_in, c_out, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(c_out),
                nn.LeakyReLU(0.2, inplace=True),
            ]
            c_in = c_out
        # Scoring head: unpadded 4x4 convolution down to one channel.
        layers.append(nn.Conv2d(c_in, 1, kernel_size=4, stride=1, padding=0))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the critic output flattened to ``(batch, -1)``."""
        scores = self.net(x)
        return scores.view(x.size(0), -1)

#############################################
# 2. Hyperparameter Optimization with Optuna
#############################################

def objective(trial, data_dir, sampler_type="TPE"):
    """Optuna objective: briefly train a GAN and return the final generator loss.

    Samples ``latent_dim``, ``lr_G``, ``lr_D``, ``batch_size`` and
    ``feature_maps`` from the trial, trains a fresh Generator/Discriminator
    pair for three epochs, and reports the generator loss of the last batch.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        data_dir: Dataset location. Currently unused: training runs on
            placeholder random tensors (see the TODO below).
        sampler_type: Unused; kept for backward compatibility.

    Returns:
        float: Generator loss on the last mini-batch (the value to minimise).
    """
    latent_dim = trial.suggest_int("latent_dim", 64, 128)
    # suggest_float(log=True) replaces the deprecated suggest_loguniform.
    lr_G = trial.suggest_float("lr_G", 1e-5, 1e-3, log=True)
    lr_D = trial.suggest_float("lr_D", 1e-5, 1e-3, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
    feature_maps = trial.suggest_categorical("feature_maps", [32, 64])

    # Create models
    generator = Generator(latent_dim=latent_dim, feature_maps=feature_maps).to(device)
    discriminator = Discriminator(feature_maps=feature_maps).to(device)

    # Without optimizers the sampled learning rates had no effect on the
    # objective and the models were never updated.
    optimizer_G = optim.Adam(generator.parameters(), lr=lr_G, betas=(0.5, 0.9))
    optimizer_D = optim.Adam(discriminator.parameters(), lr=lr_D, betas=(0.5, 0.9))

    # TODO: replace the placeholder random data with a dataset read from data_dir.
    train_loader = DataLoader(torch.randn(1000, 1, 64, 64), batch_size=batch_size, shuffle=True)

    # NOTE(review): this is a plain Wasserstein loss with no Lipschitz
    # constraint (no gradient penalty or weight clipping); loss magnitudes are
    # therefore unbounded — confirm this is the intended search objective.
    for epoch in range(3):  # Reduce epochs for quick testing
        for real_imgs in train_loader:
            real_imgs = real_imgs.to(device)
            # Match the noise batch to the actual batch (the last one is smaller).
            z = torch.randn(real_imgs.size(0), latent_dim, device=device)
            fake_imgs = generator(z)

            # Critic step: detach so gradients do not flow into the generator.
            optimizer_D.zero_grad()
            loss_D = discriminator(fake_imgs.detach()).mean() - discriminator(real_imgs).mean()
            loss_D.backward()
            optimizer_D.step()

            # Generator step against the freshly updated critic.
            optimizer_G.zero_grad()
            loss_G = -discriminator(fake_imgs).mean()
            loss_G.backward()
            optimizer_G.step()

    return loss_G.item()  # Minimize Generator loss

#############################################
# 3. Main Experiment Runner
#############################################

def run_experiment(exp_mode, data_dir):
    """Run one experiment: a fixed baseline, or an Optuna search (TPE / BoTorch).

    For ``'tpe'`` and ``'bo'`` a 5-trial Optuna study over ``objective`` picks
    the configuration; for ``'baseline'`` a fixed configuration is used. A
    fresh model pair is then trained for three epochs with that configuration
    and the final generator loss is printed.

    Args:
        exp_mode: One of ``'baseline'``, ``'tpe'`` or ``'bo'``.
        data_dir: Dataset location, forwarded to ``objective``. The final
            training here still runs on placeholder random tensors.

    Raises:
        ValueError: If ``exp_mode`` is not one of the supported modes.
    """
    if exp_mode == 'baseline':
        config = {
            "latent_dim": 100,
            "lr_G": 2e-4,
            "lr_D": 2e-4,
            "batch_size": 32,
            "feature_maps": 64,
        }
    else:
        if exp_mode == 'tpe':
            sampler = TPESampler()
        elif exp_mode == 'bo':
            sampler = BoTorchSampler()  # Replacing SkoptSampler
        else:
            raise ValueError("Unknown experiment mode")

        study = optuna.create_study(direction="minimize", sampler=sampler)
        study.optimize(lambda trial: objective(trial, data_dir), n_trials=5)
        best_params = study.best_trial.params
        print(f"Best parameters from {exp_mode.upper()}:", best_params)
        config = best_params

    # Model creation using best hyperparameters
    generator = Generator(latent_dim=config["latent_dim"], feature_maps=config["feature_maps"]).to(device)
    discriminator = Discriminator(feature_maps=config["feature_maps"]).to(device)

    # The learning rates in the configuration were previously ignored and no
    # parameter was ever updated; wire them into real optimizers.
    optimizer_G = optim.Adam(generator.parameters(), lr=config["lr_G"], betas=(0.5, 0.9))
    optimizer_D = optim.Adam(discriminator.parameters(), lr=config["lr_D"], betas=(0.5, 0.9))

    # Dummy dataset for testing
    train_loader = DataLoader(torch.randn(1000, 1, 64, 64), batch_size=config["batch_size"], shuffle=True)

    # NOTE(review): plain Wasserstein loss with no Lipschitz constraint, same
    # loss formulation as the original loop — confirm this is intended.
    for epoch in range(3):
        for real_imgs in train_loader:
            real_imgs = real_imgs.to(device)
            # Match the noise batch to the actual batch (the last one is smaller).
            z = torch.randn(real_imgs.size(0), config["latent_dim"], device=device)
            fake_imgs = generator(z)

            # Critic step: detach so gradients do not flow into the generator.
            optimizer_D.zero_grad()
            loss_D = discriminator(fake_imgs.detach()).mean() - discriminator(real_imgs).mean()
            loss_D.backward()
            optimizer_D.step()

            # Generator step against the freshly updated critic.
            optimizer_G.zero_grad()
            loss_G = -discriminator(fake_imgs).mean()
            loss_G.backward()
            optimizer_G.step()

    print(f"Experiment {exp_mode.upper()} complete. Final Generator loss: {loss_G.item():.4f}")

#############################################
# 4. Main Execution
#############################################
# Directory handed to every experiment run; adjust the path if needed.
data_directory = "./data"

# Run the three experiment modes back to back, each under a banner.
for mode in ('baseline', 'tpe', 'bo'):
    print(
        "\n==============================",
        f"Running experiment: {mode.upper()}",
        "==============================",
        sep="\n",
    )
    run_experiment(mode, data_directory)


SyntaxError: incomplete input (<ipython-input-7-d3b0ee59a206>, line 46)