In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from scipy.io import loadmat
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error, r2_score
import xgboost as xgb
import matplotlib.pyplot as plt
import seaborn as sns

# Select the compute device: use the GPU when CUDA is available, else the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Paths of the wind-direction .mat files to process.
# NOTE(review): "../" looks like a placeholder (it is a directory, not a
# .mat file) — confirm and list the real file paths here.
data_files = ["../"]  # Add all files

# One dict of evaluation metrics is appended here per processed file.
results = []

# GAN Architecture for Generating Synthetic Data
class Generator(nn.Module):
    """Fully connected generator half of the GAN.

    Maps an ``input_dim``-sized vector through two hidden layers (128 and
    256 units, ReLU activations) to an ``output_dim``-sized vector. The
    last layer is purely linear, so the output is unbounded.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        # Layers are created in this exact order so parameter names and
        # initialisation order stay stable.
        stack = [
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, output_dim),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        generated = self.model(x)
        return generated

class Discriminator(nn.Module):
    """Fully connected discriminator half of the GAN.

    Takes an ``input_dim``-sized vector, passes it through two hidden
    layers (256 and 128 units, ReLU activations) and emits one sigmoid
    output per sample, i.e. a value in [0, 1].
    """

    def __init__(self, input_dim):
        super().__init__()
        # Layers are created in this exact order so parameter names and
        # initialisation order stay stable.
        stack = [
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the sigmoid score of each row of ``x``."""
        score = self.model(x)
        return score

