## Without Augmentation


In [13]:
import os
import glob
import re
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_msssim import ssim  # pip install pytorch-msssim
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, accuracy_score, f1_score, classification_report

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Dataset loader for UCSD Ped2
# Preprocessing used whenever the dataset is built without an explicit
# transform: single-channel 128x128 tensor, then (x - 0.5) / 0.5 so pixel
# values land in [-1, 1], the same range as the model's tanh output.
default_transform = transforms.Compose([
    transforms.Grayscale(),  # frames are already opened with .convert('L') by the dataset
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

class UCSDPed2Dataset(Dataset):
    """Frame-level dataset over the UCSD Ped2 ``Train`` / ``Test`` folders.

    Every sub-directory of ``<root>/Train`` (``phase == 'training'``) or
    ``<root>/Test`` (any other phase) is treated as one video, and every
    ``png``/``jpg``/``jpeg``/``tif`` file inside it as one frame.

    Args:
        root: Dataset directory containing ``Train`` and ``Test``.
        phase: ``'training'`` yields tensors only; ``'testing'`` yields
            ``(tensor, label)`` pairs.
        transform: Callable applied to the grayscale PIL frame. Falls back to
            the module-level ``default_transform`` when falsy.
        gt_list: Per-video collections of anomalous frame numbers, indexed by
            ``video number - 1``. Only consulted when ``phase == 'testing'``;
            without it every label is 0.

    Attributes:
        paths: Frame paths, ordered by video and then by file name.
        labels: 1 for anomalous frames, 0 otherwise; parallel to ``paths``.
    """

    def __init__(self, root, phase='training', transform=None, gt_list=None):
        self.phase = phase
        self.transform = transform or default_transform
        subdir = 'Train' if phase == 'training' else 'Test'
        base = os.path.join(root, subdir)
        vids = sorted(d for d in os.listdir(base) if os.path.isdir(os.path.join(base, d)))
        labelled = phase == 'testing' and gt_list is not None
        self.paths = []
        self.labels = []
        for vid in vids:
            frame_dir = os.path.join(base, vid)
            frames = [
                p
                for ext in ('*.png', '*.jpg', '*.jpeg', '*.tif')
                for p in sorted(glob.glob(os.path.join(frame_dir, ext)))
            ]
            if not frames:
                # Folders without frames (e.g. '*_gt' mask folders) are skipped
                # before their name is parsed, so odd names cannot break indexing.
                continue
            if labelled:
                # vid is like 'Test001' or 'Train001'
                vid_idx = int(re.sub('[^0-9]', '', vid)) - 1
                # A set gives O(1) membership instead of scanning a list per frame.
                anomalous = set(gt_list[vid_idx])
                self.labels.extend(
                    1 if int(os.path.splitext(os.path.basename(p))[0]) in anomalous else 0
                    for p in frames
                )
            else:
                self.labels.extend(0 for _ in frames)
            self.paths.extend(frames)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # convert() returns an independent image, so the file handle can be
        # released immediately instead of lingering until garbage collection.
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert('L')
        x = self.transform(img)
        return (x, self.labels[idx]) if self.phase == 'testing' else x

# 2. Load ground truth from the .m file in the Test folder
import glob, os, re

def load_ucsd_gt(root):
    """Parse the anomalous frame numbers of every test video from the GT ``.m`` file.

    The first ``*.m`` file (alphabetically) in ``<root>/Test`` is scanned for
    lines such as ``TestVideoFile{end+1}.gt_frame = [61:180];``. Entries with
    several ranges (``[5:90, 140:200]``) are supported as well, so no video is
    silently dropped and later videos keep their index.

    Args:
        root: Dataset directory that contains the ``Test`` folder.

    Returns:
        One list of frame numbers per ``gt_frame`` entry, in file order.
        MATLAB ranges are inclusive on both ends.

    Raises:
        FileNotFoundError: No ``.m`` file exists in ``<root>/Test``.
        ValueError: The file contains no ``gt_frame`` definition.
    """
    # Sorted so the chosen file is deterministic when several .m files exist.
    m_files = sorted(glob.glob(os.path.join(root, 'Test', '*.m')))
    if not m_files:
        raise FileNotFoundError(f"No .m files found in {os.path.join(root,'Test')} for GT.")
    with open(m_files[0], 'r') as fh:
        text = fh.read()

    # Capture everything between the brackets; ranges are split out below.
    entries = re.findall(
        r"TestVideoFile\{end\+1\}\.gt_frame\s*=\s*\[([^\]]*)\];",
        text
    )
    if not entries:
        raise ValueError("No gt_frame definitions found in TestVideoFile .m file.")

    gt_list = []
    for entry in entries:
        frames = []
        for start_str, end_str in re.findall(r"(\d+)\s*:\s*(\d+)", entry):
            # MATLAB ranges are inclusive
            frames.extend(range(int(start_str), int(end_str) + 1))
        gt_list.append(frames)

    return gt_list

# 3. Combined Loss (MSE + L1 + SSIM)
class CombinedLoss(nn.Module):
    """Reconstruction loss: ``alpha * MSE + beta * L1 + gamma * (1 - SSIM)``.

    Both inputs are expected in [-1, 1]; they are rescaled to [0, 1] only for
    the SSIM term, which is computed with ``data_range=1.0``.
    """

    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.mse = nn.MSELoss()
        self.l1 = nn.L1Loss()

    def forward(self, recon, x):
        pixel_mse = self.mse(recon, x)
        pixel_l1 = self.l1(recon, x)
        # Undo the (x - 0.5) / 0.5 normalisation before measuring SSIM.
        similarity = ssim((recon + 1) / 2, (x + 1) / 2, data_range=1.0, size_average=True)
        dissimilarity = 1 - similarity
        return self.alpha * pixel_mse + self.beta * pixel_l1 + self.gamma * dissimilarity

# 4. U-Net Autoencoder
class UNetAutoencoder(nn.Module):
    """Small two-level U-Net that reconstructs a 1-channel image.

    Encoder: 1 -> 32 -> 64 -> 128 channels with 2x2 max-pooling between
    stages. Decoder: two transposed-conv upsampling steps, each followed by a
    skip concatenation with the matching encoder feature map. The output is
    squashed with tanh.
    """

    @staticmethod
    def _conv_relu(in_ch, out_ch):
        """3x3 same-padding convolution followed by an in-place ReLU."""
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
        )

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are kept so saved state dicts
        # and seeded initialisation stay compatible.
        self.enc1 = self._conv_relu(1, 32)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.enc2 = self._conv_relu(32, 64)
        self.enc3 = self._conv_relu(64, 128)
        self.up23 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec3 = self._conv_relu(128, 64)
        self.up12 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.dec2 = self._conv_relu(64, 32)
        self.final = nn.Conv2d(32, 1, kernel_size=1)

    def forward(self, x):
        # Contracting path.
        skip_full = self.enc1(x)
        skip_half = self.enc2(self.pool(skip_full))
        bottleneck = self.enc3(self.pool(skip_half))
        # Expanding path with skip connections on the channel axis.
        half = self.dec3(torch.cat([self.up23(bottleneck), skip_half], dim=1))
        full = self.dec2(torch.cat([self.up12(half), skip_full], dim=1))
        return torch.tanh(self.final(full))

# 5. Training loop
def train_model(model, loader, epochs=50, lr=1e-3):
    """Train ``model`` to reconstruct its input and return it.

    Optimises ``CombinedLoss`` with Adam; the learning rate is halved by
    ``ReduceLROnPlateau`` after 5 epochs without improvement of the mean
    per-sample epoch loss, which is also printed every epoch.

    Args:
        model: Autoencoder to train; moved to the module-level ``device``.
        loader: Iterable of batches, either tensors or ``(tensor, ...)``
            sequences whose first element is the image batch.
        epochs: Number of passes over ``loader``.
        lr: Initial Adam learning rate.
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = CombinedLoss()
    for epoch in range(1, epochs + 1):
        model.train()
        running = 0
        for batch in loader:
            frames = batch[0] if isinstance(batch, (list, tuple)) else batch
            frames = frames.to(device)
            optimizer.zero_grad()
            loss = criterion(model(frames), frames)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch figure is a per-sample mean.
            running += frames.size(0) * loss.item()
        epoch_loss = running / len(loader.dataset)
        scheduler.step(epoch_loss)
        print(f"Epoch {epoch}/{epochs}, Loss={epoch_loss:.6f}")
    return model

# 6. Evaluation
def evaluate(model, root, gt_list, bs=32):
    """Score every test frame by reconstruction error and print detection metrics.

    The anomaly score of a frame is the mean squared error between the frame
    and its reconstruction. Prints ROC-AUC, plus accuracy, F1, the confusion
    matrix and a classification report at the threshold maximising
    ``TPR - FPR``.

    Note: that threshold is selected on the same test frames it is evaluated
    on, so the thresholded metrics are optimistic.

    Args:
        model: Trained autoencoder, already on the module-level ``device``.
        root: Dataset directory passed to ``UCSDPed2Dataset``.
        gt_list: Per-video anomalous frame numbers from ``load_ucsd_gt``.
        bs: Evaluation batch size.
    """
    model.eval()
    ds = UCSDPed2Dataset(root, phase='testing', gt_list=gt_list)
    loader = DataLoader(ds, batch_size=bs, shuffle=False)
    scores, labels = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            recon = model(x)
            err = torch.mean((recon - x) ** 2, dim=[1, 2, 3])
            scores.extend(err.cpu().tolist())
            # The collated labels arrive as a tensor; store plain ints rather
            # than 0-d tensors so the metric functions receive ordinary lists.
            labels.extend(y.tolist())
    auc = roc_auc_score(labels, scores)
    fpr, tpr, th = roc_curve(labels, scores)
    thr = th[np.argmax(tpr - fpr)]
    preds = [1 if s >= thr else 0 for s in scores]
    cm = confusion_matrix(labels, preds)
    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds)
    print(f"AUC={auc:.4f},Acc={acc:.4f},F1={f1:.4f}\nCM:\n{cm}")
    print(classification_report(labels, preds, target_names=['Normal','Anomaly']))

# 7. Main
# Baseline run: train the U-Net autoencoder on the (anomaly-free) Train split
# without augmentation, save its weights, then report test-set metrics.
if __name__=='__main__':
    # NOTE: machine-specific absolute path; adjust before running elsewhere.
    root = '/l/users/zainab.aldhanhani/AI702Project/UCSD_Anomaly_Dataset.v1p2/UCSDped2'
    gt_list = load_ucsd_gt(root)
    # gt_list is ignored for phase='training' (all labels are 0).
    train_ds = UCSDPed2Dataset(root, phase='training', gt_list=gt_list)
    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=4)
    model = UNetAutoencoder()
    model = train_model(model, train_loader, epochs=50, lr=1e-3)
    torch.save(model.state_dict(), 'ucsdped2_unet.pth')
    evaluate(model, root, gt_list)

Epoch 1/50, Loss=0.030610
Epoch 2/50, Loss=0.002876
Epoch 3/50, Loss=0.002173
Epoch 4/50, Loss=0.001766
Epoch 5/50, Loss=0.001468
Epoch 6/50, Loss=0.001309
Epoch 7/50, Loss=0.001194
Epoch 8/50, Loss=0.001125
Epoch 9/50, Loss=0.001070
Epoch 10/50, Loss=0.001023
Epoch 11/50, Loss=0.000983
Epoch 12/50, Loss=0.000947
Epoch 13/50, Loss=0.000914
Epoch 14/50, Loss=0.000884
Epoch 15/50, Loss=0.000856
Epoch 16/50, Loss=0.000832
Epoch 17/50, Loss=0.000812
Epoch 18/50, Loss=0.000792
Epoch 19/50, Loss=0.000776
Epoch 20/50, Loss=0.000762
Epoch 21/50, Loss=0.000747
Epoch 22/50, Loss=0.000751
Epoch 23/50, Loss=0.001460
Epoch 24/50, Loss=0.001684
Epoch 25/50, Loss=0.001595
Epoch 26/50, Loss=0.001271
Epoch 27/50, Loss=0.001233
Epoch 28/50, Loss=0.000517
Epoch 29/50, Loss=0.000417
Epoch 30/50, Loss=0.000426
Epoch 31/50, Loss=0.000432
Epoch 32/50, Loss=0.000435
Epoch 33/50, Loss=0.000438
Epoch 34/50, Loss=0.000439
Epoch 35/50, Loss=0.000440
Epoch 36/50, Loss=0.000351
Epoch 37/50, Loss=0.000335
Epoch 38/5

## MixUp & CutMix

In [1]:
import os
import glob
import re
import random
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_msssim import ssim  # pip install pytorch-msssim
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, accuracy_score, f1_score, classification_report

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 0. MixUp and CutMix helper functions
def mixup_data(x, alpha=1.0):
    """Blend every sample of a batch with a randomly chosen partner (MixUp).

    A single mixing weight ``lam ~ Beta(alpha, alpha)`` is drawn for the whole
    batch and partners are picked by a random permutation of the batch.

    Args:
        x: Batch tensor; the first dimension is the batch dimension.
        alpha: Beta concentration. ``alpha <= 0`` disables mixing and returns
            ``x`` itself.

    Returns:
        ``lam * x + (1 - lam) * x[perm]``, or ``x`` when mixing is disabled.
    """
    if alpha <= 0:
        return x
    weight = np.random.beta(alpha, alpha)
    perm = torch.randperm(x.size(0)).to(x.device)
    partners = x[perm]
    return weight * x + (1 - weight) * partners


def cutmix_data(x, alpha=1.0):
    """Paste one random rectangle from a permuted batch into every sample (CutMix).

    The rectangle covers roughly a ``1 - lam`` share of the image area with
    ``lam ~ Beta(alpha, alpha)``; its centre is uniform over the image and the
    box is clipped at the borders. The same box is used for the whole batch.

    Args:
        x: Batch tensor of shape ``(batch, channels, H, W)``.
        alpha: Beta concentration. ``alpha <= 0`` returns ``x`` itself.

    Returns:
        A new tensor with the patched region, or ``x`` when disabled.
    """
    if alpha <= 0:
        return x
    lam = np.random.beta(alpha, alpha)
    n, _, height, width = x.shape
    perm = torch.randperm(n).to(x.device)

    # Side lengths scale with sqrt(1 - lam) so the box area scales with 1 - lam.
    side_ratio = np.sqrt(1. - lam)
    half_w = int(width * side_ratio) // 2
    half_h = int(height * side_ratio) // 2

    center_x = random.randint(0, width)
    center_y = random.randint(0, height)
    left, right = max(center_x - half_w, 0), min(center_x + half_w, width)
    top, bottom = max(center_y - half_h, 0), min(center_y + half_h, height)

    patched = x.clone()
    patched[:, :, top:bottom, left:right] = x[perm, :, top:bottom, left:right]
    return patched

# 1. Dataset loader for UCSD Ped2
# Preprocessing used whenever the dataset is built without an explicit
# transform: single-channel 128x128 tensor, then (x - 0.5) / 0.5 so pixel
# values land in [-1, 1], the same range as the autoencoder's Tanh output.
default_transform = transforms.Compose([
    transforms.Grayscale(),  # frames are already opened with .convert('L') by the dataset
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

class UCSDPed2Dataset(Dataset):
    """Frame-level dataset over the UCSD Ped2 ``Train`` / ``Test`` folders.

    Each sub-directory of ``<root>/Train`` (``phase == 'training'``) or
    ``<root>/Test`` (any other phase) is one video; each
    ``png``/``jpg``/``jpeg``/``tif`` file inside it is one frame.

    Args:
        root: Dataset directory containing ``Train`` and ``Test``.
        phase: ``'training'`` items are tensors; ``'testing'`` items are
            ``(tensor, label)`` pairs.
        transform: Callable applied to the grayscale PIL frame; the
            module-level ``default_transform`` is used when falsy.
        gt_list: Per-video anomalous frame numbers, indexed by
            ``video number - 1``. Only used for ``phase == 'testing'``.

    Attributes:
        paths: Frame paths ordered by video, then file name.
        labels: 1 = anomalous, 0 = normal; parallel to ``paths``.
    """

    def __init__(self, root, phase='training', transform=None, gt_list=None):
        self.phase = phase
        self.transform = transform or default_transform
        subdir = 'Train' if phase == 'training' else 'Test'
        base_dir = os.path.join(root, subdir)
        vids = sorted(d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)))
        use_gt = phase == 'testing' and gt_list is not None
        self.paths = []
        self.labels = []
        for vid in vids:
            frame_dir = os.path.join(base_dir, vid)
            frames = [
                p
                for ext in ('*.png', '*.jpg', '*.jpeg', '*.tif')
                for p in sorted(glob.glob(os.path.join(frame_dir, ext)))
            ]
            if not frames:
                # Frame-less folders are skipped before their name is parsed.
                continue
            if use_gt:
                vid_idx = int(re.sub('[^0-9]', '', vid)) - 1
                # Set lookup is O(1); the GT entries are plain lists.
                anomalous = set(gt_list[vid_idx])
                self.labels.extend(
                    1 if int(os.path.splitext(os.path.basename(p))[0]) in anomalous else 0
                    for p in frames
                )
            else:
                self.labels.extend(0 for _ in frames)
            self.paths.extend(frames)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # Close the file as soon as the pixel data has been copied by convert().
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert('L')
        x = self.transform(img)
        return (x, self.labels[idx]) if self.phase == 'testing' else x

# 2. Load ground truth from .m file in Test folder
def load_ucsd_gt(root):
    """Parse the anomalous frame numbers of every test video from the GT ``.m`` file.

    Reads the first ``*.m`` file (alphabetically) in ``<root>/Test`` and
    collects every ``TestVideoFile{end+1}.gt_frame = [...];`` entry. An entry
    may hold one range (``[61:180]``) or several (``[5:90, 140:200]``).

    Args:
        root: Dataset directory that contains the ``Test`` folder.

    Returns:
        One list of frame numbers per entry, in file order; range ends are
        inclusive, as in MATLAB.

    Raises:
        FileNotFoundError: No ``.m`` file exists in ``<root>/Test``.
        ValueError: The file contains no ``gt_frame`` definition.
    """
    test_dir = os.path.join(root, 'Test')
    # Sorted so the chosen file is deterministic when several .m files exist.
    m_files = sorted(glob.glob(os.path.join(test_dir, '*.m')))
    if not m_files:
        raise FileNotFoundError(f"No .m files found in {test_dir} for GT.")
    with open(m_files[0], 'r') as fh:
        text = fh.read()
    # Capture the bracket contents; individual start:end ranges are split below.
    entries = re.findall(r"TestVideoFile\{end\+1\}\.gt_frame\s*=\s*\[([^\]]*)\];", text)
    if not entries:
        raise ValueError("No gt_frame definitions in TestVideoFile .m file.")
    gt_list = []
    for entry in entries:
        frames = []
        for s_str, e_str in re.findall(r"(\d+)\s*:\s*(\d+)", entry):
            frames.extend(range(int(s_str), int(e_str) + 1))
        gt_list.append(frames)
    return gt_list

# 3. Combined Loss (MSE + L1 + SSIM)
class CombinedLoss(nn.Module):
    """Reconstruction loss: ``alpha * MSE + beta * L1 + gamma * (1 - SSIM)``.

    Inputs are expected in [-1, 1]; only the SSIM term works on a [0, 1]
    rescaled copy (``data_range=1.0``).
    """

    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.mse, self.l1 = nn.MSELoss(), nn.L1Loss()
        self.alpha, self.beta, self.gamma = alpha, beta, gamma

    def forward(self, recon, x):
        mse_term = self.mse(recon, x)
        l1_term = self.l1(recon, x)
        # Map [-1, 1] back to [0, 1] before measuring structural similarity.
        target_unit = (x + 1) / 2
        recon_unit = (recon + 1) / 2
        ssim_term = 1 - ssim(recon_unit, target_unit, data_range=1.0, size_average=True)
        return self.alpha * mse_term + self.beta * l1_term + self.gamma * ssim_term

# 4. Simplified Autoencoder
class Autoencoder(nn.Module):
    """Plain convolutional autoencoder without skip connections.

    Three stride-2 convolutions shrink the spatial size by 8x (128 -> 16 for
    the 128x128 frames used here); three stride-2 transposed convolutions
    restore it. The output passes through Tanh.
    """

    def __init__(self):
        super().__init__()
        down = dict(kernel_size=3, stride=2, padding=1)
        up = dict(kernel_size=3, stride=2, padding=1, output_padding=1)
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, **down),      # 128 -> 64
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, **down),     # 64 -> 32
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, **down),    # 32 -> 16
            nn.ReLU(inplace=True),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, **up),   # 16 -> 32
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 32, **up),    # 32 -> 64
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(32, 1, **up),     # 64 -> 128
            nn.Tanh(),
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))

# 5. Trainer with MixUp & CutMix
class Trainer:
    """Trains an autoencoder with optional MixUp / CutMix input augmentation.

    Holds the model (moved to the module-level ``device``), the data loader,
    an Adam optimiser, a ``ReduceLROnPlateau`` scheduler (factor 0.5,
    patience 5) and the ``CombinedLoss`` criterion.
    """

    def __init__(self, model, loader, lr=1e-3):
        self.model = model.to(device)
        self.loader = loader
        self.opt = optim.Adam(model.parameters(), lr=lr)
        self.sched = optim.lr_scheduler.ReduceLROnPlateau(self.opt, 'min', factor=0.5, patience=5)
        self.crit = CombinedLoss()

    @staticmethod
    def _maybe_mix(x, mixup_alpha, cutmix_alpha, mix_prob):
        """Return ``x`` untouched, or MixUp / CutMix of it with equal odds."""
        if not random.random() < mix_prob:
            return x
        if random.random() < 0.5:
            return mixup_data(x, mixup_alpha)
        return cutmix_data(x, cutmix_alpha)

    def train(self, epochs=50, mixup_alpha=0.4, cutmix_alpha=0.4, mix_prob=0.5):
        """Run ``epochs`` passes over the loader and return the trained model.

        Each batch is augmented with probability ``mix_prob``; the model is
        asked to reconstruct the (possibly mixed) batch it was given. The mean
        per-sample loss of the epoch drives the scheduler and is printed.
        """
        for epoch in range(1, epochs + 1):
            self.model.train()
            epoch_total = 0.0
            for batch in self.loader:
                # Batches may be (x, labels) sequences or bare tensors.
                inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
                inputs = inputs.to(device)
                inputs = self._maybe_mix(inputs, mixup_alpha, cutmix_alpha, mix_prob)
                self.opt.zero_grad()
                loss = self.crit(self.model(inputs), inputs)
                loss.backward()
                self.opt.step()
                epoch_total += loss.item() * inputs.size(0)
            mean_loss = epoch_total / len(self.loader.dataset)
            self.sched.step(mean_loss)
            print(f"Epoch {epoch}/{epochs}, Loss={mean_loss:.6f}")
        return self.model

# 6. Evaluation
def evaluate(model, root, gt_list, bs=32):
    """Score every test frame by reconstruction error and print detection metrics.

    The anomaly score of a frame is the mean squared error between the frame
    and its reconstruction. Prints ROC-AUC, plus accuracy, F1, the confusion
    matrix and a classification report at the threshold maximising
    ``TPR - FPR``.

    Note: that threshold is selected on the same test frames it is evaluated
    on, so the thresholded metrics are optimistic.

    Args:
        model: Trained autoencoder, already on the module-level ``device``.
        root: Dataset directory passed to ``UCSDPed2Dataset``.
        gt_list: Per-video anomalous frame numbers from ``load_ucsd_gt``.
        bs: Evaluation batch size.
    """
    model.eval()
    ds = UCSDPed2Dataset(root, phase='testing', gt_list=gt_list)
    loader = DataLoader(ds, batch_size=bs, shuffle=False)
    scores, labels = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            recon = model(x)
            err = torch.mean((recon - x)**2, dim=[1,2,3])
            scores.extend(err.cpu().tolist())
            # The collated labels arrive as a tensor; store plain ints rather
            # than 0-d tensors so the metric functions receive ordinary lists.
            labels.extend(y.tolist())
    auc = roc_auc_score(labels, scores)
    fpr, tpr, th = roc_curve(labels, scores)
    thr = th[np.argmax(tpr - fpr)]
    preds = [1 if s >= thr else 0 for s in scores]
    cm = confusion_matrix(labels, preds)
    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds)
    print(f"AUC={auc:.4f}, Acc={acc:.4f}, F1={f1:.4f}\nCM:\n{cm}")
    print(classification_report(labels, preds, target_names=['Normal','Anomaly']))

# 7. Main
# MixUp/CutMix run: train the plain autoencoder on the Train split with
# batch-level mixing, save its weights, then report test-set metrics.
if __name__ == '__main__':
    # NOTE: machine-specific absolute path; adjust before running elsewhere.
    root = '/l/users/zainab.aldhanhani/AI702Project/UCSD_Anomaly_Dataset.v1p2/UCSDped2'
    gt_list = load_ucsd_gt(root)
    # gt_list is ignored for phase='training' (all labels are 0).
    train_ds = UCSDPed2Dataset(root, phase='training', gt_list=gt_list)
    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=4)
    model = Autoencoder()
    trainer = Trainer(model, train_loader)
    # Half of the batches are mixed; MixUp and CutMix are equally likely.
    model = trainer.train(epochs=50, mixup_alpha=0.4, cutmix_alpha=0.4, mix_prob=0.5)
    torch.save(model.state_dict(), 'ucsdped2_auto_mix.pth')
    evaluate(model, root, gt_list)


Epoch 1/50, Loss=0.105207
Epoch 2/50, Loss=0.035627
Epoch 3/50, Loss=0.025068
Epoch 4/50, Loss=0.020006
Epoch 5/50, Loss=0.017223
Epoch 6/50, Loss=0.015360
Epoch 7/50, Loss=0.014132
Epoch 8/50, Loss=0.013182
Epoch 9/50, Loss=0.012393
Epoch 10/50, Loss=0.011693
Epoch 11/50, Loss=0.011098
Epoch 12/50, Loss=0.010590
Epoch 13/50, Loss=0.010177
Epoch 14/50, Loss=0.009877
Epoch 15/50, Loss=0.009491
Epoch 16/50, Loss=0.009207
Epoch 17/50, Loss=0.008903
Epoch 18/50, Loss=0.008740
Epoch 19/50, Loss=0.008494
Epoch 20/50, Loss=0.008327
Epoch 21/50, Loss=0.008146
Epoch 22/50, Loss=0.007997
Epoch 23/50, Loss=0.007822
Epoch 24/50, Loss=0.007703
Epoch 25/50, Loss=0.007530
Epoch 26/50, Loss=0.007424
Epoch 27/50, Loss=0.007339
Epoch 28/50, Loss=0.007231
Epoch 29/50, Loss=0.007091
Epoch 30/50, Loss=0.006998
Epoch 31/50, Loss=0.006870
Epoch 32/50, Loss=0.006820
Epoch 33/50, Loss=0.006737
Epoch 34/50, Loss=0.006564
Epoch 35/50, Loss=0.006517
Epoch 36/50, Loss=0.006374
Epoch 37/50, Loss=0.006362
Epoch 38/5

## Environmental Effects

In [1]:
import os
import glob
import re
import random
import numpy as np
from PIL import Image, ImageFilter, ImageEnhance
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_msssim import ssim  # pip install pytorch-msssim
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, accuracy_score, f1_score, classification_report

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Simulated Environmental Effects augmentation
class EnvironmentalTransform:
    """Randomly overlay synthetic rain, fog and sun glare on a grayscale PIL image.

    Each effect is applied independently with its own probability, always in
    the order rain -> fog -> sun glare, so several effects can stack on one
    frame. The helpers build single-channel ('L') overlays, so the input is
    expected to be an 'L' mode image (the dataset opens frames that way).

    Args:
        rain_prob: Probability of adding rain streaks.
        fog_prob: Probability of adding fog.
        sun_prob: Probability of adding a sun-glare spot.
    """

    def __init__(self, rain_prob=0.3, fog_prob=0.3, sun_prob=0.3):
        self.rain_prob = rain_prob
        self.fog_prob = fog_prob
        self.sun_prob = sun_prob

    def __call__(self, img):
        # Rain effect
        if random.random() < self.rain_prob:
            img = self._add_rain(img)
        # Fog effect
        if random.random() < self.fog_prob:
            img = self._add_fog(img)
        # Sun glare
        if random.random() < self.sun_prob:
            img = self._add_sun_glare(img)
        return img

    def _add_rain(self, img: Image.Image) -> Image.Image:
        """Blend vertical white streaks (10-20 px long, lightly blurred) into ``img``."""
        w, h = img.size
        rain = np.zeros((h, w), dtype=np.uint8)
        drops = int(h * w * 0.0005)
        for _ in range(drops):
            x = random.randint(0, w-1)
            y = random.randint(0, h-1)
            length = random.randint(10, 20)
            # One slice per streak; numpy clips the slice at the bottom edge.
            rain[y:y + length, x] = 255
        rain_img = Image.fromarray(rain).filter(ImageFilter.GaussianBlur(1))
        return Image.blend(img, rain_img, alpha=0.3)

    def _add_fog(self, img: Image.Image) -> Image.Image:
        """Blend ``img`` 40% towards a white layer."""
        fog = Image.new('L', img.size, color=255)
        # NOTE(review): the layer is uniformly white before blurring, so the
        # blur presumably leaves it unchanged; kept to preserve the output.
        fog = fog.filter(ImageFilter.GaussianBlur(radius=img.size[0]//15))
        return Image.blend(img, fog, alpha=0.4)

    def _add_sun_glare(self, img: Image.Image) -> Image.Image:
        """Brighten a random disc by 1.5x; the rest of ``img`` is untouched."""
        w, h = img.size
        # Centre is drawn from the middle half of the frame, radius from
        # 1/8 to 1/4 of the shorter side.
        cx, cy = random.randint(w//4, 3*w//4), random.randint(h//4, 3*h//4)
        rad = random.randint(min(w,h)//8, min(w,h)//4)
        yy, xx = np.ogrid[:h, :w]
        circle = ((xx-cx)**2 + (yy-cy)**2) <= rad**2
        mask_arr = np.zeros((h, w), dtype=np.uint8)
        mask_arr[circle] = 255
        mask = Image.fromarray(mask_arr)
        bright = ImageEnhance.Brightness(img).enhance(1.5)
        return Image.composite(bright, img, mask)

# 1. Dataset loader for UCSD Ped2
# Training pipeline: the weather augmentation runs first on the PIL frame,
# followed by the same preprocessing as evaluation (128x128 single-channel
# tensor, then (x - 0.5) / 0.5).
train_transform = transforms.Compose([
    EnvironmentalTransform(rain_prob=0.3, fog_prob=0.3, sun_prob=0.3),
    transforms.Grayscale(),
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Evaluation pipeline: no augmentation, so test frames are scored as recorded.
test_transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

class UCSDPed2Dataset(Dataset):
    """Frame-level dataset over the UCSD Ped2 ``Train`` / ``Test`` folders.

    Each sub-directory of ``<root>/Train`` (``phase == 'training'``) or
    ``<root>/Test`` (any other phase) is one video; each
    ``png``/``jpg``/``jpeg``/``tif`` file inside it is one frame.

    Args:
        root: Dataset directory containing ``Train`` and ``Test``.
        phase: ``'training'`` items are tensors; ``'testing'`` items are
            ``(tensor, label)`` pairs.
        transform: Callable applied to the grayscale PIL frame. When ``None``
            a plain grayscale / 128x128 / to-tensor / normalise pipeline is
            used instead of failing on the first item.
        gt_list: Per-video anomalous frame numbers, indexed by
            ``video number - 1``. Only used for ``phase == 'testing'``.

    Attributes:
        paths: Frame paths ordered by video, then file name.
        labels: 1 = anomalous, 0 = normal; parallel to ``paths``.
    """

    def __init__(self, root, phase='training', transform=None, gt_list=None):
        self.phase = phase
        if transform is None:
            # The declared default used to crash in __getitem__; fall back to
            # the un-augmented preprocessing instead.
            transform = transforms.Compose([
                transforms.Grayscale(),
                transforms.Resize((128, 128)),
                transforms.ToTensor(),
                transforms.Normalize((0.5,), (0.5,)),
            ])
        self.transform = transform
        subdir = 'Train' if phase == 'training' else 'Test'
        base_dir = os.path.join(root, subdir)
        vids = sorted(d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)))
        use_gt = phase == 'testing' and gt_list is not None
        self.paths, self.labels = [], []
        for vid in vids:
            frame_dir = os.path.join(base_dir, vid)
            frames = [
                p
                for ext in ('*.png', '*.jpg', '*.jpeg', '*.tif')
                for p in sorted(glob.glob(os.path.join(frame_dir, ext)))
            ]
            if not frames:
                # Frame-less folders are skipped before their name is parsed.
                continue
            if use_gt:
                vid_idx = int(re.sub('[^0-9]', '', vid)) - 1
                # Set lookup is O(1); the GT entries are plain lists.
                anomalous = set(gt_list[vid_idx])
                self.labels.extend(
                    1 if int(os.path.splitext(os.path.basename(p))[0]) in anomalous else 0
                    for p in frames
                )
            else:
                self.labels.extend(0 for _ in frames)
            self.paths.extend(frames)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # Close the file as soon as the pixel data has been copied by convert().
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert('L')
        x = self.transform(img)
        return (x, self.labels[idx]) if self.phase == 'testing' else x

# 2. Load ground truth
import glob, re

def load_ucsd_gt(root):
    """Parse the anomalous frame numbers of every test video from the GT ``.m`` file.

    Reads the first ``*.m`` file (alphabetically) in ``<root>/Test`` and
    collects every ``TestVideoFile{end+1}.gt_frame = [...];`` entry. An entry
    may hold one range (``[61:180]``) or several (``[5:90, 140:200]``).

    Args:
        root: Dataset directory that contains the ``Test`` folder.

    Returns:
        One list of frame numbers per entry, in file order; range ends are
        inclusive, as in MATLAB.

    Raises:
        FileNotFoundError: No ``.m`` file exists in ``<root>/Test``.
        ValueError: The file contains no ``gt_frame`` definition.
    """
    # Sorted so the chosen file is deterministic when several .m files exist.
    m_files = sorted(glob.glob(os.path.join(root,'Test','*.m')))
    if not m_files:
        raise FileNotFoundError(f"No .m GT files in {root}/Test")
    with open(m_files[0], 'r') as fh:
        text = fh.read()
    # Capture the bracket contents; individual start:end ranges are split below.
    entries = re.findall(r"TestVideoFile\{end\+1\}\.gt_frame\s*=\s*\[([^\]]*)\];", text)
    if not entries:
        raise ValueError("No gt_frame lines found.")
    gt_list = []
    for entry in entries:
        frames = []
        for s, e in re.findall(r"(\d+)\s*:\s*(\d+)", entry):
            frames.extend(range(int(s), int(e) + 1))
        gt_list.append(frames)
    return gt_list

# 3. Combined Loss (MSE + L1 + SSIM)
class CombinedLoss(nn.Module):
    """Reconstruction loss: ``alpha * MSE + beta * L1 + gamma * (1 - SSIM)``.

    Inputs are expected in [-1, 1]; only the SSIM term works on a [0, 1]
    rescaled copy (``data_range=1.0``).
    """

    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.mse = nn.MSELoss()
        self.l1 = nn.L1Loss()
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

    def forward(self, recon, x):
        mse_term = self.mse(recon, x)
        l1_term = self.l1(recon, x)
        # Map [-1, 1] back to [0, 1] before measuring structural similarity.
        target_unit = (x + 1) / 2
        recon_unit = (recon + 1) / 2
        similarity = ssim(recon_unit, target_unit, data_range=1.0, size_average=True)
        return self.alpha * mse_term + self.beta * l1_term + self.gamma * (1 - similarity)

# 4. Simplified Autoencoder
def conv_autoencoder():
    """Build the plain convolutional autoencoder as one flat ``nn.Sequential``.

    Three stride-2 convolutions (1 -> 32 -> 64 -> 128 channels) are mirrored
    by three stride-2 transposed convolutions; hidden layers use in-place
    ReLU and the output layer uses Tanh.
    """
    layers = []
    for in_ch, out_ch in ((1, 32), (32, 64), (64, 128)):
        layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=2, padding=1))
        layers.append(nn.ReLU(inplace=True))
    for in_ch, out_ch in ((128, 64), (64, 32), (32, 1)):
        layers.append(
            nn.ConvTranspose2d(in_ch, out_ch, kernel_size=3, stride=2, padding=1, output_padding=1)
        )
        # Only the final (single-channel) layer is squashed with Tanh.
        layers.append(nn.Tanh() if out_ch == 1 else nn.ReLU(inplace=True))
    return nn.Sequential(*layers)

# 5. Training loop
    
def train_model(model, loader, epochs=50, lr=1e-3):
    """Train ``model`` to reconstruct its input and return it.

    Optimises ``CombinedLoss`` with Adam; ``ReduceLROnPlateau`` halves the
    learning rate after 5 epochs without improvement of the mean per-sample
    epoch loss, which is printed every epoch.

    Args:
        model: Autoencoder to train; moved to the module-level ``device``.
        loader: Iterable of batches, either tensors or ``(tensor, ...)``
            sequences whose first element is the image batch.
        epochs: Number of passes over ``loader``.
        lr: Initial Adam learning rate.
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = CombinedLoss()
    for epoch in range(1, epochs + 1):
        model.train()
        running = 0.0
        for batch in loader:
            # batch may be (x, labels) or just x
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
            inputs = inputs.to(device)
            optimizer.zero_grad()
            loss = criterion(model(inputs), inputs)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch figure is a per-sample mean.
            running += loss.item() * inputs.size(0)
        epoch_loss = running / len(loader.dataset)
        scheduler.step(epoch_loss)
        print(f"Epoch {epoch}/{epochs}, Loss={epoch_loss:.6f}")
    return model

# 6. Evaluation
def evaluate(model, root, gt_list, bs=32):
    """Score every test frame by reconstruction error and print detection metrics.

    Test frames go through the un-augmented ``test_transform``. The anomaly
    score of a frame is the mean squared error between the frame and its
    reconstruction. Prints ROC-AUC, plus accuracy, F1, the confusion matrix
    and a classification report at the threshold maximising ``TPR - FPR``.

    Note: that threshold is selected on the same test frames it is evaluated
    on, so the thresholded metrics are optimistic.

    Args:
        model: Trained autoencoder, already on the module-level ``device``.
        root: Dataset directory passed to ``UCSDPed2Dataset``.
        gt_list: Per-video anomalous frame numbers from ``load_ucsd_gt``.
        bs: Evaluation batch size.
    """
    model.eval()
    scores, labels = [], []
    ds = UCSDPed2Dataset(root, 'testing', test_transform, gt_list)
    loader = DataLoader(ds, batch_size=bs, shuffle=False)
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            recon = model(x)
            err = torch.mean((recon - x) ** 2, dim=[1, 2, 3])
            scores.extend(err.cpu().tolist())
            # The collated labels arrive as a tensor; store plain ints rather
            # than 0-d tensors so the metric functions receive ordinary lists.
            labels.extend(y.tolist())
    auc = roc_auc_score(labels, scores)
    fpr, tpr, th = roc_curve(labels, scores)
    thr = th[np.argmax(tpr - fpr)]
    preds = [1 if s >= thr else 0 for s in scores]
    cm = confusion_matrix(labels, preds)
    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds)
    # Line breaks restored: the matrix used to be glued onto the F1 value.
    print(f"AUC={auc:.4f}, Acc={acc:.4f}, F1={f1:.4f}\nCM:\n{cm}")
    print(classification_report(labels, preds, target_names=['Normal', 'Anomaly']))

