## Step 1: Imports & Setup

In [1]:
# Required Libraries
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
import os

# Pick the compute device once: use CUDA when it is available, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)


## Step 2: Define your dataset class

In [2]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
from PIL import Image

class AQIDataset(Dataset):
    """Image -> pollutant-vector regression dataset backed by a labels CSV.

    Every CSV row must provide a ``Filename`` column (file name joined onto
    ``images_dir``) and one numeric column per entry of ``label_columns``.

    Args:
        images_dir: Directory that holds the image files.
        labels_csv: Path of the CSV with file names and target values.
        transform: Optional callable applied to the RGB PIL image.
        label_columns: Target column names, in output order. ``None`` (the
            default) selects ``DEFAULT_LABEL_COLUMNS``.
    """

    # Column names as they appear in the CSV header (note: 'PM25', no dot).
    DEFAULT_LABEL_COLUMNS = ('VOC', 'CO', 'NO2', 'SO2', 'PM25', 'PM10', 'O3')

    def __init__(self, images_dir, labels_csv, transform=None, label_columns=None):
        self.images_dir = images_dir
        self.labels = pd.read_csv(labels_csv)
        self.transform = transform
        # Stored as a list because pandas treats a list key as "select these labels".
        self.label_columns = list(
            self.DEFAULT_LABEL_COLUMNS if label_columns is None else label_columns
        )

    def __len__(self):
        """Return the number of rows (samples) in the labels table."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, labels)`` where ``image`` is the RGB image (after
            ``transform`` when one was given) and ``labels`` is a float32
            tensor holding the ``label_columns`` values of that row.
        """
        row = self.labels.iloc[idx]
        img_path = os.path.join(self.images_dir, row['Filename'])

        # convert() returns an independent copy, so the source file can be
        # closed as soon as the block exits instead of waiting for GC.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        # Retrieve the pollutant labels
        targets = row[self.label_columns].values.astype('float32')
        labels = torch.tensor(targets)

        if self.transform:
            image = self.transform(image)

        return image, labels   # Return the image and its pollutant labels


## Step 3: Prepare datasets and dataloaders

In [3]:
# Per-image preprocessing: resize to 224x224, then convert the PIL image to a tensor.
data_transforms = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# Full label table (one row per image).
labels_df = pd.read_csv('20250528_all_timeslot_cleaned.csv')

# Hold out 10% of the rows for validation; shuffled with a fixed seed.
train_df, val_df = train_test_split(
    labels_df,
    test_size=0.1,
    random_state=42,
    shuffle=True,
)

# AQIDataset reads its labels from a CSV path, so each split is written to disk first.
for split_df, split_csv in ((train_df, 'train_labels.csv'), (val_df, 'val_labels.csv')):
    split_df.to_csv(split_csv, index=False)

# Dataset objects: both splits read images from the same frame directory.
train_dataset = AQIDataset(
    'output_frames_20250528/', 'train_labels.csv', transform=data_transforms
)
print(f'Len of training: {len(train_dataset)}')
val_dataset = AQIDataset(
    'output_frames_20250528/', 'val_labels.csv', transform=data_transforms
)
print(f'Len of validation: {len(val_dataset)}')

# Loaders: shuffle only the training split; validation keeps file order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)


Len of training: 1912
Len of validation: 213


## Step 4: Define your Model

In [4]:

# pip install torch torchvision
import torch
import torch.nn as nn
from torchvision.models import vit_b_16, vit_l_16, ViT_B_16_Weights, ViT_L_16_Weights

# pip install torch torchvision
import torch
import torch.nn as nn
import torch.nn.functional as F

from torchvision.models import (
    # ResNet
    resnet18, resnet50, ResNet18_Weights, ResNet50_Weights,
    # ConvNeXt
    convnext_tiny, ConvNeXt_Tiny_Weights,
    # ViT
    vit_b_16, vit_l_16, ViT_B_16_Weights, ViT_L_16_Weights,
    # Swin
    swin_t, swin_b, Swin_T_Weights, Swin_B_Weights
)

