In [None]:
import numpy as np
import torch
import torchvision
import tqdm
from torch import nn
from torch.nn import functional as F
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
from tqdm import tqdm_notebook
from sklearn.metrics import accuracy_score
import random

import warnings
# Install a blanket 'ignore' filter: no Python warning of any category is shown
# from this point on.
# NOTE(review): this also hides deprecation warnings raised by the libraries used below.
warnings.filterwarnings('ignore')

In [None]:
!wget https://www.dropbox.com/s/33l8lp62rmvtx40/dataset.zip?dl=1 -O dataset.zip && unzip -q dataset.zip

In [None]:
# Hyperparameters shared by the cells below.
SEED = 42            # seed handed to set_random_seed
PRE_BATCH_SIZE = 8   # batch size of the pass that collects normalisation statistics
BATCH_SIZE = 256     # batch size of the training and validation loaders
NUM_EPOCHS = 27      # default number of epochs for train()
LR = 4e-4            # learning rate given to the optimizer

In [None]:
def set_random_seed(seed):
    """Seed every random number generator the notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` and ``torch.cuda.manual_seed``), and switches
    cuDNN into deterministic mode.

    Args:
        seed: Seed value passed unchanged to each generator.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
# Seed all generators once, before any data loader or model is created.
set_random_seed(SEED)

In [None]:
class MyDataset(Dataset):
    """Thin wrapper that delegates indexing and length to an ``ImageFolder``.

    Samples are whatever ``torchvision.datasets.ImageFolder`` yields for the
    given directory and transform.
    """

    def __init__(self, data_dir, transform):
        """Build the wrapped ``ImageFolder``.

        Args:
            data_dir: Root directory handed to ``ImageFolder``.
            transform: Callable handed to ``ImageFolder`` as its transform.
        """
        super().__init__()
        self.data_dir = data_dir
        # Despite its name, this attribute stores the wrapped ImageFolder
        # dataset itself, not the transform callable.
        self.transform = torchvision.datasets.ImageFolder(data_dir, transform)

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.transform)

    def __getitem__(self, idx):
        """Return the sample stored at position ``idx`` of the wrapped dataset."""
        return self.transform[idx]

In [None]:
# First pass over the training images (converted to tensors only) to collect
# per-channel statistics for normalisation.
dataset_train = MyDataset("./dataset/dataset/train", transform=T.Compose([T.ToTensor()]))
train_dataloader = DataLoader(dataset_train, batch_size=PRE_BATCH_SIZE, shuffle=True)

# Running sums of the per-batch channel means and standard deviations; the next
# cell divides them by the number of batches. Note that this averages per-batch
# standard deviations rather than computing one over the whole dataset.
cum_mean, cum_std = 0, 0
for batch, _ in train_dataloader:
    # Reduce over batch, height and width so that one value per channel remains.
    cum_mean = cum_mean + batch.mean(dim=(0, 2, 3))
    cum_std = cum_std + batch.std(dim=(0, 2, 3))

In [None]:
# Per-channel statistics: the accumulated per-batch values averaged over the
# number of batches of the statistics loader.
# NOTE(review): `train_dataloader` is re-bound in a later cell; this cell gives
# the intended result only while it still refers to the statistics loader.
mean = cum_mean / len(train_dataloader)
std = cum_std / len(train_dataloader)

# Training pipeline: the augmentation chain inside RandomApply is applied as a
# whole with probability 0.6; the image is then converted to a tensor and
# normalised with the statistics above.
train_transform = T.Compose(
    [
        T.RandomApply(
            [
                T.RandomHorizontalFlip(0.15),
                T.ColorJitter(brightness=.3, hue=.2),
                T.TrivialAugmentWide(),
                T.RandomSolarize(threshold=170.0),
                T.RandomPerspective(distortion_scale=0.4, p=0.2),
            ],
            p=0.6,
        ),
        T.ToTensor(),
        T.Normalize(mean, std),
    ]
)

# Validation pipeline: no augmentation, only tensor conversion and normalisation.
val_transform = T.Compose([T.ToTensor(), T.Normalize(mean, std)])

In [None]:
# Build the final datasets with the augmentation / normalisation pipelines.
# These names replace the objects used for the statistics pass.
dataset_train = MyDataset("./dataset/dataset/train", transform=train_transform)
dataset_val = MyDataset("./dataset/dataset/val", transform=val_transform)

# Only the training loader is shuffled. Validation metrics do not depend on the
# sample order, so the validation loader iterates in a fixed order.
train_dataloader = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(dataset_val, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Sanity checks on the dataset interface: indexing must yield a 2-tuple whose
# second element (the label) is a plain Python int.
# Each indexing expression below fetches a sample again through the dataset.
assert isinstance(dataset_train[0], tuple)
assert len(dataset_train[0]) == 2
assert isinstance(dataset_train[1][1], int)
print("tests passed")

In [None]:
def train(model, train_dataloader, eval_dataloader, criterion, optimizer, device="cpu", n_epochs=NUM_EPOCHS, schedulder=None):
    """Train ``model`` for ``n_epochs`` epochs, evaluating after each one.

    Args:
        model: Network to optimise.
        train_dataloader: Iterable of ``(images, labels)`` training batches.
        eval_dataloader: Iterable of ``(images, labels)`` evaluation batches.
        criterion: Loss callable applied to ``(predictions, labels)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        device: Device on which the batches and the model are placed.
        n_epochs: Number of passes over ``train_dataloader``.
        schedulder: Optional learning-rate scheduler (the parameter keeps its
            historical spelling so existing keyword calls still work). When
            given, its ``step`` method is called with the value returned by
            ``evaluate`` after every epoch.
    """
    for epoch in range(n_epochs):
        train_one_epoch(model, train_dataloader, criterion, optimizer, device)
        loss = evaluate(model, eval_dataloader, criterion, device)
        print("\tEPOCH №{}".format(epoch), "has finished")
        if schedulder is not None:
            # Step the scheduler that was passed in; the previous code reached
            # for a global named `scheduler` and ignored this argument.
            schedulder.step(loss)
    
    
def train_one_epoch(model, train_dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_dataloader``.

    Args:
        model: Network to optimise; moved to ``device`` and put in train mode.
        train_dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable applied to ``(predictions, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device on which each batch is placed.
    """
    model = model.to(device).train()
    progress_bar = tqdm_notebook(train_dataloader)
    for step, (images, labels) in enumerate(progress_bar):
        logits = model(images.to(device))
        batch_loss = criterion(logits, labels.to(device))
        batch_loss.backward()
        optimizer.step()
        # Gradients are cleared right after the update, ready for the next batch.
        optimizer.zero_grad()
        # Refresh the progress-bar caption on every tenth batch only.
        if step % 10 == 0:
            progress_bar.set_description("Loss = {:.4f}".format(batch_loss.item()))
        