# 7. Main
# Environmental-effects run: train the plain autoencoder on weather-augmented
# Train frames, save its weights, then report metrics on un-augmented Test frames.
if __name__=='__main__':
    # NOTE: machine-specific absolute path; adjust before running elsewhere.
    root='/l/users/zainab.aldhanhani/AI702Project/UCSD_Anomaly_Dataset.v1p2/UCSDped2'
    gt_list=load_ucsd_gt(root)
    # gt_list is ignored for the 'training' phase (all labels are 0).
    train_ds=UCSDPed2Dataset(root,'training',train_transform,gt_list)
    train_loader=DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=4)
    model=conv_autoencoder()
    model=train_model(model,train_loader,50,1e-3)
    torch.save(model.state_dict(),'ucsdped2_env.pth')
    evaluate(model,root,gt_list)


Epoch 1/50, Loss=0.105661
Epoch 2/50, Loss=0.039038
Epoch 3/50, Loss=0.028453
Epoch 4/50, Loss=0.022394
Epoch 5/50, Loss=0.019367
Epoch 6/50, Loss=0.016954
Epoch 7/50, Loss=0.015717
Epoch 8/50, Loss=0.014761
Epoch 9/50, Loss=0.013638
Epoch 10/50, Loss=0.013035
Epoch 11/50, Loss=0.012304
Epoch 12/50, Loss=0.011732
Epoch 13/50, Loss=0.011364
Epoch 14/50, Loss=0.011076
Epoch 15/50, Loss=0.010593
Epoch 16/50, Loss=0.010209
Epoch 17/50, Loss=0.010389
Epoch 18/50, Loss=0.009942
Epoch 19/50, Loss=0.009174
Epoch 20/50, Loss=0.008878
Epoch 21/50, Loss=0.008653
Epoch 22/50, Loss=0.008568
Epoch 23/50, Loss=0.008631
Epoch 24/50, Loss=0.008064
Epoch 25/50, Loss=0.008099
Epoch 26/50, Loss=0.008101
Epoch 27/50, Loss=0.008128
Epoch 28/50, Loss=0.007572
Epoch 29/50, Loss=0.007519
Epoch 30/50, Loss=0.007783
Epoch 31/50, Loss=0.007144
Epoch 32/50, Loss=0.007166
Epoch 33/50, Loss=0.007133
Epoch 34/50, Loss=0.006870
Epoch 35/50, Loss=0.006886
Epoch 36/50, Loss=0.007060
Epoch 37/50, Loss=0.006743
Epoch 38/5