# Registry of selectable backbones.
# Maps the user-facing name -> (family tag, torchvision constructor, weights enum).
# The family tag decides how `_make_encoder` strips the classification head.
_BACKBONES = {
    key: (family, ctor, weights_enum)
    for family, key, ctor, weights_enum in (
        # ResNet
        ("resnet",   "resnet18",      resnet18,      ResNet18_Weights),
        ("resnet",   "resnet50",      resnet50,      ResNet50_Weights),
        # ConvNeXt
        ("convnext", "convnext_tiny", convnext_tiny, ConvNeXt_Tiny_Weights),
        # ViT
        ("vit",      "vit_b_16",      vit_b_16,      ViT_B_16_Weights),
        ("vit",      "vit_l_16",      vit_l_16,      ViT_L_16_Weights),
        # Swin
        ("swin",     "swin_t",        swin_t,        Swin_T_Weights),
        ("swin",     "swin_b",        swin_b,        Swin_B_Weights),
    )
}

def _make_encoder(name: str, pretrained: bool = True):
    """
    Builds a backbone that returns a vector feature for an input image.

    Args:
        name: Key into `_BACKBONES` (e.g. "resnet18", "convnext_tiny").
        pretrained: When True, load the weights enum's `DEFAULT` weights;
            when False, build the architecture with `weights=None`.

    Returns: (encoder_module, feature_dim)

    Raises:
        ValueError: `name` is not a key of `_BACKBONES`.
        RuntimeError: the registry entry carries a family tag this function
            does not know how to strip a head from.
    """
    if name not in _BACKBONES:
        raise ValueError(f"Unknown backbone '{name}'. "
                         f"Choose from: {list(_BACKBONES.keys())}")

    family, ctor, weights_enum = _BACKBONES[name]
    weights = weights_enum.DEFAULT if pretrained else None
    m = ctor(weights=weights)

    if family == "resnet":
        # Replace fc with Identity; the model forward then returns pooled features
        feat_dim = m.fc.in_features
        m.fc = nn.Identity()
        encoder = m                                   # (N, feat_dim)
    elif family == "convnext":
        # Use features + GAP ourselves (avoid the classifier which expects LayerNorm2d).
        # Read the channel width from the classifier's final Linear instead of
        # hard-coding 768, so wider ConvNeXt variants added to the registry
        # get the right head size (convnext_tiny still yields 768).
        feat_dim = m.classifier[-1].in_features
        encoder = nn.Sequential(
            m.features,                               # (N, C, H, W)
            nn.AdaptiveAvgPool2d(1),                  # (N, C, 1, 1)
            nn.Flatten(1)                             # (N, C)
        )
    elif family == "vit":
        # Replace classifier heads; forward returns CLS embedding (N, embed_dim)
        feat_dim = m.heads.head.in_features
        m.heads = nn.Identity()
        encoder = m                                   # (N, feat_dim)
    elif family == "swin":
        # Replace head; forward returns pooled/normalized feature (N, embed_dim)
        feat_dim = m.head.in_features
        m.head = nn.Identity()
        encoder = m                                   # (N, feat_dim)
    else:
        raise RuntimeError("Unhandled backbone family.")

    return encoder, feat_dim

class AQINet(nn.Module):
    """
    Regression network for a single image: selectable backbone + small MLP head.

    Supported `backbone` values:
      - ResNet:             'resnet18', 'resnet50'
      - ConvNeXt:           'convnext_tiny'
      - Vision Transformer: 'vit_b_16', 'vit_l_16'
      - Swin Transformer:   'swin_t', 'swin_b'

    The backbone produces one feature vector per image; the head maps it to
    `output_dim` unbounded regression outputs.
    """
    def __init__(self,
                 output_dim: int = 7,
                 backbone: str = "vit_b_16",
                 pretrained: bool = True,
                 head_hidden: int = 256,
                 dropout: float = 0.2):
        super().__init__()
        self.backbone_name = backbone

        encoder, feat_dim = _make_encoder(backbone, pretrained)
        self.encoder = encoder

        # LayerNorm -> Linear -> GELU -> Dropout -> Linear (no final activation: regression).
        head_layers = [
            nn.LayerNorm(feat_dim),
            nn.Linear(feat_dim, head_hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(head_hidden, output_dim),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, img):
        """Encode `img` and return the head's output for the resulting features."""
        feats = self.encoder(img)           # expected (N, feat_dim)
        # An encoder may hand back (N, C, 1, 1); drop the two trailing
        # singleton axes in that case. 2-D features pass through untouched.
        is_spatial = feats.dim() == 4
        if is_spatial:
            feats = feats.squeeze(-1).squeeze(-1)
        return self.head(feats)

# -----------------------------



In [5]:
# Backbone selection
# -----------------------------
# To try a different encoder, replace the active line with one of these:
#   ViT base 16x16 : model = AQINet(backbone="vit_b_16").to(device)
#   Swin-T         : model = AQINet(backbone="swin_t").to(device)
#   ResNet-18      : model = AQINet(backbone="resnet18").to(device)

# Active choice: ConvNeXt-Tiny. Build the network, then move it to the target device.
model = AQINet(backbone="convnext_tiny")
model = model.to(device)


In [6]:
from torchinfo import summary
# Re-resolve the device so this cell also works when run on its own.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Device:', device)

# One pass over the parameters: remember each tensor's size and whether it is trainable.
param_sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]
trainable = sum(size for size, needs_grad in param_sizes if needs_grad)
total     = sum(size for size, needs_grad in param_sizes)
print(f"Trainable params: {trainable}/{total}")