# PIML Model
class PIMLRegressionModel(nn.Module):
    """Three-layer fully connected regressor with a single output.

    Architecture: ``input_size`` -> 128 -> 64 -> 1, with ReLU after the
    first two layers and a linear final layer.
    """

    def __init__(self, input_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        """Return a tensor with one prediction per row of ``x``."""
        hidden = x
        # Both hidden layers share the same activation.
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

# Process each file: build a feature table from the .mat contents, train the
# GAN, an XGBoost regressor and the PIML network on it, and append the
# test-set MAPE / R² of the two regressors to `results`.
for file in data_files:
    mat_data = loadmat(file)

    # Extract relevant fields; a file lacking any of them is skipped.
    try:
        roof_pitch = mat_data['Roof_pitch'].flatten()
        sample_frequency = mat_data['Sample_frequency'].flatten()
        building_depth = mat_data['Building_depth'].flatten()
        building_breadth = mat_data['Building_breadth'].flatten()
        building_height = mat_data['Building_height'].flatten()
        wind_azimuth = mat_data['Wind_azimuth'].flatten()
        wind_pressure_coefficients = mat_data['Wind_pressure_coefficients']
    except KeyError:
        print(f"File {file} is missing some keys. Skipping.")
        continue

    # Regression target: the mean of the coefficient matrix along axis 1,
    # giving one value per row of the matrix.
    # NOTE(review): which axis is time and which is the measurement location
    # cannot be told from this file — confirm axis=1 is the intended one.
    mean_pressure_coefficients = wind_pressure_coefficients.mean(axis=1)
    num_samples = mean_pressure_coefficients.shape[0]

    # Repeat every metadata array so it has one entry per target row.
    # This requires num_samples to be a multiple of each array's length;
    # otherwise the DataFrame constructor raises on mismatched lengths.
    # NOTE(review): if each metadata array holds a single value, every row of
    # the feature table is identical within a file, so the features carry no
    # information about the target — verify against the data.
    metadata = {
        "Roof_pitch": roof_pitch,
        "Sample_frequency": sample_frequency,
        "Building_depth": building_depth,
        "Building_breadth": building_breadth,
        "Building_height": building_height,
        "Wind_azimuth": wind_azimuth,
    }
    features = pd.DataFrame({
        name: np.tile(values, num_samples // len(values))
        for name, values in metadata.items()
    })
    features["Mean_pressure_coefficient"] = mean_pressure_coefficients

    # Polynomial Features (degree 2, no bias column) of the inputs only.
    base_features = features.drop(columns=["Mean_pressure_coefficient"])
    poly = PolynomialFeatures(degree=2, include_bias=False)
    poly_features = poly.fit_transform(base_features)
    feature_names = poly.get_feature_names_out(base_features.columns)
    poly_df = pd.DataFrame(poly_features, columns=feature_names)
    poly_df["Mean_pressure_coefficient"] = mean_pressure_coefficients

    # Split data
    X = poly_df.drop(columns=["Mean_pressure_coefficient"])
    y = poly_df["Mean_pressure_coefficient"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale features; the scaler is fitted on the training split only.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Convert the training features once; both the GAN and the PIML network
    # reuse this tensor instead of rebuilding it every epoch.
    X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32, device=device)
    num_train, num_features = X_train_tensor.shape

    # --- Train GAN ---
    latent_dim = 10
    generator = Generator(latent_dim, num_features).to(device)
    discriminator = Discriminator(num_features).to(device)

    adversarial_criterion = nn.BCELoss()
    optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002)
    optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002)

    real_label = torch.ones(num_train, 1, device=device)
    fake_label = torch.zeros(num_train, 1, device=device)

    for epoch in range(500):  # GAN Training Loop (full batch)
        # Train Discriminator
        optimizer_D.zero_grad()
        output_real = discriminator(X_train_tensor)
        loss_real = adversarial_criterion(output_real, real_label)

        noise = torch.randn(num_train, latent_dim).to(device)
        fake_data = generator(noise)
        # detach() keeps the discriminator update from back-propagating
        # into the generator.
        output_fake = discriminator(fake_data.detach())
        loss_fake = adversarial_criterion(output_fake, fake_label)

        loss_D = loss_real + loss_fake
        loss_D.backward()
        optimizer_D.step()

        # Train Generator: it is rewarded when fakes are scored as real.
        optimizer_G.zero_grad()
        output_fake = discriminator(fake_data)
        loss_G = adversarial_criterion(output_fake, real_label)
        loss_G.backward()
        optimizer_G.step()

    # Generate synthetic data (scaled feature vectors only, no targets).
    # NOTE(review): synthetic_data is not used anywhere below in this block,
    # so the GAN currently has no effect on the reported metrics.
    generator.eval()
    with torch.no_grad():
        noise = torch.randn(1000, latent_dim).to(device)
        synthetic_data = generator(noise).cpu().numpy()

    # --- Train XGBoost ---
    xgb_model = xgb.XGBRegressor(objective='reg:squarederror', random_state=42)
    xgb_model.fit(X_train_scaled, y_train)

    # NOTE(review): MAPE divides by |y_true|, so targets close to zero can
    # dominate the score — confirm MAPE is meaningful for these targets.
    y_pred_xgb = xgb_model.predict(X_test_scaled)
    mape_xgb = mean_absolute_percentage_error(y_test, y_pred_xgb)
    r2_xgb = r2_score(y_test, y_pred_xgb)

    # --- Train PIML ---
    piml_model = PIMLRegressionModel(num_features).to(device)
    optimizer = torch.optim.Adam(piml_model.parameters(), lr=0.0001)
    criterion = nn.MSELoss()

    train_dataset = torch.utils.data.TensorDataset(
        X_train_tensor,
        torch.tensor(y_train.values, dtype=torch.float32).to(device)
    )
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    for epoch in range(100):  # PIML Training Loop
        piml_model.train()
        # Distinct batch names: reusing `features` here would overwrite the
        # feature DataFrame built above.
        for batch_inputs, batch_targets in train_loader:
            optimizer.zero_grad()
            # squeeze(-1) drops only the output axis; a bare squeeze() would
            # also drop the batch axis when the last batch has one sample.
            outputs = piml_model(batch_inputs).squeeze(-1)
            loss = criterion(outputs, batch_targets)
            loss.backward()
            optimizer.step()

    # PIML Prediction: no gradients needed; flatten to match y_test's shape.
    X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32).to(device)
    piml_model.eval()
    with torch.no_grad():
        y_pred_piml = piml_model(X_test_tensor).squeeze(-1).cpu().numpy()

    mape_piml = mean_absolute_percentage_error(y_test, y_pred_piml)
    r2_piml = r2_score(y_test, y_pred_piml)

    # Save results
    results.append({
        "File": file,
        "XGB_MAPE": mape_xgb,
        "XGB_R²": r2_xgb,
        "PIML_MAPE": mape_piml,
        "PIML_R²": r2_piml
    })

# Display results: one row per successfully processed file.
results_df = pd.DataFrame(results)
print(results_df)