## Fourier Domain Adaptation (FDA)

In [9]:
import os
import glob
import re
import random
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_msssim import ssim  # pip install pytorch-msssim
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, accuracy_score, f1_score, classification_report

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fourier Domain Adaptation augmentation
class FDATransform:
    """Fourier Domain Adaptation: borrow the low-frequency amplitude of another frame.

    With probability ``probability`` the low-frequency part of the input's
    amplitude spectrum is replaced by that of a randomly chosen training
    frame, while the input's phase is kept. This changes global appearance
    (illumination / contrast) and leaves the scene structure in place.

    Args:
        root: Dataset directory; style frames are read from ``<root>/Train``.
        patch_ratio: Half-width of the swapped spectrum window as a fraction
            of the shorter image side. 0 disables the swap.
        probability: Chance of applying the augmentation to a given image.
    """

    def __init__(self, root, patch_ratio=0.1, probability=0.5):
        self.probability = probability
        self.patch_ratio = patch_ratio
        base = os.path.join(root, 'Train')
        self.img_paths = []
        vids = sorted(d for d in os.listdir(base) if os.path.isdir(os.path.join(base, d)))
        for vid in vids:
            frame_dir = os.path.join(base, vid)
            for ext in ('*.png','*.jpg','*.jpeg','*.tif'):
                # Sorted so a seeded random.choice picks the same frame on every machine.
                self.img_paths += sorted(glob.glob(os.path.join(frame_dir, ext)))

    @staticmethod
    def _swap_low_freq_amplitude(tgt, src, b):
        """Return ``tgt`` with its low-frequency amplitude taken from ``src``.

        ``tgt`` and ``src`` are 2-D float arrays of equal shape; ``b`` is the
        half-width of the swapped window in frequency bins. The result is a
        real float array (not clipped).
        """
        fft_tgt = np.fft.fft2(tgt)
        pha_tgt = np.angle(fft_tgt)
        # fft2 puts the zero frequency at index [0, 0]; shifting moves it to
        # the centre so a centred window really covers the LOW frequencies.
        # (Swapping the centre of the unshifted spectrum replaced the highest
        # frequencies instead.)
        amp_tgt = np.fft.fftshift(np.abs(fft_tgt))
        if b > 0:
            amp_src = np.fft.fftshift(np.abs(np.fft.fft2(src)))
            h, w = tgt.shape
            cy, cx = h // 2, w // 2
            # Window symmetric around the DC bin so the amplitude stays
            # mirror-symmetric and the inverse transform stays (near) real.
            y0, y1 = max(cy - b, 0), min(cy + b + 1, h)
            x0, x1 = max(cx - b, 0), min(cx + b + 1, w)
            amp_tgt[y0:y1, x0:x1] = amp_src[y0:y1, x0:x1]
        amp_tgt = np.fft.ifftshift(amp_tgt)
        return np.real(np.fft.ifft2(amp_tgt * np.exp(1j * pha_tgt)))

    def __call__(self, img):
        if random.random() > self.probability or not self.img_paths:
            return img
        tgt = np.array(img).astype(np.float32)
        src_path = random.choice(self.img_paths)
        # convert()/resize() copy the pixels, so the file can be closed right away.
        with Image.open(src_path) as src_img:
            src = np.array(src_img.convert('L').resize(img.size)).astype(np.float32)
        h, w = tgt.shape
        b = int(min(h, w) * self.patch_ratio)
        img_back = self._swap_low_freq_amplitude(tgt, src, b)
        img_back = np.clip(img_back, 0, 255).astype(np.uint8)
        return Image.fromarray(img_back)