# Last expression of the cell: the torchinfo summary is what gets displayed.
summary(model.to(device))

Device: cuda
Trainable params: 28018791/28018791


Layer (type:depth-idx)                             Param #
AQINet                                             --
├─Sequential: 1-1                                  --
│    └─Sequential: 2-1                             --
│    │    └─Conv2dNormActivation: 3-1              4,896
│    │    └─Sequential: 3-2                        237,888
│    │    └─Sequential: 3-3                        74,112
│    │    └─Sequential: 3-4                        918,144
│    │    └─Sequential: 3-5                        295,680
│    │    └─Sequential: 3-6                        10,817,280
│    │    └─Sequential: 3-7                        1,181,184
│    │    └─Sequential: 3-8                        14,289,408
│    └─AdaptiveAvgPool2d: 2-2                      --
│    └─Flatten: 2-3                                --
├─Sequential: 1-2                                  --
│    └─LayerNorm: 2-4                              1,536
│    └─Linear: 2-5                                 196,864
│    └─GELU: 2-6        

## Step 5: Define Loss and Optimizer

In [7]:
# Mean-squared-error objective for the multi-output regression.
criterion = nn.MSELoss()

# AdamW over every parameter returned by model.parameters().
LEARNING_RATE = 1e-4
optimizer = optim.AdamW(params=model.parameters(), lr=LEARNING_RATE)


## Step 6: Training Loop

In [9]:
import numpy as np
import torch
from tqdm.auto import tqdm
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

num_epochs = 50
best_model_path = "best_single_image_model.pth"
# Best aggregate validation R² so far; -inf guarantees the first epoch is checkpointed.
best_val_r2 = -np.inf
# Display names for the model outputs, in output order (index i <-> output column i).
columns = ['VOC', 'CO', 'NO2', 'SO2', 'PM2.5', 'PM10', 'O3']
num_pollutants = len(columns)

# Per-epoch metric history. The per-pollutant entries hold one list per
# output column; each list grows by one value per epoch.
history = {
    'train_loss': [],
    'val_loss': [],
    'mae': [],
    'rmse': [],
    'r2': [],  # Validation R² (aggregate over all outputs)
    'train_r2': [],  # Training R² (aggregate over all outputs)
    'per_pollutant_r2': [[] for _ in range(num_pollutants)],  # Validation
    'per_pollutant_train_r2': [[] for _ in range(num_pollutants)]  # Training
}