def evaluate(model, eval_dataloader, criterion, device="cuda:0"):
    """Evaluate ``model`` on ``eval_dataloader`` and print loss and accuracy.

    The printed loss is the mean of the per-batch loss values; the printed
    accuracy is the fraction of correctly classified samples over the whole
    loader, so a smaller final batch is not over-weighted. For an empty
    loader both printed values are ``nan``.

    Args:
        model: Network to evaluate; moved to ``device`` and put in eval mode.
        eval_dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable applied to ``(predictions, labels)``.
        device: Device on which the model and the batches are placed.

    Returns:
        The sum of the per-batch loss values (``0`` for an empty loader).
    """
    model = model.to(device).eval()
    cumulative_loss = 0
    n_batches = 0
    n_correct = 0
    n_samples = 0
    with torch.no_grad():
        for images, labels in eval_dataloader:
            images, labels = images.to(device), labels.to(device)
            preds = model(images)
            cumulative_loss += criterion(preds, labels).item()
            n_correct += (preds.argmax(1) == labels).sum().item()
            n_samples += labels.size(0)
            n_batches += 1
    # Divide by the real number of batches / samples; dividing by the last
    # batch index is off by one and fails when there is a single batch.
    mean_loss = cumulative_loss / n_batches if n_batches else float("nan")
    accuracy = n_correct / n_samples if n_samples else float("nan")
    print("Loss = {:.4f}".format(mean_loss), "accuracy = {:.4f}".format(accuracy))
    return cumulative_loss