# Dataset loader for UCSD Ped2
class UCSDPed2Dataset(Dataset):
    """Frame-level dataset over the UCSD Ped2 ``Train`` / ``Test`` folders.

    Each sub-directory of ``<root>/Train`` (``phase == 'training'``) or
    ``<root>/Test`` (any other phase) is one video; each
    ``png``/``jpg``/``jpeg``/``tif`` file inside it is one frame.

    Args:
        root: Dataset directory containing ``Train`` and ``Test``.
        phase: ``'training'`` items are tensors; ``'testing'`` items are
            ``(tensor, label)`` pairs.
        transform: Callable applied to the grayscale PIL frame. When ``None``
            a plain grayscale / 128x128 / to-tensor / normalise pipeline is
            used instead of failing on the first item.
        gt_list: Per-video anomalous frame numbers, indexed by
            ``video number - 1``. Only used for ``phase == 'testing'``.

    Attributes:
        paths: Frame paths ordered by video, then file name.
        labels: 1 = anomalous, 0 = normal; parallel to ``paths``.
    """

    def __init__(self, root, phase='training', transform=None, gt_list=None):
        self.phase = phase
        if transform is None:
            # The declared default used to crash in __getitem__; fall back to
            # the un-augmented preprocessing instead.
            transform = transforms.Compose([
                transforms.Grayscale(),
                transforms.Resize((128, 128)),
                transforms.ToTensor(),
                transforms.Normalize((0.5,), (0.5,)),
            ])
        self.transform = transform
        subdir = 'Train' if phase=='training' else 'Test'
        base_dir = os.path.join(root, subdir)
        vids = sorted(d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)))
        use_gt = phase == 'testing' and gt_list is not None
        self.paths, self.labels = [], []
        for vid in vids:
            frame_dir = os.path.join(base_dir, vid)
            frames = [
                p
                for ext in ('*.png', '*.jpg', '*.jpeg', '*.tif')
                for p in sorted(glob.glob(os.path.join(frame_dir, ext)))
            ]
            if not frames:
                # Frame-less folders are skipped before their name is parsed.
                continue
            if use_gt:
                vid_idx = int(re.sub('[^0-9]', '', vid)) - 1
                # Set lookup is O(1); the GT entries are plain lists.
                anomalous = set(gt_list[vid_idx])
                self.labels.extend(
                    1 if int(os.path.splitext(os.path.basename(p))[0]) in anomalous else 0
                    for p in frames
                )
            else:
                self.labels.extend(0 for _ in frames)
            self.paths.extend(frames)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # Close the file as soon as the pixel data has been copied by convert().
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert('L')
        x = self.transform(img)
        return (x, self.labels[idx]) if self.phase=='testing' else x