for epoch in tqdm(range(num_epochs), desc="Training Progress"):
    # ------------------------ Training ------------------------
    model.train()
    train_loss = 0.0
    train_y_true, train_y_pred = [], []  # predictions collected while weights are still updating

    for images, labels in train_loader:
        # Move data to device
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # loss is a batch mean, so weight it by batch size to get a per-sample
        # average after dividing by the dataset length below.
        train_loss += loss.item() * images.size(0)
        train_y_true.extend(labels.detach().cpu().numpy())
        train_y_pred.extend(outputs.detach().cpu().numpy())

    # Calculate average training loss
    train_loss /= len(train_loader.dataset)
    history['train_loss'].append(train_loss)

    # Calculate training metrics
    train_y_true = np.array(train_y_true)
    train_y_pred = np.array(train_y_pred)
    train_r2 = r2_score(train_y_true, train_y_pred)
    history['train_r2'].append(train_r2)

    # ------------------------ Validation ------------------------
    model.eval()
    val_loss = 0.0
    y_true, y_pred = [], []

    with torch.no_grad():  # Disable gradient computation
        for images, labels in val_loader:
            # Move data to device
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Accumulate loss and predictions
            val_loss += loss.item() * images.size(0)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(outputs.cpu().numpy())

    # Calculate average validation loss
    val_loss /= len(val_loader.dataset)
    history['val_loss'].append(val_loss)

    # Convert to NumPy arrays for metric calculation
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Calculate validation metrics
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)

    # Store metrics
    history['mae'].append(mae)
    history['rmse'].append(rmse)
    history['r2'].append(r2)

    # Print epoch summary
    print(f"\nEpoch [{epoch+1}/{num_epochs}]")
    print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
    print(f"Train R²: {train_r2:.4f} | Val R²: {r2:.4f}")
    print(f"MAE: {mae:.4f} | RMSE: {rmse:.4f}")

    # Per-pollutant R² scores - Training
    print("\nTraining Per-pollutant R² Scores:")
    for i, name in enumerate(columns):
        train_pollutant_r2 = r2_score(train_y_true[:, i], train_y_pred[:, i])
        history['per_pollutant_train_r2'][i].append(train_pollutant_r2)
        print(f"  {name}: {train_pollutant_r2:.4f}")

    # Per-pollutant R² scores - Validation
    print("\nValidation Per-pollutant R² Scores:")
    for i, name in enumerate(columns):
        pollutant_r2 = r2_score(y_true[:, i], y_pred[:, i])
        history['per_pollutant_r2'][i].append(pollutant_r2)
        print(f"  {name}: {pollutant_r2:.4f}")

    # Model checkpointing - save if current model is better
    if r2 > best_val_r2:
        # Keep checkpoint metadata as plain Python floats so the file holds only
        # tensors and primitives (no pickled module or NumPy scalar objects).
        best_val_r2 = float(r2)
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'best_r2': best_val_r2,
            # Validation loss of this epoch (previously the criterion module
            # itself was stored here, which recorded no loss value at all).
            'loss': float(val_loss)
        }, best_model_path)
        print(f"\n✅ Saved best model (Val R²: {best_val_r2:.4f})")

print('\n***Training Complete***')
print(f"Best Validation R²: {best_val_r2:.4f}")

# Print the per-pollutant scores of the LAST epoch (not of the best checkpoint)
print("\nFinal Per-Pollutant R² Summary (Last Epoch):")
print(f"{'Pollutant':<8} | {'Train R²':<10} | {'Validation R²':<10}")
print("-" * 40)
for i, name in enumerate(columns):
    last_train_r2 = history['per_pollutant_train_r2'][i][-1]
    last_val_r2 = history['per_pollutant_r2'][i][-1]
    print(f"{name:<8} | {last_train_r2:.4f}      | {last_val_r2:.4f}")


Training Progress:   0%|          | 0/50 [00:00<?, ?it/s]


Epoch [1/50]
Train Loss: 3054.8382 | Val Loss: 2784.1122
Train R²: -137.4977 | Val R²: -53.5922
MAE: 29.6101 | RMSE: 52.7647

Training Per-pollutant R² Scores:
  VOC: -616.2827
  CO: -43.6896
  NO2: -21.1117
  SO2: -0.6395
  PM2.5: -113.5350
  PM10: -139.7245
  O3: -27.4977

Validation Per-pollutant R² Scores:
  VOC: -33.6322
  CO: -23.5091
  NO2: -21.6899
  SO2: -0.3596
  PM2.5: -103.3525
  PM10: -164.2733
  O3: -28.3286

✅ Saved best model (Val R²: -53.5922)

Epoch [2/50]
Train Loss: 2553.0022 | Val Loss: 2254.4124
Train R²: -397.4531 | Val R²: -31.4794
MAE: 25.8876 | RMSE: 47.4807

Training Per-pollutant R² Scores:
  VOC: -2498.4414
  CO: -49.9827
  NO2: -16.1118
  SO2: -0.1861
  PM2.5: -80.9883
  PM10: -113.4272
  O3: -23.0165

Validation Per-pollutant R² Scores:
  VOC: -2.5177
  CO: -0.0036
  NO2: -14.9038
  SO2: -0.0152
  PM2.5: -61.3678
  PM10: -118.4044
  O3: -23.1438

✅ Saved best model (Val R²: -31.4794)

