In [1]:
import numpy as np 
import pandas as pd
import os
import torch
import torchvision
from torchvision import datasets
import torchvision.transforms as transforms
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader,Dataset, sampler, random_split
from torchvision import models
!pip install timm # kaggle doesnt have it installed by default
import timm
from timm.loss import LabelSmoothingCrossEntropy # This is better than normal nn.CrossEntropyLoss
import matplotlib.pyplot as plt
%matplotlib inline
import sys
from tqdm import tqdm
import time
import copy
from os import listdir
from os.path import isfile, join
from PIL import Image
import glob 
import math
from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Union


[0m

In [2]:
# Return every bird species name, derived from the dataset's folder structure.
def get_classes(data_dir):
    """List the class names found under ``data_dir``.

    Each immediate sub-directory of ``data_dir`` is one class. The names are
    returned sorted, which is the same ordering the ``DatasetFolder``-based
    datasets in this notebook use to assign class indices, so position ``i``
    in the returned list is the label for class index ``i``.

    Only the directory entries of ``data_dir`` are read; no image file is
    touched, so this is cheap even for large datasets.

    Args:
        data_dir: Path of the dataset root (one sub-folder per class).

    Returns:
        Sorted list of class (sub-folder) names.

    Raises:
        FileNotFoundError: If ``data_dir`` contains no sub-directory.
    """
    classes = sorted(entry.name for entry in os.scandir(data_dir) if entry.is_dir())
    if not classes:
        raise FileNotFoundError(f"Couldn't find any class folder in {data_dir}.")
    return classes

In [3]:
def img_to_patch(x, patch_size, flatten_channels=True):
    """
    Cut a batch of images into non-overlapping square patches.

    Inputs:
        x - torch.Tensor holding the images, shape [B, C, H, W]
        patch_size - Side length of each patch in pixels (integer)
        flatten_channels - If True every patch is returned as one flat feature
                           vector; otherwise it keeps its [C, p_H, p_W] layout.
    Returns:
        Tensor of shape [B, H'*W', C*p_H*p_W] when flatten_channels is True,
        else [B, H'*W', C, p_H, p_W], where H' = H // patch_size and
        W' = W // patch_size. Patches are ordered row by row.
    """
    batch, channels, height, width = x.shape
    rows = height // patch_size
    cols = width // patch_size

    # Split both spatial axes into (patch index, offset inside the patch).
    grid = x.reshape(batch, channels, rows, patch_size, cols, patch_size)

    # Bring the two patch-index axes to the front and merge them:
    # [B, H', W', C, p_H, p_W] -> [B, H'*W', C, p_H, p_W]
    patches = grid.permute(0, 2, 4, 1, 3, 5).flatten(1, 2)

    if not flatten_channels:
        return patches
    return patches.flatten(2, 4)  # [B, H'*W', C*p_H*p_W]

class MyViT(nn.Module):
    """Minimal Vision Transformer classifier with a single encoder block.

    Pipeline: split the image into ``n_patches x n_patches`` square patches,
    project each flattened patch to ``hidden_d`` features, prepend a learnable
    classification token, add fixed sinusoidal positional embeddings, run one
    pre-norm encoder block (multi-head self-attention + MLP, both with a
    residual connection) and classify from the classification token.

    Args:
        input_shape: ``(C, H, W)`` of the images the model will receive.
        n_patches: Number of patches along each spatial dimension. Both ``H``
            and ``W`` must be divisible by it and yield square patches.
        hidden_d: Token embedding size.
        n_heads: Number of attention heads (must divide ``hidden_d``).
        out_d: Number of output classes.
        device: Stored as ``self.device`` for backward compatibility only. The
            positional embedding is a module buffer, so it follows the module
            through ``.to(...)`` and ``forward`` no longer reads this value.

    Note:
        ``forward`` returns raw, unnormalized class scores (logits). That is
        the input ``nn.CrossEntropyLoss`` expects; feeding it softmax outputs
        applies the normalization twice and flattens the gradients. Apply
        ``softmax(dim=-1)`` to the output if probabilities are needed. The
        arg-max class is the same either way.
    """

    def __init__(self, input_shape, n_patches=56, hidden_d=8, n_heads=2, out_d=400, device=None):
        # Super constructor
        super(MyViT, self).__init__()
        self.device = device

        # Input and patches sizes
        self.input_shape = input_shape
        self.n_patches = n_patches
        self.n_heads = n_heads
        assert input_shape[1] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        assert input_shape[2] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        # Integer division: the divisibility is asserted above, and the patch
        # size is used for reshaping, which requires ints.
        self.patch_size = (input_shape[1] // n_patches, input_shape[2] // n_patches)
        # img_to_patch cuts square patches only.
        assert self.patch_size[0] == self.patch_size[1], "Only square patches are supported"
        self.hidden_d = hidden_d

        # 1) Linear mapper: flattened patch -> token
        self.input_d = int(input_shape[0] * self.patch_size[0] * self.patch_size[1])
        self.linear_mapper = nn.Linear(self.input_d, self.hidden_d)

        # 2) Classification token
        self.class_token = nn.Parameter(torch.rand(1, self.hidden_d))

        # 3) Positional embedding: fixed, so it is computed once here instead of
        # on every forward pass. Non-persistent so that state_dict() keeps
        # exactly the same keys as before.
        self.register_buffer(
            "pos_embed",
            get_positional_embeddings(self.n_patches ** 2 + 1, self.hidden_d),
            persistent=False,
        )

        # 4a) Layer normalization 1
        # The normalized shape spans the token axis as well as the feature axis.
        self.ln1 = nn.LayerNorm((self.n_patches ** 2 + 1, self.hidden_d))

        # 4b) Multi-head Self Attention (MSA) and classification token
        self.msa = MyMSA(self.hidden_d, n_heads)

        # 5a) Layer normalization 2
        self.ln2 = nn.LayerNorm((self.n_patches ** 2 + 1, self.hidden_d))

        # 5b) Encoder MLP
        self.enc_mlp = nn.Sequential(
            nn.Linear(self.hidden_d, self.hidden_d),
            nn.ReLU()
        )

        # 6) Classification head. Kept as a Sequential so the parameter names
        # ("mlp.0.weight", "mlp.0.bias") stay the same; it outputs logits.
        self.mlp = nn.Sequential(
            nn.Linear(self.hidden_d, out_d)
        )

    def forward(self, images):
        """Classify a batch of images.

        Args:
            images: Tensor of shape ``(N, C, H, W)`` with ``(C, H, W)`` equal
                to the ``input_shape`` given at construction.

        Returns:
            Tensor of shape ``(N, out_d)`` with unnormalized class scores.

        Raises:
            ValueError: If the per-image shape differs from ``input_shape``.
        """
        if tuple(images.shape[1:]) != tuple(self.input_shape):
            raise ValueError(
                f"Expected images of shape {tuple(self.input_shape)}, got {tuple(images.shape[1:])}"
            )
        n = images.shape[0]

        # Dividing images into patches (patch size comes from the configuration,
        # not from a hard-coded image size)
        patches = img_to_patch(images, self.patch_size[0])

        # Running linear layer for tokenization
        tokens = self.linear_mapper(patches)

        # Prepending the classification token to every sequence in the batch
        class_tokens = self.class_token.unsqueeze(0).expand(n, -1, -1)
        tokens = torch.cat((class_tokens, tokens), dim=1)

        # Adding positional embedding (broadcast over the batch)
        tokens = tokens + self.pos_embed

        # TRANSFORMER ENCODER BEGINS ###################################
        # NOTICE: MULTIPLE ENCODER BLOCKS CAN BE STACKED TOGETHER ######
        # Running Layer Normalization, MSA and residual connection
        out = tokens + self.msa(self.ln1(tokens))

        # Running Layer Normalization, MLP and residual connection
        out = out + self.enc_mlp(self.ln2(out))
        # TRANSFORMER ENCODER ENDS   ###################################

        # Getting the classification token only
        out = out[:, 0]

        return self.mlp(out)


class MyMSA(nn.Module):
    """Multi-head self-attention over token sequences.

    The token dimension ``d`` is cut into ``n_heads`` contiguous slices of
    ``d // n_heads`` features. Every head owns its own query, key and value
    linear maps, which act only on that head's slice. Scaled dot-product
    attention is computed per head and the head outputs are concatenated back
    along the feature dimension.

    Args:
        d: Token dimension.
        n_heads: Number of attention heads; must divide ``d``.
    """

    def __init__(self, d, n_heads=2):
        super(MyMSA, self).__init__()
        self.d = d
        self.n_heads = n_heads

        assert d % n_heads == 0, f"Can't divide dimension {d} into {n_heads} heads"

        d_head = d // n_heads
        self.q_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        self.k_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        self.v_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        self.d_head = d_head
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, sequences):
        """Apply self-attention to a batch of sequences.

        Args:
            sequences: Tensor of shape ``(N, seq_length, d)``.

        Returns:
            Tensor of shape ``(N, seq_length, d)``.
        """
        # Every head is evaluated for the whole batch at once with batched
        # matrix products; only the (small) loop over heads stays in Python.
        scale = self.d_head ** 0.5
        head_outputs = []
        for head in range(self.n_heads):
            start = head * self.d_head
            chunk = sequences[..., start:start + self.d_head]

            q = self.q_mappings[head](chunk)
            k = self.k_mappings[head](chunk)
            v = self.v_mappings[head](chunk)

            # (N, seq, seq): each token's attention weights over all tokens
            attention = self.softmax(q @ k.transpose(-2, -1) / scale)
            head_outputs.append(attention @ v)

        # Concatenate the heads back into the full token dimension
        return torch.cat(head_outputs, dim=-1)


def get_positional_embeddings(sequence_length, d):
    """Build the fixed sinusoidal positional-embedding table.

    Entry ``[i, j]`` is ``sin(i / 10000 ** (j / d))`` for even ``j`` and
    ``cos(i / 10000 ** ((j - 1) / d))`` for odd ``j``.

    Args:
        sequence_length: Number of positions (rows).
        d: Embedding dimension (columns).

    Returns:
        Tensor of shape ``(sequence_length, d)`` in torch's default dtype,
        on the CPU.
    """
    positions = torch.arange(sequence_length, dtype=torch.float64).unsqueeze(1)  # (L, 1)
    columns = torch.arange(d)                                                    # (d,)
    is_even = columns % 2 == 0

    # Odd columns reuse the frequency of the even column right before them.
    frequency_index = (columns - columns % 2).to(torch.float64)
    # Angles are evaluated in float64 and only rounded to the output dtype at
    # the end.
    angles = positions / torch.pow(10000.0, frequency_index / d)                 # (L, d)

    table = torch.where(is_even, torch.sin(angles), torch.cos(angles))
    return table.to(torch.get_default_dtype())

In [4]:
def pre_image(image_path,model):
    """Predict the class name of a single image.

    Args:
        image_path: Either a filesystem path (``str`` / ``os.PathLike``) of the
            image, or an already loaded image (a PIL image, or anything else
            ``transforms.ToTensor`` accepts).
        model: Classifier returning one score per class for a batch of images.

    Returns:
        The entry of the notebook-level ``classes`` list at the highest-scoring
        index.

    Notes:
        * Relies on the notebook-level globals ``device`` and ``classes``.
        * Puts ``model`` into eval mode and leaves it there.
        * NOTE(review): preprocessing here is a plain resize to 224x224, while
          the validation pipeline uses Resize(256) + CenterCrop(224) - confirm
          this difference is intended.
    """
    # Load from disk when given a path; make sure PIL images have exactly the
    # three channels the normalization constants below are defined for
    # (grayscale / RGBA inputs would otherwise fail in Normalize).
    if isinstance(image_path, (str, os.PathLike)):
        with Image.open(image_path) as opened:
            image = opened.convert("RGB")
    elif isinstance(image_path, Image.Image):
        image = image_path.convert("RGB")
    else:
        image = image_path

    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    transform_norm = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224,224)),
        transforms.Normalize(mean, std),
    ])

    # Normalized image as a batch of one, on the inference device
    img_normalized = transform_norm(image).float().unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        output = model(img_normalized)

    index = output.argmax().item()
    return classes[index]

