**Author:**

Haolong Li (haolong.li@epfl.ch) (352680)

Zhibo Zhao (zhibo.zhao@epfl.ch) (350593)

**NOTE:** It is NOT expected to run this notebook, because the training data is too big to upload. We upload this notebook just in case of integrity.

**Refer to `classify.ipynb` for the kaggle-solution reproduce pipeline.**

The result of the training (model weights) is stored in `model/best_model.pth`.

In [1]:
import torch
import torchvision.models as models
import os
import xml.etree.ElementTree as ET
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
import json
import numpy as np
import torchvision.models as models
from torch.utils.data import Subset

In [2]:
def parse_xml(xml_file):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    annotations = []
    for member in root.findall('object'):
        value = member.find('name').text
        bndbox = member.find('bndbox')
        xmin = int(bndbox.find('xmin').text)
        ymin = int(bndbox.find('ymin').text)
        xmax = int(bndbox.find('xmax').text)
        ymax = int(bndbox.find('ymax').text)
        annotations.append((value, (xmin, ymin, xmax, ymax)))
    return annotations


In [3]:
# Looking at one parsed XML file
xml_file = os.path.join('data', 'train',  'L1010277.xml')
res = parse_xml(xml_file)
res

[('5CHF', (1588, 1605, 2511, 2516)),
 ('2EUR', (3011, 1311, 3774, 2079)),
 ('0.2EUR', (2574, 2342, 3158, 2932)),
 ('0.5EUR', (3132, 2884, 3716, 3474)),
 ('0.2EUR', (1868, 2963, 2505, 3532))]

In [9]:
# label string to integer mapping
def collect_labels(xml_dir):
    labels = set()
    for xml_file in os.listdir(xml_dir):
        if xml_file.endswith('.xml'):
            tree = ET.parse(os.path.join(xml_dir, xml_file))
            root = tree.getroot()
            for member in root.findall('object'):
                labels.add(member.find('name').text)
    return labels

label_set = collect_labels(os.path.join('data', 'train'))
label_to_index = {label: idx for idx, label in enumerate(label_set)}

print(label_to_index)


{'0.05EUR': 0, '1EUR': 1, '5CHF': 2, '0.02EUR': 3, '1CHF': 4, '0.01EUR': 5, '2CHF': 6, '0.2CHF': 7, '0.05CHF': 8, '0.5CHF': 9, '0.5EUR': 10, '2EUR': 11, '0.1EUR': 12, '0.1CHF': 13, 'OOD': 14, '0.2EUR': 15}


In [6]:
# the dataset class
class CoinDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        # temp root = data/temp_train
        # temp anno = data/temp_train_annotations
        self.root_dir = root_dir
        self.transform = transform
        self.img_files = [os.path.join(root_dir, file) for file in os.listdir(root_dir) if file.endswith('.JPG')] # list of all image files
        self.annotations = [parse_xml(file.replace('.JPG', '.xml')) for file in self.img_files] # list of lists, each list contains tuples of value and bounding box coordinates

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        img_path = self.img_files[idx]
        image = Image.open(img_path).convert('RGB')
        annotations = self.annotations[idx]
        
        coins = []
        for value, (xmin, ymin, xmax, ymax) in annotations:
            coin = image.crop((xmin, ymin, xmax, ymax))
            if self.transform:
                coin = self.transform(coin)
            coins.append((coin, value))
        
        return coins


In [8]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

dataset = CoinDataset(root_dir=os.path.join('data', 'train'), transform=transform)

In [16]:
def collate_fn(batch):
    images = []
    labels = []
    # 遍历batch中的每一个元素，每个元素都是一个图片中的多个硬币
    for item in batch:
        for coin, label in item:
            images.append(coin)           # 添加每个硬币的图像
            labels.append(label_to_index[label])  # 使用映射转换标签

    # 将images列表转换为一个tensor，labels列表转换为一个tensor
    images = torch.stack(images)
    labels = torch.tensor(labels, dtype=torch.int64)

    return images, labels



def train_val_split(dataset, val_split=0.2):
    indices = np.arange(len(dataset))
    np.random.shuffle(indices)
    split = int(np.floor(val_split * len(indices)))
    train_indices, val_indices = indices[split:], indices[:split]
    
    train_dataset = Subset(dataset, train_indices)
    val_dataset = Subset(dataset, val_indices)
    
    return train_dataset, val_dataset