Epoch [3/50]
Train Loss: 1977.6234 | Val Loss: 1642.3706
Train R²: -89

## Visualization

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Display names of the seven outputs, in output-column order.
pollutants = ['VOC', 'CO', 'NO2', 'SO2', 'PM2.5', 'PM10', 'O3']
n_vars = len(pollutants)

# y_true / y_pred are the validation arrays left behind by the training cell.

# --- Figure 1: scatter with a fitted regression line per pollutant ---
scatter_style = {'alpha': 0.5, 's': 20}
fit_line_style = {'color': 'red', 'linestyle': '--'}

plt.figure(figsize=(18, 10))
for panel, title in enumerate(pollutants, start=1):
    col = panel - 1
    plt.subplot(2, 4, panel)
    # Use regplot for scatter and regression line
    sns.regplot(
        x=y_true[:, col],
        y=y_pred[:, col],
        scatter_kws=dict(scatter_style),
        line_kws=dict(fit_line_style),
    )
    plt.xlabel("True")
    plt.ylabel("Predicted")
    plt.title(title)
plt.tight_layout()
plt.suptitle("Predicted vs. True AQI Values", fontsize=16, y=1.05)
plt.show()

# --- Figure 2: scatter with the identity line (perfect prediction) per pollutant ---
plt.figure(figsize=(18, 10))
for panel, title in enumerate(pollutants, start=1):
    col = panel - 1
    true_col = y_true[:, col]
    lo, hi = true_col.min(), true_col.max()
    plt.subplot(2, 4, panel)
    plt.scatter(true_col, y_pred[:, col], alpha=0.5, s=20)
    plt.plot([lo, hi], [lo, hi], 'r--')
    plt.xlabel("True")
    plt.ylabel("Predicted")
    plt.title(title)
plt.tight_layout()
plt.suptitle("Predicted vs. True AQI Values", fontsize=16, y=1.05)
plt.show()

# --- Figure 3: one R² bar per pollutant (y-axis limited to [-1, 1]) ---
from sklearn.metrics import r2_score
r2_scores = []
for col in range(n_vars):
    r2_scores.append(r2_score(y_true[:, col], y_pred[:, col]))
plt.figure(figsize=(4, 3))
plt.bar(pollutants, r2_scores)
plt.ylabel("R² Score")
plt.title("R² Scores for Each AQI Variable")
plt.ylim(-1, 1)
plt.grid(axis='y')
plt.show()



## Step 7: Save your trained model

In [None]:
# Persist only the weights (state_dict) of the model as it currently stands in memory.
final_weights = model.state_dict()
torch.save(final_weights, 'aqi_model.pth')
print('Model saved successfully.')


## Step 8: Prediction function & Demo

In [None]:
def predict_aqi(image_path, model, device):
    """Run `model` on one image file and return its seven outputs by name.

    The image is loaded as RGB, resized to 224x224 and converted to a tensor
    before being passed through the model as a batch of one. The model is
    switched to eval mode as a side effect.

    Args:
        image_path: Path of the image file to load.
        model: Network called on the (1, C, H, W) image batch.
        device: Device the input batch is moved to.

    Returns:
        dict mapping "VOC", "CO", "NO2", "SO2", "PM2.5", "PM10", "O3" to the
        corresponding element of the model's first output row.
    """
    preprocess = transforms.Compose(
        [transforms.Resize((224, 224)), transforms.ToTensor()]
    )

    model.eval()
    rgb_image = Image.open(image_path).convert('RGB')
    batch = preprocess(rgb_image).unsqueeze(0)   # add the batch axis
    batch = batch.to(device)

    with torch.no_grad():
        prediction = model(batch)
        output = prediction.cpu().numpy()[0]     # first (only) row of the batch

    names = ("VOC", "CO", "NO2", "SO2", "PM2.5", "PM10", "O3")
    aqi_dict = {name: output[position] for position, name in enumerate(names)}
    return aqi_dict

# Restore the weights from 'aqi_model.pth', mapping stored tensors onto the active device.
saved_weights = torch.load('aqi_model.pth', map_location=device)
model.load_state_dict(saved_weights)
model.to(device)

# Demo: predict the pollutant values for a single image and print them.
test_image = 'June_all_images/image202506240001.jpg'  # replace with your test image
predictions = predict_aqi(test_image, model, device)
print(f'Predicted AQI values for {test_image}:', predictions)
