# Airfoil VAE Training with Cosine Noise

This notebook handles the complete training pipeline for the Airfoil VAE model with cosine noise generation.

## 1. Setup and Imports

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt

# Verify GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Setup directory (for Colab)
if 'COLAB_GPU' in os.environ:
    from google.colab import drive
    drive.mount('/content/drive')
    os.chdir('/content/drive/MyDrive/Airfoil2/m1_airfoilvae/ActFun_relu')  # Update with your path
    print("Current directory:", os.getcwd())
    !ls

## 2. Cosine Noise Generation

In [None]:
def generate_cosine_noise(num_samples, length, amplitude=1e-5, frequency=10):
    """Generate structured cosine noise for airfoil data"""
    x = np.linspace(0, 2*np.pi, length)
    noise = np.zeros((num_samples, length))
    for i in range(num_samples):
        phase_shift = np.random.uniform(0, 2*np.pi)
        freq_variation = np.random.uniform(0.8, 1.2) * frequency
        noise[i] = amplitude * np.cos(freq_variation * x + phase_shift)
    return noise

## 3. Data Loading and Preparation

In [None]:
# Verify data file exists
assert os.path.exists('airfoils.dat'), "airfoils.dat not found in current directory"

# Load and prepare data with noise
data = np.loadtxt('airfoils.dat')
data = data[:, 1:]  # Remove first column if needed

# Generate and add noise
noise = generate_cosine_noise(data.shape[0], data.shape[1], amplitude=1e-5)
data_with_noise = data + noise

# Normalize data
data_min = np.min(data_with_noise, axis=0)
data_max = np.max(data_with_noise, axis=0)
data_norm = (data_with_noise - data_min) / (data_max - data_min)
data_tensor = torch.tensor(data_norm, dtype=torch.float32).to(device)

print(f"Data loaded successfully. Shape: {data_tensor.shape}")

## 4. VAE Model Definition

In [None]:
class VAE(nn.Module):
    def __init__(self, input_dim, hidden_sizes, latent_dim, activation_function='relu'):
        super(VAE, self).__init__()
        self.activation_function = activation_function

        # Encoder
        self.encoders = nn.ModuleList()
        in_dim = input_dim
        for h_dim in hidden_sizes:
            self.encoders.append(nn.Linear(in_dim, h_dim))
            in_dim = h_dim

        self.fc_mu = nn.Linear(hidden_sizes[-1], latent_dim)
        self.fc_logvar = nn.Linear(hidden_sizes[-1], latent_dim)

        # Decoder
        self.decoders = nn.ModuleList()
        in_dim = latent_dim
        for h_dim in reversed(hidden_sizes):
            self.decoders.append(nn.Linear(in_dim, h_dim))
            in_dim = h_dim

        self.fc_out = nn.Linear(hidden_sizes[0], input_dim)

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z), mu, logvar
    
    def encoder(self, x):
        for layer in self.encoders:
            x = layer(x)
            x = self._apply_activation(x)
        return self.fc_mu(x), self.fc_logvar(x)
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decoder(self, z):
        for layer in self.decoders:
            z = layer(z)
            z = self._apply_activation(z)
        return self.fc_out(z)
    
    def _apply_activation(self, x):
        if self.activation_function == 'relu':
            return F.relu(x)
        elif self.activation_function == 'tanh':
            return torch.tanh(x)
        elif self.activation_function == 'leaky_relu':
            return F.leaky_relu(x)
        elif self.activation_function == 'sigmoid':
            return torch.sigmoid(x)
        else:
            raise ValueError(f"Unsupported activation: {self.activation_function}")

## 5. Training Pipeline

In [None]:
def train_vae(data_tensor, input_dim, device, params):
    """Complete training procedure"""
    model = VAE(
        input_dim=input_dim,
        hidden_sizes=params['hidden_sizes'],
        latent_dim=params['latent_dim'],
        activation_function=params['activation']
    ).to(device)
    
    optimizer = optim.Adam(model.parameters(), lr=params['lr'])
    
    train_losses = []
    model.train()
    
    for epoch in range(params['epochs']):
        optimizer.zero_grad()
        recon, mu, logvar = model(data_tensor)
        
        # Reconstruction + KL divergence losses
        mse_loss = F.mse_loss(recon, data_tensor, reduction='sum')
        kld_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = mse_loss + kld_loss
        
        loss.backward()
        optimizer.step()
        
        train_losses.append(loss.item())
        
        if (epoch + 1) % 500 == 0:
            print(f"Epoch {epoch+1}/{params['epochs']}, Loss: {loss.item():.4f}")
    
    return model, train_losses

## 6. Main Execution

In [None]:
# Configuration
params = {
    'hidden_sizes': [200, 150, 100],
    'latent_dim': 8,
    'activation': 'relu',
    'lr': 0.001,
    'epochs': 5
}

# Train the model
trained_model, losses = train_vae(
    data_tensor=data_tensor,
    input_dim=data_tensor.shape[1],
    device=device,
    params=params
)

# Save the trained model
torch.save(trained_model.state_dict(), 'best_vae.pth')
print("Training completed and model saved!")

## 7. Visualization (Optional)

In [None]:
# Plot training loss
plt.figure(figsize=(10, 5))
plt.plot(losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.show()