In [5]:
# File extensions accepted as images when no custom ``is_valid_file`` is given.
IMG_EXTENSIONS = (".jpg", ".jpeg", ".png", ".ppm", ".bmp", ".pgm", ".tif", ".tiff", ".webp")

class ImageFolderCustom(datasets.DatasetFolder):
    """Image-folder dataset exposing a deterministic train or validation split.

    The directory layout is the usual one-folder-per-class layout. Inside every
    directory that holds images, the valid files are sorted by name; the first
    ``ceil(train_fraction * n)`` of them form the ``'train'`` split and the
    rest form the ``'val'`` split. The two splits are therefore disjoint and
    reproducible.

    Args:
        root: Dataset root directory (one sub-folder per class).
        setname: Which split to expose, ``'train'`` or ``'val'``.
        transform: Optional transform applied to every loaded image.
        target_transform: Optional transform applied to every class index.
        loader: Callable that loads an image from its path.
        is_valid_file: Optional predicate selecting the files to use. When
            given it replaces the extension check.
        train_fraction: Fraction of each directory's files that goes to the
            training split.

    Note:
        The base-class constructor indexes the complete dataset first; the
        sample list (and the matching ``targets`` / ``imgs``) is then replaced
        by the requested split.
    """

    def __init__(
        self,
        root: str,
        setname: str,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        loader: Callable[[str], Any] = datasets.folder.default_loader,
        is_valid_file: Optional[Callable[[str], bool]] = None,
        train_fraction: float = 0.9,
    ):
        # Fail before the (expensive) directory scan of the base class.
        assert setname in ['train','val']

        # Extensions and is_valid_file are mutually exclusive.
        extensions = IMG_EXTENSIONS if is_valid_file is None else None
        super().__init__(
            root,
            loader,
            extensions,
            transform=transform,
            target_transform=target_transform,
            is_valid_file=is_valid_file,
        )

        self.samples = self.make_dataset2(
            self.root, setname, self.class_to_idx, extensions, is_valid_file, train_fraction
        )
        # Keep the derived attributes in sync with the reduced sample list.
        self.targets = [class_index for _, class_index in self.samples]
        self.imgs = self.samples

    @staticmethod
    def make_dataset2(
        directory: str,
        setname: str,
        class_to_idx: Optional[Dict[str, int]] = None,
        extensions: Optional[Union[str, Tuple[str, ...]]] = None,
        is_valid_file: Optional[Callable[[str], bool]] = None,
        train_fraction: float = 0.9,
    ) -> List[Tuple[str, int]]:
        """
        Generates the list of samples of a form (path_to_sample, class) that
        belong to the requested split.

        See :class:`DatasetFolder` for details.

        Args:
            directory: Dataset root directory.
            setname: ``'train'`` or ``'val'``.
            class_to_idx: Mapping class name -> class index. Optional; when
                ``None`` it is discovered from ``directory``.
            extensions: Allowed file extensions. Exactly one of ``extensions``
                and ``is_valid_file`` must be given.
            is_valid_file: Predicate deciding whether a path is a sample.
            train_fraction: Fraction of each directory's valid files assigned
                to the training split; must lie in ``[0, 1]``.

        Returns:
            List of ``(path, class_index)`` tuples of the requested split.

        Raises:
            ValueError: On an empty ``class_to_idx``, on an invalid
                ``extensions`` / ``is_valid_file`` combination, or when
                ``train_fraction`` is outside ``[0, 1]``.
            FileNotFoundError: If some class contributes no sample to the
                requested split (for ``'val'`` this happens when
                ``ceil(train_fraction * n)`` already covers all ``n`` files).
        """
        assert setname in ['train','val']
        if not 0.0 <= train_fraction <= 1.0:
            raise ValueError(f"train_fraction must be within [0, 1], got {train_fraction}")

        if class_to_idx is None:
            _, class_to_idx = datasets.folder.find_classes(directory)
        elif not class_to_idx:
            raise ValueError("'class_to_index' must have at least one entry to collect any samples.")

        both_none = extensions is None and is_valid_file is None
        both_something = extensions is not None and is_valid_file is not None
        if both_none or both_something:
            raise ValueError("Both extensions and is_valid_file cannot be None or not None at the same time")

        if extensions is not None:

            def is_valid_file(x: str) -> bool:
                return datasets.folder.has_file_allowed_extension(x, extensions)  # type: ignore[arg-type]

        is_valid_file = cast(Callable[[str], bool], is_valid_file)

        instances = []
        available_classes = set()
        for target_class in sorted(class_to_idx.keys()):
            class_index = class_to_idx[target_class]
            target_dir = os.path.join(directory, target_class)
            if not os.path.isdir(target_dir):
                continue
            for root, _, fnames in sorted(os.walk(target_dir, followlinks=True)):
                # Split only the files that are actual samples, so stray
                # non-image files cannot shift the train/val boundary.
                candidates = (os.path.join(root, fname) for fname in sorted(fnames))
                valid_paths = [path for path in candidates if is_valid_file(path)]

                num_separator = math.ceil(len(valid_paths) * train_fraction)
                if setname == 'train':
                    selected = valid_paths[:num_separator]
                else:
                    selected = valid_paths[num_separator:]

                instances.extend((path, class_index) for path in selected)
                if selected:
                    available_classes.add(target_class)

        empty_classes = set(class_to_idx.keys()) - available_classes
        if empty_classes:
            msg = f"Found no valid file for the classes {', '.join(sorted(empty_classes))}. "
            if extensions is not None:
                msg += f"Supported extensions are: {extensions if isinstance(extensions, str) else ', '.join(extensions)}"
            raise FileNotFoundError(msg)

        return instances