# Ground truth loader from .m file
def load_ucsd_gt(root):
    """Parse the anomalous frame numbers of every test video from the GT ``.m`` file.

    Reads the first ``*.m`` file (alphabetically) in ``<root>/Test`` and
    collects every ``TestVideoFile{end+1}.gt_frame = [...];`` entry. An entry
    may hold one range (``[61:180]``) or several (``[5:90, 140:200]``).

    Args:
        root: Dataset directory that contains the ``Test`` folder.

    Returns:
        One list of frame numbers per entry, in file order; range ends are
        inclusive, as in MATLAB.

    Raises:
        FileNotFoundError: No ``.m`` file exists in ``<root>/Test``.
        ValueError: The file contains no ``gt_frame`` definition.
    """
    # Sorted so the chosen file is deterministic when several .m files exist.
    m_files = sorted(glob.glob(os.path.join(root, 'Test', '*.m')))
    if not m_files:
        raise FileNotFoundError(f"No .m GT files in {os.path.join(root,'Test')}")
    with open(m_files[0], 'r') as fh:
        text = fh.read()
    # Capture the bracket contents; individual start:end ranges are split below.
    entries = re.findall(r"TestVideoFile\{end\+1\}\.gt_frame\s*=\s*\[([^\]]*)\];", text)
    if not entries:
        raise ValueError("No gt_frame lines found in .m file.")
    gt_list = []
    for entry in entries:
        frames = []
        for s, e in re.findall(r"(\d+)\s*:\s*(\d+)", entry):
            frames.extend(range(int(s), int(e) + 1))
        gt_list.append(frames)
    return gt_list

