In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import pandas as pd
from PIL import Image
import os
from tqdm import tqdm

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=3):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out


def ResNet20():
    return ResNet(BasicBlock, [3, 3, 3, 3])


In [8]:
class ChestXrayDataset(Dataset):
    def __init__(self, image_folder, csv_file, transform=None):
        self.image_folder = image_folder
        self.data = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.image_folder, f'img_{self.data.iloc[idx, 0]}.png')
        image = Image.open(img_name).convert('L')
        if self.transform:
            image = self.transform(image)
        label = int(self.data.iloc[idx, 1])
        return image, label

In [9]:
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


train_images_folder = "data/train_images/"
train_csv_file = "data/train_answers.csv"
batch_size = 32
num_epochs = 10


train_dataset = ChestXrayDataset(train_images_folder, train_csv_file, transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


model = ResNet20().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [10]:
model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    tqdm_loader = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}')
    for images, labels in tqdm_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
        tqdm_loader.set_postfix({'Loss': loss.item()})
    epoch_loss = running_loss / len(train_dataset)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

Epoch 1/10:   0%|          | 0/844 [01:14<?, ?it/s]


KeyboardInterrupt: 

In [ ]:
torch.save(model.state_dict(), 'resnet20.pth')

In [ ]:
submission_df = pd.DataFrame(columns=['id', 'target_feature'])
test_images_folder = "data/test_images/"
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

submission_data = []

for i, filename in enumerate(os.listdir(test_images_folder)):
    img_path = os.path.join(test_images_folder, filename)
    img = Image.open(img_path).convert('L')
    img = test_transform(img).unsqueeze(0).to(device)
    output = model(img)
    _, predicted = torch.max(output, 1)
    if predicted.item() is None or predicted.item() == '':
        target_feature = 0
    else:
        target_feature = predicted.item()
    submission_data.append({'id': i, 'target_feature': target_feature})

submission_df = pd.DataFrame(submission_data)
submission_df.to_csv('submission_file-resnet20.csv', index=False)