<a href="https://colab.research.google.com/github/alwaysneedhelp/Practice-for-IOAI/blob/main/Radar_Solution_After_Discussion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install gdown



In [2]:
!gdown https://drive.google.com/uc?id=1mXqBIqSfHif3LvJ7Jce1C7addqdfu7B-
!unzip Millimeter-wave_dataset.zip

Downloading...
From (original): https://drive.google.com/uc?id=1mXqBIqSfHif3LvJ7Jce1C7addqdfu7B-
From (redirected): https://drive.google.com/uc?id=1mXqBIqSfHif3LvJ7Jce1C7addqdfu7B-&confirm=t&uuid=46ca03ce-7ac5-4f0f-a2dc-ffe5342bd559
To: /content/Millimeter-wave_dataset.zip
100% 937M/937M [00:08<00:00, 109MB/s] 
Archive:  Millimeter-wave_dataset.zip
   creating: training_set/
  inflating: __MACOSX/._training_set  
  inflating: training_set/1503.mat.pt  
  inflating: __MACOSX/training_set/._1503.mat.pt  
  inflating: training_set/1252.mat.pt  
  inflating: __MACOSX/training_set/._1252.mat.pt  
  inflating: training_set/1403.mat.pt  
  inflating: __MACOSX/training_set/._1403.mat.pt  
  inflating: training_set/1352.mat.pt  
  inflating: __MACOSX/training_set/._1352.mat.pt  
  inflating: training_set/1098.mat.pt  
  inflating: __MACOSX/training_set/._1098.mat.pt  
  inflating: training_set/994.mat.pt  
  inflating: __MACOSX/training_set/._994.mat.pt  
  inflating: training_set/1198.mat.pt  

In [2]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import random
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
data_path = '/content/training_set'
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [53]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, file_paths):
        self.file_paths = file_paths
        self.static_idx = torch.tensor([0,2,4])
        self.dynamic_idx = torch.tensor([1,3,5])


    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        data = torch.load(self.file_paths[idx])

        imgs_in  = data[:6].float()                 # [6, H, W] in original interleaved order
        static3  = imgs_in.index_select(0, self.static_idx)   # [3, H, W]
        dynamic3 = imgs_in.index_select(0, self.dynamic_idx)  # [3, H, W]
        images   = torch.cat([static3, dynamic3], dim=0)

        labels = data[6]

        images = images.float()
        labels = labels.long()

        return images, labels

class TrainDataset(CustomDataset):
    def __getitem__(self, idx):
        images, labels = super().__getitem__(idx)

        if random.random() < 0.5:
            images[0] = images[0].flip(-1)
            images[1] = images[1].flip(-1)
            labels = labels.flip(-1)

        return images, labels

file_paths = [f'{data_path}/{file}' for file in os.listdir(data_path) if file.endswith('.mat.pt')]

train_dataset = TrainDataset(file_paths=file_paths)

train_loader = torch.utils.data.DataLoader(
    dataset = train_dataset,
    shuffle = True,
    batch_size = 32,
    num_workers = 2,
    drop_last = True,
    pin_memory = True
)

In [46]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.s = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size= 3, padding='same'),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Dropout2d(0.3),
            nn.Conv2d(out_channels, out_channels, kernel_size= 3, padding='same'),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
        )
    def forward(self, x):
        return self.s(x)

In [47]:
class ResBlock2d(nn.Module):
  def __init__(self, in_channels, out_channels,res_scale=1.0):
    super().__init__()
    self.proj = nn.Conv2d(in_channels, out_channels, 1, bias=False)
    self.res_scale = res_scale

    self.block = DoubleConv(in_channels, out_channels)

  def forward(self, x):
    y = self.block(x)
    s = self.proj(x)
    return F.relu(s + self.res_scale * y, inplace=True)