In [None]:
class MyModel(nn.Module):
    """Convolutional classifier: four conv stages followed by a two-layer head.

    Every stage is Conv2d(3x3) -> MaxPool2d(2x2) -> ReLU -> BatchNorm2d, with
    channel widths 3 -> 16 -> 64 -> 256 -> 256. The head flattens the feature
    map, expects exactly 1024 features (e.g. 256 x 2 x 2, which is what a
    64 x 64 input produces) and outputs 200 scores.
    """

    def __init__(self):
        super().__init__()
        self.block1 = self._conv_block(3, 16)
        self.block2 = self._conv_block(16, 64)
        self.block3 = self._conv_block(64, 256)
        self.block4 = self._conv_block(256, 256)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024, 1024),
            nn.Dropout(0.1),
            nn.ReLU(),
            nn.Linear(1024, 200),
        )

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv2d -> MaxPool2d -> ReLU -> BatchNorm2d stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, (3, 3)),
            nn.MaxPool2d((2, 2)),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        """Pass ``x`` through the four stages and the classifier head."""
        for stage in (self.block1, self.block2, self.block3, self.block4):
            x = stage(x)
        return self.classifier(x)


model = MyModel()

In [None]:
# Fresh model together with everything the training loop needs.
model = MyModel()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# patience=0: the learning rate is reduced as soon as one epoch brings no improvement.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=0)
n_epochs = NUM_EPOCHS
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Train on the device and for the number of epochs selected in the setup cell
# instead of hard-coding 'cuda', so the notebook also runs on CPU-only machines.
train(model, train_dataloader, val_dataloader, criterion, optimizer,
      device=device, n_epochs=n_epochs, schedulder=scheduler)

In [None]:
from itertools import chain

def predict(model, val_dataloader, criterion, device):
    """Run ``model`` over ``val_dataloader`` and collect losses and labels.

    Args:
        model: Network to run; switched to eval mode (it is not moved to
            ``device`` here).
        val_dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable applied to ``(predictions, labels)``.
        device: Device on which each batch is placed.

    Returns:
        A tuple ``(losses, predicted_labels, true_labels)``: one loss value per
        batch, and the predicted and true labels flattened over all batches.
    """
    model = model.eval()
    batch_losses, predicted, expected = [], [], []
    with torch.no_grad():
        for images, labels in val_dataloader:
            images, labels = images.to(device), labels.to(device)
            logits = model(images)
            batch_losses.append(criterion(logits, labels).item())
            # Extending with the NumPy arrays flattens the labels across batches.
            expected.extend(labels.cpu().detach().numpy())
            predicted.extend(logits.argmax(1).cpu().detach().numpy())
    return batch_losses, predicted, expected

In [None]:
# Collect predictions for the whole validation set and score them.
all_losses, predicted_labels, true_labels = predict(model, val_dataloader, criterion, device)
assert len(predicted_labels) == len(dataset_val)
# accuracy_score expects (y_true, y_pred); pass the arguments in that order.
accuracy = accuracy_score(true_labels, predicted_labels)
print("tests passed")

In [None]:
# Report the grade (in Russian) implied by the validation accuracy: part 1
# scales accuracy against 0.44, part 2 scales (accuracy - 0.5) against 0.34;
# both results are clipped to the range [0, 10].
print(
    f'Оценка за это задание составит {np.clip(10 * accuracy / 0.44, 0, 10):.2f} баллов,'
    f' если вы делали часть 1, и {np.clip(10 * (accuracy - 0.5) / 0.34, 0, 10):.2f} баллов,'
    f' если вы делали часть 2.'
)