In [6]:
# Basic training loop for the model
def train_model(model, criterion, optimizer, dataloaders, n_epochs=5):
    """Train ``model`` for ``n_epochs`` passes over one data loader.

    Args:
        model: Network to train. It is switched to train mode and updated in
            place.
        criterion: Loss called as ``criterion(predictions, targets)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        dataloaders: A single iterable yielding ``(inputs, targets)`` batches
            (despite the plural name).
        n_epochs: Number of passes over ``dataloaders``.

    Returns:
        The trained ``model`` (the same object that was passed in).

    Notes:
        Batches are moved to the device of the model's first parameter, so no
        global ``device`` variable is required.
    """
    model_device = next(model.parameters()).device
    model.train()

    for epoch in range(n_epochs):
        train_loss = 0.0
        for x, y in dataloaders:
            x, y = x.to(model_device), y.to(model_device)

            # NOTE(review): the loss is divided by the batch size on top of
            # whatever reduction ``criterion`` applies; with a mean-reducing
            # criterion this normalizes twice - confirm this is intended.
            loss = criterion(model(x), y) / len(x)
            train_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch +1}/{n_epochs} loss: {train_loss:.2f}")

    return model

In [7]:
def train_model2(model, criterion, optimizer, scheduler, dataloaders, dataset_sizes, num_epochs=5):
    """Train with a validation pass after every epoch and keep the best weights.

    Args:
        model: Network to train; updated in place.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch after the
            training phase.
        dataloaders: Dict with the keys ``'train'`` and ``'val'``, each an
            iterable of ``(inputs, labels)`` batches.
        dataset_sizes: Dict with the same keys holding the number of samples
            of each split (used to average loss and accuracy).
        num_epochs: Number of epochs.

    Returns:
        ``model`` loaded with the weights that reached the highest validation
        accuracy (the initial weights if no epoch improved on 0.0).

    Notes:
        Batches are moved to the device of the model's first parameter, so no
        global ``device`` variable is required.
    """
    since = time.time()
    model_device = next(model.parameters()).device
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-"*10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train mode for 'train', eval mode for 'val'

            # Plain Python numbers: no per-batch tensors are kept on the
            # device, and an empty loader still yields well-defined totals.
            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(model_device)
                labels = labels.to(model_device)

                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train): # no autograd makes validation go faster
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1) # used for accuracy
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # criterion value times batch size, so the epoch average
                # weights every sample equally
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if is_train:
                scheduler.step() # step at end of epoch

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict()) # keep the best validation accuracy model
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Val Acc: {:.4f}".format(best_acc))

    model.load_state_dict(best_model_wts)
    return model

