In [1]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image

In [2]:
# Define the Siamese Network architecture
class SiameseChangeDetectionModel(nn.Module):
    """Siamese encoder/decoder that predicts a per-pixel change mask.

    Both input images pass through the same (weight-shared) ResNet18 trunk.
    The absolute difference of the two feature maps is then upsampled by a
    stack of transposed convolutions and squashed with a sigmoid, so the
    output is a single-channel map with values in [0, 1].
    """

    def __init__(self):
        super().__init__()

        # Shared encoder: pretrained ResNet18 with its last two child modules
        # removed, leaving a 512-channel feature map.
        backbone = models.resnet18(pretrained=True)
        trunk_stages = list(backbone.children())[:-2]
        self.encoder = nn.Sequential(*trunk_stages)

        # Decoder: five stride-2 transposed convolutions, each doubling the
        # spatial size (8x8 -> 256x256 per the original size annotations),
        # followed by a 1x1 projection down to one channel.
        upsampling_channels = [(512, 256), (256, 128), (128, 64), (64, 32), (32, 16)]
        decoder_layers = []
        for in_channels, out_channels in upsampling_channels:
            decoder_layers.append(
                nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
            )
            decoder_layers.append(nn.ReLU(inplace=True))
        decoder_layers.append(nn.ConvTranspose2d(16, 1, kernel_size=1))
        decoder_layers.append(nn.Sigmoid())  # probabilities for the binary mask
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x1, x2):
        """Return the predicted change mask for the image pair ``(x1, x2)``.

        Args:
            x1: batch of images fed to the shared encoder first.
            x2: batch of images fed to the shared encoder second.

        Returns:
            The decoder output computed from ``|encoder(x1) - encoder(x2)|``.
        """
        features_first, features_second = self.encoder(x1), self.encoder(x2)
        feature_gap = (features_first - features_second).abs()
        return self.decoder(feature_gap)

In [3]:
# Prepare the dataset
class ChangeDetectionDataset(Dataset):
    """Image-pair change-detection dataset read from three parallel directories.

    Each sample is identified by a filename found in ``image_dir1``; a file
    with the same name is expected in ``image_dir2`` and ``mask_dir``.

    Args:
        image_dir1: directory holding the first image of every pair. Its
            listing defines the set of samples.
        image_dir2: directory holding the second image of every pair.
        mask_dir: directory holding the ground-truth change masks.
        transform: optional callable applied to both RGB images.
        mask_transform: optional callable applied to the mask. Defaults to
            ``transform`` so existing callers behave as before. Pass a
            dedicated pipeline (for example a nearest-neighbour resize) when
            ``transform`` interpolates or normalizes: an interpolating resize
            followed by the ``> 0`` binarization below thickens mask borders,
            and RGB normalization must never be applied to a mask.

    Note:
        When no mask transform is in effect the mask is returned as loaded
        (a PIL image in mode ``'L'``) and is not binarized.
    """

    def __init__(self, image_dir1, image_dir2, mask_dir, transform=None, mask_transform=None):
        self.image_dir1 = image_dir1
        self.image_dir2 = image_dir2
        self.mask_dir = mask_dir
        self.transform = transform
        self.mask_transform = mask_transform if mask_transform is not None else transform
        # os.listdir returns entries in arbitrary order and also lists
        # subdirectories (e.g. Jupyter's `.ipynb_checkpoints`). Sort for a
        # reproducible index -> sample mapping and keep regular files only.
        self.filenames = sorted(
            entry
            for entry in os.listdir(self.image_dir1)
            if os.path.isfile(os.path.join(self.image_dir1, entry))
        )

    def __len__(self):
        """Return the number of samples (files found in ``image_dir1``)."""
        return len(self.filenames)

    @staticmethod
    def _load(directory, filename, mode):
        """Open ``directory/filename`` and return an in-memory copy in ``mode``."""
        # `convert` returns a new image, so the file can be closed right away.
        with Image.open(os.path.join(directory, filename)) as handle:
            return handle.convert(mode)

    def __getitem__(self, idx):
        """Return ``(img1, img2, mask)`` for the sample at position ``idx``.

        Raises:
            IndexError: if ``idx`` is out of range.
            FileNotFoundError: if the counterpart file is missing from
                ``image_dir2`` or ``mask_dir``.
        """
        filename = self.filenames[idx]
        img1 = self._load(self.image_dir1, filename, 'RGB')
        img2 = self._load(self.image_dir2, filename, 'RGB')
        mask = self._load(self.mask_dir, filename, 'L')

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        if self.mask_transform:
            mask = self.mask_transform(mask)
            mask = (mask > 0).float()  # Binarize the mask

        return img1, img2, mask