# Combined Loss (MSE + L1 + SSIM)
class CombinedLoss(nn.Module):
    """Reconstruction loss: ``alpha * MSE + beta * L1 + gamma * (1 - SSIM)``.

    Inputs are expected in [-1, 1]; only the SSIM term works on a [0, 1]
    rescaled copy (``data_range=1.0``).
    """

    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.mse = nn.MSELoss()
        self.l1 = nn.L1Loss()

    def forward(self, recon, x):
        squared_err = self.mse(recon, x)
        abs_err = self.l1(recon, x)
        # Map [-1, 1] back to [0, 1] before measuring structural similarity.
        structural = ssim((recon + 1) / 2, (x + 1) / 2, data_range=1.0, size_average=True)
        return self.alpha * squared_err + self.beta * abs_err + self.gamma * (1 - structural)

# Simplified convolutional autoencoder
def conv_autoencoder():
    """Build the plain convolutional autoencoder as one flat ``nn.Sequential``.

    Encoder: three stride-2 3x3 convolutions (1 -> 32 -> 64 -> 128 channels).
    Decoder: three stride-2 3x3 transposed convolutions back to one channel.
    Hidden layers use in-place ReLU; the output layer uses Tanh.
    """
    conv_kwargs = dict(kernel_size=3, stride=2, padding=1)
    deconv_kwargs = dict(kernel_size=3, stride=2, padding=1, output_padding=1)
    encoder = [
        nn.Conv2d(1, 32, **conv_kwargs), nn.ReLU(inplace=True),
        nn.Conv2d(32, 64, **conv_kwargs), nn.ReLU(inplace=True),
        nn.Conv2d(64, 128, **conv_kwargs), nn.ReLU(inplace=True),
    ]
    decoder = [
        nn.ConvTranspose2d(128, 64, **deconv_kwargs), nn.ReLU(inplace=True),
        nn.ConvTranspose2d(64, 32, **deconv_kwargs), nn.ReLU(inplace=True),
        nn.ConvTranspose2d(32, 1, **deconv_kwargs), nn.Tanh(),
    ]
    return nn.Sequential(*encoder, *decoder)