In [8]:

 #train
# Root folder of the labelled images (one sub-folder per class)
DATA_DIR = '../input/iais22-birds/birds/birds'
# ImageNet channel statistics used for input normalization
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline: random augmentations, then the same resize / crop /
# normalization as validation, then random erasing on the tensor.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomApply(torch.nn.ModuleList([transforms.ColorJitter()]), p=0.25),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    transforms.RandomErasing(p=0.2, value='random')
])
# Validation pipeline: deterministic preprocessing only.
transform_val = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

train_data = ImageFolderCustom(root=DATA_DIR, setname='train', transform=transform_train)
train_loader = DataLoader(train_data, batch_size=256, shuffle=True, num_workers=4)
train_data_len = len(train_data)

val_data = ImageFolderCustom(root=DATA_DIR, setname='val', transform=transform_val)
# Evaluation metrics do not depend on sample order, so no shuffling is needed.
val_loader = DataLoader(val_data, batch_size=64, shuffle=False, num_workers=4)
valid_data_len = len(val_data)

print(f"Found {len(train_data)} images for training with {len(train_data.classes)} classes")
print(f"Found {len(val_data)} images for validation with {len(val_data.classes)} classes")

# Final training set: every labelled image (train + val splits together).
# It uses the training pipeline so its inputs are resized and normalized the
# same way as the data the model is validated and tested on.
final_train_data = datasets.ImageFolder(root=DATA_DIR, transform=transform_train)
final_train_loader = DataLoader(final_train_data, batch_size=256, shuffle=True, num_workers=4)
final_train_data_len = len(final_train_data)