In [4]:
def __getitem__(self, idx):
    """Leftover fragment of a dataset ``__getitem__``; not usable as written.

    NOTE(review): this ``def`` sits at module level, so it defines a plain
    function and is not attached to ``ChangeDetectionDataset``. The
    placeholder comment below stands in for the image-loading code, which
    means ``img1``, ``img2`` and ``mask`` are read here without ever being
    assigned in this scope, and the function has no ``return`` statement.
    The same transform-and-binarize steps already appear inside
    ``ChangeDetectionDataset.__getitem__``; consider deleting this cell.
    """
    # ... existing code ...
    if self.transform:
        # Apply the one shared transform to both images and to the mask.
        img1 = self.transform(img1)
        img2 = self.transform(img2)
        mask = self.transform(mask)
        # Any strictly positive value becomes 1.0, everything else 0.0.
        mask = (mask > 0).float()  # Binarize the mask if it's not already

In [5]:
# Preprocessing pipeline handed to the dataset: resize to 256x256, then
# convert the PIL image to a tensor.
_resize_step = transforms.Resize((256, 256))
_to_tensor_step = transforms.ToTensor()
transform = transforms.Compose([_resize_step, _to_tensor_step])

In [6]:
# Dataset locations.
# The dataset root can be overridden with the CHANGE_DETECTION_DATA_ROOT
# environment variable so the notebook runs on other machines; when the
# variable is unset the original absolute paths are used unchanged.
DATA_ROOT = os.environ.get('CHANGE_DETECTION_DATA_ROOT', '/home/hehe/fyp/dataset')

image_dir1 = f'{DATA_ROOT}/normalized/current'  # first image of each pair
image_dir2 = f'{DATA_ROOT}/normalized/past'     # second image of each pair
# NOTE(review): masks live under `train/` while the images live under
# `normalized/` - confirm the filenames in these directories line up.
mask_dir = f'{DATA_ROOT}/train/masks'

In [9]:
# Build the dataset over the three directories, sharing one transform.
dataset = ChangeDetectionDataset(
    image_dir1=image_dir1,
    image_dir2=image_dir2,
    mask_dir=mask_dir,
    transform=transform,
)

# Shuffled mini-batches of 8, loaded by 4 worker processes into pinned memory.
dataloader = DataLoader(
    dataset=dataset,
    batch_size=8,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)

In [10]:
# Set up the training loop with CUDA support
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SiameseChangeDetectionModel().to(device)

# For multiple GPUs
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

# The model already ends in a Sigmoid, so plain BCELoss (not the
# with-logits variant) is the matching criterion.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
num_epochs = 20

# Fail early with a clear message instead of a ZeroDivisionError when the
# per-epoch average is computed over zero batches.
num_batches = len(dataloader)
if num_batches == 0:
    raise ValueError('dataloader is empty: no training samples were found')

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for img1, img2, mask in dataloader:
        # non_blocking copies pair with the DataLoader's pin_memory=True.
        img1 = img1.to(device, non_blocking=True)
        img2 = img2.to(device, non_blocking=True)
        mask = mask.to(device, non_blocking=True)

        optimizer.zero_grad()
        output = model(img1, img2)
        loss = criterion(output, mask)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
    # Mean of the per-batch mean losses.
    avg_loss = epoch_loss / num_batches
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

# Optionally, save the model
# When wrapped in DataParallel, every state_dict key carries a `module.`
# prefix and will not load into a bare SiameseChangeDetectionModel. Save the
# underlying module so the checkpoint is identical on one or many GPUs.
model_to_save = model.module if isinstance(model, nn.DataParallel) else model
torch.save(model_to_save.state_dict(), 'siamese_change_detection_model.pth')



Epoch [1/20], Loss: 0.6651
Epoch [2/20], Loss: 0.2233
Epoch [3/20], Loss: 0.1841
Epoch [4/20], Loss: 0.1672
Epoch [5/20], Loss: 0.1587
Epoch [6/20], Loss: 0.1510
Epoch [7/20], Loss: 0.1470
Epoch [8/20], Loss: 0.1436
Epoch [9/20], Loss: 0.1404
Epoch [10/20], Loss: 0.1381
Epoch [11/20], Loss: 0.1346
Epoch [12/20], Loss: 0.1325
Epoch [13/20], Loss: 0.1303
Epoch [14/20], Loss: 0.1267
Epoch [15/20], Loss: 0.1234
Epoch [16/20], Loss: 0.1202
Epoch [17/20], Loss: 0.1171
Epoch [18/20], Loss: 0.1141
Epoch [19/20], Loss: 0.1117
Epoch [20/20], Loss: 0.1090