In [48]:
class Encode(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = ResBlock2d(in_channels, out_channels)
        self.pool = nn.MaxPool2d(2)
        self.dropout = nn.Dropout2d(0.3)
    def forward(self, x):
        x = self.conv(x)
        p = self.pool(x)
        p = self.dropout(p)
        return x, p

In [49]:
class AttentionGate(nn.Module):
    def __init__(self, F_g, F_l, F_int):
        super().__init__()
        self.W_g = nn.Sequential(
            nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):  # g = decoder feature, x = encoder skip feature
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi  # element-wise gating


In [50]:
class Decode(nn.Module):
    def __init__(self, in_channels, skip, out_channels, pad):
        super().__init__()
        # F_g must be out_channels, not in_channels
        self.attn = AttentionGate(F_g=out_channels, F_l=skip, F_int=skip // 2)

        self.unpool = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2, output_padding=pad)
        self.dropout = nn.Dropout2d(0.3)
        self.conv = ResBlock2d(out_channels + skip, out_channels)

    def forward(self, x, skip):
        x = self.unpool(x)
        skip = self.attn(x, skip)
        x = torch.cat([x, skip], dim=1)
        x = self.dropout(x)
        x = self.conv(x)
        return x

In [52]:
class DuoUNet(nn.Module):
    def __init__(self, in_channels=6, num_filters=32, num_classes=2):
        super().__init__()
        # Two encoders: static (0–2), dynamic (3–5)
        self.enc_s1 = Encode(3, num_filters)
        self.enc_s2 = Encode(num_filters, num_filters*2)
        self.enc_s3 = Encode(num_filters*2, num_filters*4)
        self.enc_s4 = Encode(num_filters*4, num_filters*8)

        self.enc_d1 = Encode(3, num_filters)
        self.enc_d2 = Encode(num_filters, num_filters*2)
        self.enc_d3 = Encode(num_filters*2, num_filters*4)
        self.enc_d4 = Encode(num_filters*4, num_filters*8)

        # Fusion (1x1 convs to mix static + dynamic features)
        def fuse(in_ch): return nn.Conv2d(in_ch * 2, in_ch, kernel_size=1, bias=False)
        self.fuse1 = fuse(num_filters)
        self.fuse2 = fuse(num_filters*2)
        self.fuse3 = fuse(num_filters*4)
        self.fuse4 = fuse(num_filters*8)

        # Shared bridge
        self.bridge = DoubleConv(num_filters*8, num_filters*16)

        # Shared decoder (attention + residual)
        self.up1 = Decode(num_filters*16, num_filters*8, num_filters*8, (0, 0))
        self.up2 = Decode(num_filters*8, num_filters*4, num_filters*4, (0, 1))
        self.up3 = Decode(num_filters*4, num_filters*2, num_filters*2, (1, 0))
        self.up4 = Decode(num_filters*2, num_filters, num_filters, (0, 1))

        self.out_conv = nn.Conv2d(num_filters, num_classes, kernel_size=1)

    def forward(self, x):
        # Split input
        xs, xd = [x[0], x[2], x[4]], [x[1], x[3], [x[5]]]

        # Encoder static
        x1s, xs = self.enc_s1(xs)
        x2s, xs = self.enc_s2(xs)
        x3s, xs = self.enc_s3(xs)
        x4s, xs = self.enc_s4(xs)

        # Encoder dynamic
        x1d, xd = self.enc_d1(xd)
        x2d, xd = self.enc_d2(xd)
        x3d, xd = self.enc_d3(xd)
        x4d, xd = self.enc_d4(xd)

        # Fuse static + dynamic features at each level
        x1 = self.fuse1(torch.cat([x1s, x1d], dim=1))
        x2 = self.fuse2(torch.cat([x2s, x2d], dim=1))
        x3 = self.fuse3(torch.cat([x3s, x3d], dim=1))
        x4 = self.fuse4(torch.cat([x4s, x4d], dim=1))

        # Bridge
        x = self.bridge(x4)

        # Decode
        x = self.up1(x, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)

        return self.out_conv(x)

In [54]:
class GroupedStem(nn.Module):
    def __init__(self, in_channels=6, out_channels=32, groups=3):
        super().__init__()
        mid = out_channels * groups
        self.net = nn.Sequential(
            nn.Conv2d(in_channels, mid, 3, padding=1, groups=groups, bias=False),
            nn.BatchNorm2d(mid),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid, out_channels, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
     return self.net(x)

In [56]:
class TriNet(nn.Module):
    def __init__(self, in_channels=6, num_filters=32, num_classes=2):
        super().__init__()
        self.stem = GroupedStem(in_channels, num_filters, groups=3)
        self.down1 = Encode(num_filters, num_filters * 2)
        self.down2 = Encode(num_filters * 2, num_filters * 4)
        self.down3 = Encode(num_filters * 4, num_filters * 8)
        self.bridge = DoubleConv(num_filters * 8, num_filters * 16)
        self.up1 = Decode(num_filters * 16, num_filters * 8, num_filters * 8, (0, 0))
        self.up2 = Decode(num_filters * 8, num_filters * 4, num_filters * 4, (0, 1))
        self.up3 = Decode(num_filters * 4, num_filters * 2, num_filters * 2, (1, 0))
        self.up4 = Decode(num_filters * 2, num_filters, num_filters, (0, 1))
        self.out_conv = nn.Conv2d(num_filters, num_classes, 1)

    def forward(self, x):
        x0 = self.stem(x)
        x1, x = self.down1(x0)
        x2, x = self.down2(x)
        x3, x = self.down3(x)
        x = self.bridge(x)
        x = self.up1(x, x3)
        x = self.up2(x, x2)
        x = self.up3(x, x1)
        x = self.up4(x, x0)
        return self.out_conv(x)


In [57]:
class MegaEnsemble(nn.Module):
    def __init__(self, num_filters=32, num_classes=2):
        super().__init__()
        self.tri = TriNet(num_filters=num_filters, num_classes=num_classes)
        self.double = DuoUNet(num_filters=num_filters, num_classes=num_classes)

    def forward(self, x):
        out_tri = self.tri(x)        # logits from geometry-grouped
        out_double = self.double(x)  # logits from modality-grouped
        return (out_tri + out_double) / 2  # average logits

In [58]:
def train(model):


    model.train()
    model.to(device)
    class_weights = torch.tensor([1., 1500.,])
    loss_fn = nn.CrossEntropyLoss(
        weight = class_weights, reduction='mean'
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    warmup = 1
    epochs = 9

    main_sch = torch.optim.lr_scheduler.LinearLR(
        optimizer, start_factor=1.0, end_factor=0.0, total_iters = epochs * len(train_loader)
    )
    warmup_sch = torch.optim.lr_scheduler.LinearLR(
        optimizer, start_factor=0.01, end_factor=1.0, total_iters = warmup * len(train_loader)
    )
    scheduler = torch.optim.lr_scheduler.SequentialLR(
        optimizer, schedulers = [warmup_sch, main_sch], milestones = [warmup * len(train_loader)]
    )

    scaler = torch.amp.GradScaler(device)

    for epoch in range(warmup+epochs):
        print(f'Epoch {epoch}:')

        model.train()

        running_loss = 0.0
        correct = 0.0

        for x, y in train_loader:
            x, y = x.to(device), y.to(device)

            with torch.amp.autocast(device):
                outputs = model(x)
                loss = loss_fn(outputs, y)

                optimizer.zero_grad()
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

                scheduler.step()
                running_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                weights = class_weights[y]  # Get weight for each sample's true class
                correct = (predicted == y).float()
                weighted_correct = correct * weights
                weighted_accuracy = weighted_correct.sum() / weights.sum()


        running_loss /= len(train_loader)
        train_acc = 100 * weighted_accuracy / len(train_loader)
        print(f'Train Loss : {running_loss}')
        print(f'Train Accuracy : {train_acc:.2f}')

    return model, running_loss

In [59]:
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.models = nn.ModuleList(MegaEnsemble() for _ in range(5))

    def forward(self, x):
        return sum(model(x) for model in self.models) / len(self.models)

In [60]:
model = MyModel()

NameError: name 'DuoUNet' is not defined

In [61]:
train(model)

Epoch 0:


KeyboardInterrupt: 

In [None]:
# Please write your model code (including the necessary imported modules, such as torch and torch.nn) below to generate a model structure file that can be easily loaded by the grading platform
model_code = """
import torch
import torch.nn as nn

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.s = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size= 3, padding='same'),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Dropout2d(0.3),
            nn.Conv2d(out_channels, out_channels, kernel_size= 3, padding='same'),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
        return self.s(x)

class ResBlock2d(nn.Module):
  def __init__(self, in_channels, out_channels,res_scale=1.0):
    super().__init__()
    self.proj = nn.Conv2d(in_channels, out_channels, 1, bias=False)
    self.res_scale = res_scale

    self.block = DoubleConv(in_channels, out_channels)

  def forward(self, x):
    y = self.block(x)
    s = self.proj(x)
    return F.relu(s + self.res_scale * y, inplace=True)

class Encode(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = ResBlock2d(in_channels, out_channels)
        self.pool = nn.MaxPool2d(2)
        self.dropout = nn.Dropout2d(0.3)
    def forward(self, x):
        x = self.conv(x)
        p = self.pool(x)
        p = self.dropout(p)
        return x, p

class AttentionGate(nn.Module):
    def __init__(self, F_g, F_l, F_int):
        super().__init__()
        self.W_g = nn.Sequential(
            nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):  # g = decoder feature, x = encoder skip feature
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi  # element-wise gating


class Decode(nn.Module):
    def __init__(self, in_channels, skip, out_channels, pad):
        super().__init__()
        # F_g must be out_channels, not in_channels
        self.attn = AttentionGate(F_g=out_channels, F_l=skip, F_int=skip // 2)

        self.unpool = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2, output_padding=pad)
        self.dropout = nn.Dropout2d(0.3)
        self.conv = ResBlock2d(out_channels + skip, out_channels)

    def forward(self, x, skip):
        x = self.unpool(x)
        skip = self.attn(x, skip)
        x = torch.cat([x, skip], dim=1)
        x = self.dropout(x)
        x = self.conv(x)
        return x

class UNet(nn.Module):
    def __init__(self, in_channels, num_filters, num_classes):
        super(UNet, self).__init__()

        self.down1 = Encode(in_channels, num_filters)
        self.down2 = Encode(num_filters, num_filters*2)
        self.down3 = Encode(num_filters*2, num_filters*4)
        self.down4 = Encode(num_filters*4, num_filters*8)

        self.bridge = DoubleConv(num_filters*8, num_filters*16)

        self.up1 = Decode(num_filters*16, num_filters*8, num_filters*8, (0 , 0))
        self.up2 = Decode(num_filters*8, num_filters*4, num_filters*4, (0 , 1))
        self.up3 = Decode(num_filters*4, num_filters*2, num_filters*2, (1 , 0))
        self.up4 = Decode(num_filters*2, num_filters, num_filters, (0 , 1))

        self.out_conv = nn.Conv2d(num_filters, num_classes, kernel_size=1)

    def forward(self, x):
        x1, x = self.down1(x)
        x2, x = self.down2(x)
        x3, x = self.down3(x)
        x4, x = self.down4(x)

        x = self.bridge(x)

        x = self.up1(x, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)

        return self.out_conv(x)

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.models = nn.ModuleList(UNet(6, 32, 2) for _ in range(5))

    def forward(self, x):
        return sum(model(x) for model in self.models) / len(self.models)
"""
# Write code to file
with open('submission_model.py', 'w',encoding="utf-8") as f:
    f.write(model_code)
print("submission_model.py file has been generated.")

In [16]:
# Save the parameters of the model
torch.save(model.state_dict(), 'submission_dic.pth')
print("submission_dic.pth file has been saved.")

NameError: name 'model' is not defined

In [17]:
# This block mainly specifies the submission format of this question.
import zipfile
import os

# Define the files to zip and the zip file name.
files_to_zip = ['submission_model.py', 'submission_dic.pth']
zip_filename = 'submission.zip'

# Create a zip file
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for file in files_to_zip:
        # Add the file to the zip fil
        zipf.write(file, os.path.basename(file))

print(f'{zip_filename} Created successfully!')

FileNotFoundError: [Errno 2] No such file or directory: 'submission_dic.pth'