train_dataset, val_dataset = train_val_split(dataset, val_split=0.1)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [20]:


# 加载预训练的 ResNet50 模型
model = models.resnet50(weights='IMAGENET1K_V2')

# 修改最后一个全连接层以匹配硬币分类的类别数
num_classes = len(label_to_index)  # 假设你已经有了标签索引
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)

model = model.to(device)

In [21]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [22]:
def validate_model(model, data_loader, criterion):
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    return val_loss / len(data_loader)


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5):
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        # 计算验证损失
        val_loss = validate_model(model, val_loader, criterion)
        print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Validation Loss: {val_loss}')
        
        # 检查是否有最佳模型，并保存
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print(f"Saved better model with validation loss: {val_loss}")


In [23]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# 现在开始训练
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=15)

Epoch 1, Loss: 2.740930271148682, Validation Loss: 2.7348225116729736
Saved better model with validation loss: 2.7348225116729736
Epoch 2, Loss: 2.39723653793335, Validation Loss: 2.633789539337158
Saved better model with validation loss: 2.633789539337158
Epoch 3, Loss: 2.0391609191894533, Validation Loss: 2.408545732498169
Saved better model with validation loss: 2.408545732498169
Epoch 4, Loss: 1.6688083171844483, Validation Loss: 2.1467816829681396
Saved better model with validation loss: 2.1467816829681396
Epoch 5, Loss: 1.3558610200881958, Validation Loss: 1.8719234466552734
Saved better model with validation loss: 1.8719234466552734
Epoch 6, Loss: 1.049644160270691, Validation Loss: 1.673026204109192
Saved better model with validation loss: 1.673026204109192
Epoch 7, Loss: 0.7817895650863648, Validation Loss: 1.4931954145431519
Saved better model with validation loss: 1.4931954145431519
Epoch 8, Loss: 0.5447116613388061, Validation Loss: 1.3262101411819458
Saved better model wit

In [26]:
# do more trainings, as we haven't seen overfitting yet

# load model
model.load_state_dict(torch.load('best_model.pth'))
model = model.to(device)
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)

Epoch 1, Loss: 0.02251990996301174, Validation Loss: 0.9925457835197449
Saved better model with validation loss: 0.9925457835197449
Epoch 2, Loss: 0.0176901800557971, Validation Loss: 0.9894407987594604
Saved better model with validation loss: 0.9894407987594604
Epoch 3, Loss: 0.016770420409739018, Validation Loss: 1.0101858377456665
Epoch 4, Loss: 0.018623148463666438, Validation Loss: 1.0089820623397827
Epoch 5, Loss: 0.008884755708277226, Validation Loss: 1.024297833442688
Epoch 6, Loss: 0.009139344561845064, Validation Loss: 1.0353553295135498
Epoch 7, Loss: 0.007786692492663861, Validation Loss: 1.0406599044799805
Epoch 8, Loss: 0.0076099943369627, Validation Loss: 1.0470120906829834
Epoch 9, Loss: 0.006128749437630176, Validation Loss: 1.0534926652908325
Epoch 10, Loss: 0.006275934912264347, Validation Loss: 1.0614908933639526


In [27]:
# save the label to index mapping

with open('label_to_index.json', 'w') as f:
    json.dump(label_to_index, f)

In [25]:
model.eval()  # 设置模型为评估模式

def predict_image(image_path, model, transform, device):
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    image = image.unsqueeze(0)  # 添加batch维度
    image = image.to(device)

    with torch.no_grad():
        output = model(image)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
        top_prob, top_catid = torch.topk(probabilities, 1)

    # 反转label_to_index字典
    index_to_label = {v: k for k, v in label_to_index.items()}

    return index_to_label[top_catid.item()], top_prob.item()

# 示例推理
test_image_path = os.path.join('data', '1.jpg')
predicted_class, probability = predict_image(test_image_path, model, transform, device)
print(f"Predicted class: {predicted_class}, Probability: {probability}")

Predicted class: 5CHF, Probability: 0.9047756791114807
