In [122]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import numpy as np
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
from torchvision import transforms
from torch.utils.data.sampler import RandomSampler,SequentialSampler
from tqdm import tqdm

In [51]:
test_path = "data/test.csv"
train_path = "data/train.csv"
image_path = "data/"

In [28]:
train = pd.read_csv(train_path)

In [32]:
len(train['label'].unique())


176

In [117]:
class LeaveDataset(Dataset):
    
    def __init__(self, 
                 csv_path, 
                 img_file,
                 mode="train", valid_ratio=.2):

        self.mode = mode
        self.root = img_file
        
        csv_data = pd.read_csv(csv_path, header=None)
        n_example = len(csv_data) - 1
        self.train_len = int(n_example * (1 - valid_ratio))
        
        if mode == "train":
            self.train_img_path = np.asarray(csv_data.iloc[1:self.train_len,0])
            self.train_label = np.asarray(csv_data.iloc[1:self.train_len, 1].unique())
            n_class = len(self.train_label)
            self.num_to_label = dict(zip(self.train_label, range(n_class)))
            self.label_to_num = {v: k for k, v in self.num_to_label.items()}
            
        elif mode == "valid":
            self.valid_img_path = np.asarray(csv_data.iloc[self.train_len:,0])
            self.valid_label = np.asarray(csv_data.iloc[self.train_len:,1].unique())
            n_class = len(self.valid_label)
            self.num_to_label = dict(zip(self.valid_label, range(n_class)))
            self.label_to_num = {v: k for k, v in self.num_to_label.items()}
            
        else:
            self.test_img_path = np.asarray(csv_data.iloc[1:,0])
            
    def __getitem__(self, index):
        
        if self.mode == "train":
            img_path = self.train_img_path[index]
            img = Image.open(self.root + img_path)
            
            transform = transforms.Compose([
                
                transforms.Resize(224),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomVerticalFlip(p=0.5),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])

            label = self.label_to_num[index]
            return transform(img), label 
        else:
            if self.mode == "valid":
                img_path = self.valid_img_path[index]
                img = Image.open(self.root + img_path)
                transform = transforms.Compose([
                    transforms.Resize(256),
                    transforms.ToTensor(),
                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                ])
                label = self.label_to_num[idnex]
                return transform(img), label
            else:
                img_path = self.test_img_path[index]
                img = Image.open(self.root + img_path)
                return img
       
    
    def __len__(self):
        
        if self.mode == "train":
            return len(self.train_label)
        elif self.mode == "valid":
            return len(self.valid_label)
        else:
            return len(self.test_label)

In [118]:
train_dataset = LeaveDataset(
    csv_path=train_path, 
    img_file=image_path,
    mode="train"
)
valid_dataset =  LeaveDataset(
    csv_path=train_path, 
    img_file=image_path,
    mode="valid"
)
test_dataset =  LeaveDataset(
    csv_path=test_path, 
    img_file=image_path,
    mode="test"
)

In [85]:
def load_data(dataset, batch_size, n_workers, sampler):
    return  DataLoader(
        dataset=dataset, 
        batch_size=batch_size, 
        num_workers=n_workers, 
        sampler=sampler
    )
    

In [120]:
train_loader = load_data(train_dataset, 32, 0, RandomSampler(train_dataset))

In [121]:
for i in range(2):
    for batch in train_loader:
        img, label = batch
        print(label)

FileNotFoundError: [Errno 2] No such file or directory: 'data/images/10.jpg'

In [6]:
class Residual(nn.Module):
    
    def __init__(self, in_channel, out_channel, kernel=3, padding=1,use_1conv=False,strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=in_channel,
            out_channels=out_channel, 
            kernel_size=kernel, 
            padding=padding, 
            stride=strides)
        
        self.conv2 = nn.Conv2d(
            in_channels=out_channel,
            out_channels=out_channel,
            kernel_size=kernel, 
            padding=padding)
    
        if use_1conv:
            self.conv3 = nn.Conv2d(
                in_channels=in_channel, 
                out_channels=out_channel, 
                kernel_size=1,
                stride=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.bn2 = nn.BatchNorm2d(out_channel)
        
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        if self.conv3:
            x = self.conv3(x)
        out += x
        return out

In [7]:
def resnet_block(in_channels, out_channels, n_residuals, kernel=3, padding=1, first_block=False):
    blocks = []
    for i in range(n_residuals):
        if i == 0 and not first_block:
            blocks.append(Residual(in_channels, out_channels,
                                use_1conv=True, strides=2))
        else:
            blocks.append(Residual(out_channels, out_channels))
    return blocks

In [8]:
class ResNet18(nn.Module):
    
    def __init__(self, n_labels):
        super().__init__()
        self.n_labels = n_labels
        self.block1 = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.block2 = nn.Sequential(*resnet_block(64,64,2,first_block=True))
        self.block3 = nn.Sequential(*resnet_block(64,128,2))
        self.block4 = nn.Sequential(*resnet_block(128,256,2))
        self.block5 = nn.Sequential(*resnet_block(256,512,2))
        self.avgPool = nn.AdaptiveAvgPool2d((1,1))
        self.ffn = nn.Linear(512, n_labels)
        
    def forward(self, x):
        
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.avgPool(x)
        x = nn.Flatten()(x)
        x = self.ffn(x)
        x = F.softmax(x)
        return x

In [None]:
def train(model, 
          data_loader, 
          optimizer, 
          n_epochs, 
          loss_fn, 
          device, 
          batch_size):
    
    model.train()
    for epoch in range(n_epochs):
        with torch.enable_grad(), tqdm(total=len(data_loader.dataset)) as pbar:
            for batch in data_loader:
                optimizer.grad_zero()
                img, label = batch
                img, label = img.to(device), label.to(device)
                pred = model(img)
                loss = loss_fn(pred, label)
                
                loss.backward()
                optimizer.step()
                pbar.update(batch_size)
                pbar.set_postfix(epoch=epoch, loss=loss.item())
        

In [18]:
resnet = ResNet18()
X = torch.rand(size=(1, 1, 224, 224))

In [20]:
out = resnet(X)
print(out.shape)

torch.Size([1, 512])




In [21]:
X

tensor([[[[0.2115, 0.9462, 0.5771,  ..., 0.2715, 0.8972, 0.8427],
          [0.3082, 0.7258, 0.6873,  ..., 0.4697, 0.9020, 0.4433],
          [0.8736, 0.2819, 0.2084,  ..., 0.8368, 0.7171, 0.7920],
          ...,
          [0.9148, 0.1651, 0.0162,  ..., 0.2623, 0.4546, 0.4492],
          [0.1979, 0.0238, 0.6904,  ..., 0.9718, 0.2023, 0.0023],
          [0.5575, 0.2381, 0.4994,  ..., 0.7385, 0.9487, 0.4454]]]])

In [68]:
blk = ResidualBlock(3,3,3,1)
X = torch.rand(4, 3, 6, 6)
Y = blk(X)
Y.shape

torch.Size([4, 3, 6, 6])

In [69]:
blk = ResidualBlock(3,6,3,1,use_1conv=True, strides=2)
blk(X).shape

torch.Size([4, 6, 3, 3])

In [79]:
def resnet_block(in_channels, out_channels, n_residuals, kernel=3, padding=1, first_block=False):
    blocks = []
    for i in range(n_residuals):
        if i == 0 and not first_block:
            blocks.append(ResidualBlock(input_channels, out_channels,
                                use_1conv=True, strides=2))
        else:
            blocks.append(ResidualBlock(out_channels, out_channels))
    return blocks

In [80]:
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))