# Training loop
def train_model(model, loader, epochs=50, lr=1e-3):
    """Train ``model`` to reconstruct its input and return it.

    Optimises ``CombinedLoss`` with Adam; ``ReduceLROnPlateau`` halves the
    learning rate after 5 epochs without improvement of the mean per-sample
    epoch loss, which is printed every epoch.

    Args:
        model: Autoencoder to train; moved to the module-level ``device``.
        loader: Iterable of batches, either tensors or ``(tensor, ...)``
            sequences whose first element is the image batch.
        epochs: Number of passes over ``loader``.
        lr: Initial Adam learning rate.
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = CombinedLoss()
    for epoch in range(1, epochs + 1):
        model.train()
        running = 0
        for batch in loader:
            if isinstance(batch, (list, tuple)):
                inputs = batch[0].to(device)
            else:
                inputs = batch.to(device)
            optimizer.zero_grad()
            loss = criterion(model(inputs), inputs)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch figure is a per-sample mean.
            running += loss.item() * inputs.size(0)
        epoch_loss = running / len(loader.dataset)
        scheduler.step(epoch_loss)
        print(f"Epoch {epoch}/{epochs}, Loss={epoch_loss:.6f}")
    return model

# Evaluation
def evaluate(model, root, gt_list, bs=32, transform=None):
    """Score every test frame by reconstruction error and print detection metrics.

    The anomaly score of a frame is the mean squared error between the frame
    and its reconstruction. AUC, accuracy, F1, the confusion matrix and a
    classification report are printed; nothing is returned.

    Note that the decision threshold is the ROC point maximising ``tpr - fpr``
    on the very labels it is then evaluated against, so accuracy and F1 are
    computed with a threshold tuned on the test set.

    Args:
        model: Autoencoder to evaluate. It is moved to ``device`` and switched
            to eval mode.
        root: Dataset root forwarded to ``UCSDPed2Dataset``.
        gt_list: Per-video anomalous frame indices used to label test frames.
        bs: Batch size of the test loader.
        transform: Preprocessing applied to test frames. When omitted, the
            module-level ``test_transform`` is used (the previous behaviour).
    """
    if transform is None:
        # Backward-compatible fallback to the global defined by the main script.
        transform = test_transform
    # Inputs are moved to `device` below, so the model must live there too.
    model.to(device)
    model.eval()
    scores, labels = [], []
    ds = UCSDPed2Dataset(root, 'testing', transform=transform, gt_list=gt_list)
    loader = DataLoader(ds, batch_size=bs, shuffle=False)
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            recon = model(x)
            err = torch.mean((recon - x)**2, dim=[1,2,3]).cpu().numpy()
            scores.extend(err.tolist())
            # The collated label batch is a tensor; store plain ints rather
            # than 0-d tensors so the metric functions get ordinary numbers.
            labels.extend(y.tolist())
    auc = roc_auc_score(labels, scores)
    fpr, tpr, th = roc_curve(labels, scores)
    thr = th[np.argmax(tpr - fpr)]
    preds = [1 if s >= thr else 0 for s in scores]
    cm = confusion_matrix(labels, preds)
    print(f"AUC={auc:.4f}, Acc={accuracy_score(labels,preds):.4f}, F1={f1_score(labels,preds):.4f}\nCM:\n{cm}")
    print(classification_report(labels, preds, target_names=['Normal','Anomaly']))

# Main
if __name__ == '__main__':
    # Entry point for the FDA experiment: build the transforms, train the
    # autoencoder on the Train split, save its weights, then score the Test split.
    # NOTE: the dataset location is hard-coded to one user's directory.
    root = '/l/users/zainab.aldhanhani/AI702Project/UCSD_Anomaly_Dataset.v1p2/UCSDped2'
    # Training transform: FDATransform (defined outside this section) runs
    # first, then grayscale -> 128x128 resize -> tensor -> Normalize(0.5, 0.5),
    # i.e. (v - 0.5) / 0.5, which maps ToTensor's [0, 1] range to [-1, 1].
    train_transform = transforms.Compose([
        FDATransform(root, patch_ratio=0.1, probability=0.5),
        transforms.Grayscale(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Same preprocessing without the augmentation step. evaluate() reads this
    # module-level name, so it must be defined before evaluate() is called.
    test_transform = transforms.Compose([
        transforms.Grayscale(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Per-video anomalous frame indices, consumed by evaluate() below.
    gt_list = load_ucsd_gt(root)
    train_ds = UCSDPed2Dataset(root, 'training', transform=train_transform, gt_list=gt_list)
    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=4)
    model = conv_autoencoder()
    model = train_model(model, train_loader, epochs=50, lr=1e-3)
    # Only the weights are saved, written relative to the current working directory.
    torch.save(model.state_dict(), 'ucsdped2_fda.pth')
    evaluate(model, root, gt_list)

Epoch 1/50, Loss=0.115845
Epoch 2/50, Loss=0.037415
Epoch 3/50, Loss=0.025830
Epoch 4/50, Loss=0.020191
Epoch 5/50, Loss=0.017085
Epoch 6/50, Loss=0.015104
Epoch 7/50, Loss=0.013727
Epoch 8/50, Loss=0.012617
Epoch 9/50, Loss=0.011819
Epoch 10/50, Loss=0.011089
Epoch 11/50, Loss=0.010527
Epoch 12/50, Loss=0.010028
Epoch 13/50, Loss=0.009564
Epoch 14/50, Loss=0.009183
Epoch 15/50, Loss=0.008866
Epoch 16/50, Loss=0.008573
Epoch 17/50, Loss=0.008329
Epoch 18/50, Loss=0.008060
Epoch 19/50, Loss=0.007871
Epoch 20/50, Loss=0.007710
Epoch 21/50, Loss=0.007511
Epoch 22/50, Loss=0.007369
Epoch 23/50, Loss=0.007209
Epoch 24/50, Loss=0.007106
Epoch 25/50, Loss=0.006949
Epoch 26/50, Loss=0.006821
Epoch 27/50, Loss=0.006764
Epoch 28/50, Loss=0.006629
Epoch 29/50, Loss=0.006533
Epoch 30/50, Loss=0.006425
Epoch 31/50, Loss=0.006306
Epoch 32/50, Loss=0.006280
Epoch 33/50, Loss=0.006192
Epoch 34/50, Loss=0.006078
Epoch 35/50, Loss=0.006023
Epoch 36/50, Loss=0.005922
Epoch 37/50, Loss=0.005877
Epoch 38/5

## Elastic Deformations

In [None]:
import os
import glob
import re
import random
import numpy as np
import scipy.ndimage as ndi
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_msssim import ssim  # pip install pytorch-msssim
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix, accuracy_score, f1_score, classification_report

# Device setup: use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Elastic deformation augmentation
class ElasticTransform:
    """Randomly apply an elastic (smooth random-displacement) warp to a PIL image.

    A per-pixel displacement is drawn uniformly from [-1, 1) for each axis,
    smoothed with a Gaussian filter of width ``sigma`` and scaled by ``alpha``.
    The image is then resampled at the displaced coordinates with linear
    interpolation, reflecting at the borders.

    Args:
        alpha: Scale applied to the smoothed displacement fields.
        sigma: Standard deviation of the Gaussian smoothing filter.
        probability: Chance that a call warps the image; otherwise the input
            is returned unchanged.
    """

    def __init__(self, alpha=34, sigma=4, probability=0.5):
        self.alpha = alpha
        self.sigma = sigma
        self.probability = probability

    def __call__(self, img: Image.Image) -> Image.Image:
        """Return a warped copy of ``img``, or ``img`` itself when the draw skips it.

        Both single-channel (H, W) and multi-channel (H, W, C) images are
        handled, and the pixel dtype of the input is preserved.
        """
        if random.random() > self.probability:
            return img
        arr = np.array(img)
        height, width = arr.shape[:2]
        # The displacement fields are always 2-D (one value per pixel), so a
        # colour image is warped identically in every channel.
        dx = ndi.gaussian_filter(np.random.rand(height, width) * 2 - 1, self.sigma) * self.alpha
        dy = ndi.gaussian_filter(np.random.rand(height, width) * 2 - 1, self.sigma) * self.alpha
        cols, rows = np.meshgrid(np.arange(width), np.arange(height))
        coords = ((rows + dy).ravel(), (cols + dx).ravel())

        def warp(plane):
            # Resample one 2-D plane at the displaced (row, col) coordinates.
            sampled = ndi.map_coordinates(plane, coords, order=1, mode='reflect')
            return sampled.reshape(height, width)

        if arr.ndim == 2:
            distorted = warp(arr)
        else:
            channels = [warp(arr[..., c]) for c in range(arr.shape[-1])]
            distorted = np.stack(channels, axis=-1)
        # Keep the source dtype instead of forcing uint8, so 16-bit or float
        # images are not truncated.
        return Image.fromarray(distorted.astype(arr.dtype))

# Dataset loader for UCSD Ped2
class UCSDPed2Dataset(Dataset):
    """Frame-level dataset over the ``Train`` / ``Test`` folders of UCSD Ped2.

    Every sub-directory of the chosen split is treated as one video, and its
    ``.png`` / ``.jpg`` / ``.jpeg`` / ``.tif`` files as frames. Frames are
    ordered by video, then by extension, then by file name.

    Args:
        root: Directory containing the ``Train`` and ``Test`` folders.
        phase: ``'training'`` reads ``Train``; any other value reads ``Test``.
            Labels are only returned by ``__getitem__`` when it is ``'testing'``.
        transform: Callable applied to each grayscale PIL frame. When omitted,
            frames are converted to grayscale, resized to 128x128, turned into
            a tensor and normalised with mean 0.5 / std 0.5.
        gt_list: Sequence indexed by (video number - 1) holding the anomalous
            frame numbers of that video. Only used when ``phase == 'testing'``;
            without it every frame is labelled 0.
    """

    def __init__(self, root, phase='training', transform=None, gt_list=None):
        self.phase = phase
        if transform is None:
            # Without a fallback __getitem__ would try to call None. This is
            # the same un-augmented preprocessing used elsewhere in this file.
            transform = transforms.Compose([
                transforms.Grayscale(),
                transforms.Resize((128, 128)),
                transforms.ToTensor(),
                transforms.Normalize((0.5,), (0.5,))
            ])
        self.transform = transform
        subdir = 'Train' if phase == 'training' else 'Test'
        base_dir = os.path.join(root, subdir)
        vids = sorted(d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)))
        self.paths, self.labels = [], []
        use_gt = phase == 'testing' and gt_list is not None
        for vid in vids:
            frame_dir = os.path.join(base_dir, vid)
            frames = []
            for ext in ('*.png', '*.jpg', '*.jpeg', '*.tif'):
                frames.extend(sorted(glob.glob(os.path.join(frame_dir, ext))))
            if use_gt and frames:
                # Resolve the video index and its anomalous frames once per
                # video; directories without frames are never parsed.
                # vid is like 'Test001', so the digits give a 1-based index.
                vid_idx = int(re.sub('[^0-9]', '', vid)) - 1
                anomalous = set(gt_list[vid_idx])
                frame_labels = [
                    1 if int(os.path.splitext(os.path.basename(p))[0]) in anomalous else 0
                    for p in frames
                ]
            else:
                frame_labels = [0] * len(frames)
            self.paths.extend(frames)
            self.labels.extend(frame_labels)

    def __len__(self):
        """Return the total number of frames across all videos."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Return ``(frame, label)`` in the testing phase, else just ``frame``."""
        img = Image.open(self.paths[idx]).convert('L')
        x = self.transform(img)
        return (x, self.labels[idx]) if self.phase == 'testing' else x

# Load ground truth from .m file
def load_ucsd_gt(root):
    """Parse the anomalous-frame ranges from the MATLAB ground-truth file.

    Looks for ``*.m`` files in ``<root>/Test`` and reads the first one in
    sorted order. Each ``TestVideoFile{end+1}.gt_frame = [...];`` line becomes
    one entry, in file order. A line may hold a single ``a:b`` range or several
    comma-separated ranges; every range is expanded inclusively.

    Args:
        root: Dataset directory that contains the ``Test`` folder.

    Returns:
        A list with one list of frame numbers per ``gt_frame`` line.

    Raises:
        FileNotFoundError: If ``<root>/Test`` has no ``.m`` file.
        ValueError: If the file has no ``gt_frame`` lines.
    """
    test_dir = os.path.join(root, 'Test')
    # glob order is arbitrary; sort so the chosen file is deterministic.
    m_files = sorted(glob.glob(os.path.join(test_dir, '*.m')))
    if not m_files:
        raise FileNotFoundError(f"No .m GT files in {test_dir}")
    with open(m_files[0], 'r') as fh:
        text = fh.read()
    # Capture the whole bracket body so multi-range lines are not skipped,
    # which would shift every later video's index.
    entries = re.findall(r"TestVideoFile\{end\+1\}\.gt_frame\s*=\s*\[([^\]]*)\]\s*;", text)
    if not entries:
        raise ValueError("No gt_frame lines found in .m file.")
    gt = []
    for entry in entries:
        frames = []
        for start, end in re.findall(r"(\d+)\s*:\s*(\d+)", entry):
            frames.extend(range(int(start), int(end) + 1))
        gt.append(frames)
    return gt