print(f"Found {len(final_train_data)} images for final training with {len(final_train_data.classes)} classes")

  cpuset_checked))


Found 52714 images for training with 400 classes
Found 5674 images for validation with 400 classes
Found 58388 images for final training with 400 classes


In [9]:
# Loaders and sample counts keyed by phase name ("train" / "val"), in the
# layout train_model2 indexes them with.
dataloaders = dict(train=train_loader, val=val_loader)
dataset_sizes = dict(train=train_data_len, val=valid_data_len)

# Same layout for the final run: training uses the full labelled set, while
# the validation entries are the same ones as above.
final_dataloaders = dict(train=final_train_loader, val=val_loader)
final_dataset_sizes = dict(train=final_train_data_len, val=valid_data_len)

In [10]:
# Class names of the dataset; pre_image indexes this list with the model's
# predicted class index.
classes = get_classes("../input/iais22-birds/birds/birds")

In [11]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Turn on cuDNN benchmark mode.
torch.backends.cudnn.benchmark = True

# Earlier experiment, kept for reference: a frozen pretrained EfficientNet-B1
# with its classifier replaced by a MyViT.
#model = models.efficientnet_b1(pretrained=True)
#for param in model.parameters():
#    param.requires_grad = False
#n_inputs = model.classifier[1].in_features
#model.classifier = MyViT((3, 224, 224),n_patches=7, hidden_d=4, n_heads=2, device=device)
# Current model: MyViT on 3x224x224 inputs, 28x28 patches, 16-dimensional
# tokens and 4 attention heads.
model = MyViT((3, 224, 224),n_patches=28, hidden_d=16, n_heads=4, device=device)
model = model.to(device)
#print(model.classifier)