# Combined Loss (MSE + L1 + SSIM)
class CombinedLoss(nn.Module):
    """Reconstruction loss: ``alpha * MSE + beta * L1 + gamma * (1 - SSIM)``."""

    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.mse, self.l1 = nn.MSELoss(), nn.L1Loss()
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

    def forward(self, recon, x):
        """Return the weighted loss between reconstruction ``recon`` and target ``x``."""
        mse_term = self.alpha * self.mse(recon, x)
        l1_term = self.beta * self.l1(recon, x)
        # (v + 1) / 2 rescales the [-1, 1] range produced by Normalize(0.5, 0.5)
        # and Tanh to [0, 1], matching the data_range=1.0 passed to SSIM.
        similarity = ssim((recon + 1) / 2, (x + 1) / 2, data_range=1.0, size_average=True)
        ssim_term = self.gamma * (1 - similarity)
        return mse_term + l1_term + ssim_term

# Simplified convolutional autoencoder
def conv_autoencoder():
    """Return the convolutional autoencoder as a flat ``nn.Sequential``.

    The encoder halves the spatial size three times with stride-2 convolutions
    (1 -> 32 -> 64 -> 128 channels); the decoder doubles it three times with
    stride-2 transposed convolutions (128 -> 64 -> 32 -> 1) and ends in Tanh.
    """
    layers = []
    # Encoder: each stride-2 convolution is followed by an in-place ReLU.
    for c_in, c_out in ((1, 32), (32, 64), (64, 128)):
        layers.append(nn.Conv2d(c_in, c_out, 3, stride=2, padding=1))
        layers.append(nn.ReLU(True))
    # Decoder: output_padding=1 makes each transposed conv exactly double H and W.
    for c_in, c_out in ((128, 64), (64, 32)):
        layers.append(nn.ConvTranspose2d(c_in, c_out, 3, stride=2, padding=1, output_padding=1))
        layers.append(nn.ReLU(True))
    layers.append(nn.ConvTranspose2d(32, 1, 3, stride=2, padding=1, output_padding=1))
    layers.append(nn.Tanh())
    return nn.Sequential(*layers)

# Training loop
def train_model(model, loader, epochs=50, lr=1e-3):
    """Fit ``model`` as an autoencoder on ``loader`` and return it.

    Each epoch minimises ``CombinedLoss`` between the model output and its
    input with Adam, then steps a ``ReduceLROnPlateau`` scheduler (factor 0.5,
    patience 5) on the epoch's mean per-sample loss and prints that mean.

    Args:
        model: Network to train; moved to the module-level ``device``.
        loader: Yields tensors, or lists/tuples whose first item is the input
            tensor. ``loader.dataset`` is used for the sample count.
        epochs: Number of training epochs.
        lr: Initial learning rate for Adam.

    Returns:
        The trained ``model`` (same object that was passed in).
    """
    model.to(device)
    adam = optim.Adam(model.parameters(), lr=lr)
    plateau = optim.lr_scheduler.ReduceLROnPlateau(adam, 'min', factor=0.5, patience=5)
    loss_fn = CombinedLoss()

    for ep in range(1, epochs + 1):
        model.train()
        loss_sum = 0
        for batch in loader:
            # Batches may carry labels alongside the frames; keep only the frames.
            if isinstance(batch, (list, tuple)):
                frames = batch[0].to(device)
            else:
                frames = batch.to(device)
            adam.zero_grad()
            output = model(frames)
            batch_loss = loss_fn(output, frames)
            batch_loss.backward()
            adam.step()
            # Accumulate a per-sample sum so the epoch mean is exact.
            loss_sum += batch_loss.item() * frames.size(0)
        avg = loss_sum / len(loader.dataset)
        plateau.step(avg)
        print(f"Epoch {ep}/{epochs}, Loss={avg:.6f}")
    return model

# Evaluation
def evaluate(model, root, gt_list, bs=32, transform=None):
    """Score every test frame by reconstruction error and print detection metrics.

    The anomaly score of a frame is the mean squared error between the frame
    and its reconstruction. AUC, accuracy, F1, the confusion matrix and a
    classification report are printed; nothing is returned.

    Note that the decision threshold is the ROC point maximising ``tpr - fpr``
    on the very labels it is then evaluated against, so accuracy and F1 are
    computed with a threshold tuned on the test set.

    Args:
        model: Autoencoder to evaluate. It is moved to ``device`` and switched
            to eval mode.
        root: Dataset root forwarded to ``UCSDPed2Dataset``.
        gt_list: Per-video anomalous frame indices used to label test frames.
        bs: Batch size of the test loader.
        transform: Preprocessing applied to test frames. When omitted, the
            module-level ``test_transform`` is used (the previous behaviour).
    """
    if transform is None:
        # Backward-compatible fallback to the global defined by the main script.
        transform = test_transform
    # Inputs are moved to `device` below, so the model must live there too.
    model.to(device)
    model.eval()
    scores, labels = [], []
    ds = UCSDPed2Dataset(root, 'testing', transform=transform, gt_list=gt_list)
    loader = DataLoader(ds, batch_size=bs, shuffle=False)
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            recon = model(x)
            err = torch.mean((recon - x)**2, dim=[1,2,3]).cpu().numpy()
            scores.extend(err.tolist())
            # The collated label batch is a tensor; store plain ints rather
            # than 0-d tensors so the metric functions get ordinary numbers.
            labels.extend(y.tolist())
    auc = roc_auc_score(labels, scores)
    fpr, tpr, th = roc_curve(labels, scores)
    thr = th[np.argmax(tpr - fpr)]
    preds = [1 if s >= thr else 0 for s in scores]
    cm = confusion_matrix(labels, preds)
    print(f"AUC={auc:.4f}, Acc={accuracy_score(labels,preds):.4f}, F1={f1_score(labels,preds):.4f}\nCM:\n{cm}")
    print(classification_report(labels, preds, target_names=['Normal','Anomaly']))

# Main
if __name__ == '__main__':
    # Entry point for the elastic-deformation experiment: build the transforms,
    # train the autoencoder on the Train split, save its weights, then score
    # the Test split.
    # NOTE: the dataset location is hard-coded to one user's directory.
    root = '/l/users/zainab.aldhanhani/AI702Project/UCSD_Anomaly_Dataset.v1p2/UCSDped2'
    # Training transform: the elastic warp runs first on the PIL image (applied
    # to about half of the frames), then grayscale -> 128x128 resize -> tensor
    # -> Normalize(0.5, 0.5), i.e. (v - 0.5) / 0.5, mapping [0, 1] to [-1, 1].
    train_transform = transforms.Compose([
        ElasticTransform(alpha=34, sigma=4, probability=0.5),
        transforms.Grayscale(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Same preprocessing without the augmentation step. evaluate() reads this
    # module-level name, so it must be defined before evaluate() is called.
    test_transform = transforms.Compose([
        transforms.Grayscale(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Per-video anomalous frame indices, consumed by evaluate() below.
    gt_list = load_ucsd_gt(root)
    # gt_list has no effect here: the dataset only derives labels from it in
    # the 'testing' phase, so every training frame is labelled 0.
    train_ds = UCSDPed2Dataset(root, 'training', transform=train_transform, gt_list=gt_list)
    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=4)
    model = conv_autoencoder()
    model = train_model(model, train_loader, epochs=50, lr=1e-3)
    # Only the weights are saved, written relative to the current working directory.
    torch.save(model.state_dict(), 'ucsdped2_elastic.pth')
    evaluate(model, root, gt_list)


Epoch 1/50, Loss=0.111221
Epoch 2/50, Loss=0.036086
Epoch 3/50, Loss=0.025693
Epoch 4/50, Loss=0.020824
Epoch 5/50, Loss=0.018145
Epoch 6/50, Loss=0.016509
Epoch 7/50, Loss=0.015372
Epoch 8/50, Loss=0.014418
Epoch 9/50, Loss=0.013668
Epoch 10/50, Loss=0.013058
Epoch 11/50, Loss=0.012447
Epoch 12/50, Loss=0.011904
Epoch 13/50, Loss=0.011509
Epoch 14/50, Loss=0.011074
Epoch 15/50, Loss=0.010772
Epoch 16/50, Loss=0.010455
Epoch 17/50, Loss=0.010122
Epoch 18/50, Loss=0.009842
Epoch 19/50, Loss=0.009610
Epoch 20/50, Loss=0.009402
Epoch 21/50, Loss=0.009154
Epoch 22/50, Loss=0.008947
Epoch 23/50, Loss=0.008751
Epoch 24/50, Loss=0.008621
Epoch 25/50, Loss=0.008398
Epoch 26/50, Loss=0.008306
Epoch 27/50, Loss=0.008139
Epoch 28/50, Loss=0.007972
Epoch 29/50, Loss=0.007860
Epoch 30/50, Loss=0.007760
Epoch 31/50, Loss=0.007622
Epoch 32/50, Loss=0.007490
Epoch 33/50, Loss=0.007439
Epoch 34/50, Loss=0.007331
Epoch 35/50, Loss=0.007261
Epoch 36/50, Loss=0.007160
Epoch 37/50, Loss=0.007031
Epoch 38/5