In [12]:
# Cross-entropy loss with label smoothing of 0.11.
criterion = nn.CrossEntropyLoss(label_smoothing=0.11).to(device)
# AdamW over all model parameters.
optimizer = optim.AdamW(model.parameters(), lr=0.01)
# Multiply the learning rate by 0.97 every 3 scheduler steps
# (train_model2 steps the scheduler once per epoch).
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

In [None]:

# Alternative: the basic loop without a validation phase.
#model_ft = train_model(model, criterion, optimizer, train_loader)
# Train for 7 epochs on the train/val split; train_model2 returns the model
# loaded with the weights of the best validation accuracy.
model_ft = train_model2(model, criterion, optimizer, exp_lr_scheduler, dataloaders, dataset_sizes, num_epochs=7)

Epoch 0/6
----------


 39%|███▉      | 80/206 [02:05<02:59,  1.42s/it]

In [None]:
# Predict a class for every test image and write the submission file.
# Files are processed in sorted order so the output is reproducible.
test_files = sorted(glob.glob("../input/iais22-birds/submission_test/submission_test/*.jpg"))

preds_id = []
preds = []
for i, filename in enumerate(test_files, start=1):
    # Open one image at a time and release it right after the prediction,
    # instead of keeping every test image open at once.
    with Image.open(filename) as im:
        predict_class = pre_image(im.convert("RGB"), model)

    # The submission id is the file name without its extension.
    preds_id.append(os.path.splitext(os.path.basename(filename))[0])
    preds.append(predict_class)

    if(i%500==0):
        print(i)

submission = pd.DataFrame({"Id": preds_id, "Category": preds})
submission.to_csv("submission.csv", index = False)
submission.head()