In [1]:
package_path = "../input/timm-pytorch-image-models/pytorch-image-models-master"
import sys
sys.path.append(package_path)

In [2]:
import os
import shutil
import numpy as np
import pandas as pd
import torch
import torchvision
from torch import nn

In [3]:
from torch.utils.data import Dataset, DataLoader
import albumentations as alb
from albumentations.pytorch import ToTensorV2
import pytorch_lightning as pl
from pytorch_lightning.metrics.functional import accuracy

In [4]:
# Folder containing the competition's test images; joined onto each image_id below.
test_dir = "../input/cassava-leaf-disease-classification/test_images"

In [5]:
# Load the sample submission; its image_id column names the test files to
# predict and its label column is overwritten with predictions at the end.
test = pd.read_csv("../input/cassava-leaf-disease-classification/sample_submission.csv")
test.head()

Unnamed: 0,image_id,label
0,2216849948.jpg,4


In [6]:
# Turn bare file names into full paths under test_dir. Taking basename() first
# keeps the cell idempotent: re-running it no longer prefixes test_dir twice.
test['image_id'] = test['image_id'].apply(
    lambda name: os.path.join(test_dir, os.path.basename(name))
)
test.head()

Unnamed: 0,image_id,label
0,../input/cassava-leaf-disease-classification/t...,4


In [7]:
# Per-channel normalisation statistics (presumably the standard ImageNet
# values -- confirm against the training notebook).
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

# Test-time augmentation pipeline for the 512x512 model: resize, random
# geometric/photometric perturbations (each applied with p=0.5), normalise,
# then convert to a tensor. Because the steps are random, every pass over the
# test set sees a different view of each image.
test_transform = alb.Compose([
    alb.SmallestMaxSize(max_size=520),
    alb.ShiftScaleRotate(p=0.5),
    alb.CenterCrop(height=512, width=512),
    alb.Flip(p=0.5),
    alb.CLAHE(p=0.5),
    alb.IAASharpen(p=0.5),
    alb.RandomBrightnessContrast(p=0.5),
    # alb.HueSaturationValue(p=0.5) is intentionally left disabled.
    alb.Normalize(mean=mean, std=std),
    ToTensorV2(),
])

In [8]:
import cv2

class CassavaImageDataset(Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs from a two-column table.

    Args:
        data: object exposing ``.values`` (e.g. a pandas DataFrame); each row
            must unpack into ``(image_path, label)``.
        transform: optional callable invoked as ``transform(image=img)``; the
            ``'image'`` entry of its result is returned in place of the raw
            image.
    """

    def __init__(self, data, transform=None):
        super().__init__()
        # Keep the rows as an array so they can be indexed positionally.
        self.data = data.values
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.data)

    def __getitem__(self, index):
        """Load, colour-convert and optionally transform the image at ``index``.

        Returns:
            Tuple ``(image, label)``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        image_path, label = self.data[index]
        image = cv2.imread(image_path)
        # cv2.imread signals a missing/unreadable file by returning None
        # rather than raising; fail here with the offending path instead of
        # letting cvtColor raise an opaque error.
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {image_path}")
        # OpenCV loads images as BGR; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform is not None:
            image = self.transform(image=image)['image']
        return image, label

# Model 1: ResNeXt Inference

In [9]:
class CassavaResNextModel(pl.LightningModule):
    """timm ``swsl_resnext50_32x4d`` backbone with a 5-way classification head.

    The backbone's ``fc`` layer is replaced by
    ``Linear(2048, 256) -> BatchNorm1d -> ReLU -> Dropout(0.2) -> Linear(256, 5)``.
    Per-epoch loss/accuracy values are appended to plain Python lists that can
    be read back through :meth:`history`.

    NOTE(review): ``timm`` is referenced in ``__init__`` but no ``import timm``
    is visible in this notebook; constructing the class directly requires that
    import to be present.

    Args:
        data_dir: stored as ``self.data_dir``; not otherwise used by this class.
        learning_rate: learning rate handed to Adam in ``configure_optimizers``.
    """

    def __init__(self, data_dir='./train_images', learning_rate=1e-4):
        super().__init__()
        self.data_dir = data_dir
        self.learning_rate = learning_rate
        # Per-epoch metric history, filled by the *_epoch_end hooks.
        self.training_loss = []
        self.training_acc = []
        self.valid_loss = []
        self.valid_acc = []

        self.num_classes = 5
        self.model = timm.create_model('swsl_resnext50_32x4d', pretrained=True)
        # Swap the stock classifier for a small MLP head.
        self.model.fc = nn.Sequential(
            nn.Linear(2048, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, self.num_classes),
        )

    def forward(self, x):
        """Return the backbone's output (class logits) for a batch ``x``."""
        return self.model(x)

    def configure_optimizers(self):
        """Adam with cosine-annealing warm restarts (``T_0=10``, ``eta_min=1e-6``)."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, eta_min=1e-6
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_loss',
        }

    def step(self, batch):
        """Shared train/validation step.

        Returns:
            Tuple ``(loss, y, y_hat)`` where ``y_hat`` is the arg-max class index.
        """
        x, y = batch
        logits = self(x)
        # `F` was never imported in this notebook; go through `nn.functional`.
        loss = nn.functional.cross_entropy(logits, y)
        y_hat = torch.argmax(logits, dim=1)
        return loss, y, y_hat

    def training_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def validation_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def training_epoch_end(self, outputs):
        """Print and record the epoch's mean training loss and accuracy."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print(f"Epoch: {self.current_epoch}   Train_Loss: {avg_loss:.4f} |   Train_acc: {acc:.4f}")
        self.training_loss.append(avg_loss)
        self.training_acc.append(acc)
        return None

    def validation_epoch_end(self, outputs):
        """Print, log (as ``val_loss``) and record the epoch's validation metrics."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print("\n*****************************************************")
        print(f"Epoch: {self.current_epoch}   Val_Loss: {avg_loss:.4f}   |   Val_acc: {acc:.4f}")
        # 'val_loss' is the key named under 'monitor' in configure_optimizers.
        self.log('val_loss', avg_loss)
        self.valid_loss.append(avg_loss)
        self.valid_acc.append(acc)
        return {'loss': avg_loss}

    def test_step(self, batch, batch_idx):
        """Return the raw logits for the batch as a NumPy array under ``'y_hat'``."""
        x, _ = batch
        logits = self(x)
        return {'y_hat': logits.cpu().numpy()}

    def test_end(self, outputs):
        """Concatenate per-batch ``'y_hat'`` entries into one NumPy array.

        ``test_step`` emits NumPy arrays, which ``torch.cat`` rejects, so the
        concatenation is done in NumPy; tensors are still accepted and moved
        to the CPU first.
        """
        parts = [
            p.detach().cpu().numpy() if torch.is_tensor(p) else np.asarray(p)
            for p in (out['y_hat'] for out in outputs)
        ]
        return np.concatenate(parts)

    def get_auc(self, outputs):
        """Return classification accuracy over ``outputs``.

        Despite the name, this calls ``accuracy`` (not an AUC metric).
        """
        y = torch.cat([out['y'] for out in outputs])
        y_hat = torch.cat([out['y_hat'] for out in outputs])
        # Predictions first, targets second.
        return accuracy(y_hat, y)

    def history(self):
        """Return ``(training_loss, training_acc, valid_loss, valid_acc)`` lists."""
        return self.training_loss, self.training_acc, self.valid_loss, self.valid_acc

In [10]:
# Wrap the test table with the ResNeXt TTA transform; order is preserved
# (shuffle=False) so predictions line up with the rows of `test`.
test_data = CassavaImageDataset(test, transform=test_transform)
test_loader = DataLoader(
    test_data,
    batch_size=32,
    shuffle=False,
    pin_memory=True,
)

In [11]:
# Load the fully serialized ResNeXt model object.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load checkpoints from a trusted source. Unpickling also needs
# the model's class definition to be available in this session.
model = torch.load("../input/cass-resnext-512-model/cassava_model_resnext.pt")
model.eval()
# Trainer configured to run on a single GPU.
trainer= pl.Trainer(gpus=1)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


### Inference (TTA: test-time augmentation)


In [12]:
# Number of randomly-augmented passes over the test set. Kept in one place so
# the loop count and the averaging divisor cannot drift apart.
N_TTA_RUNS = 5

resnext_pred = None
for _ in range(N_TTA_RUNS):
    run_outputs = trainer.test(model, test_dataloaders=test_loader, verbose=False)
    # Stitch the per-entry 'y_hat' arrays into one array for this pass.
    run_logits = np.concatenate([out['y_hat'] for out in run_outputs])
    resnext_pred = run_logits if resnext_pred is None else resnext_pred + run_logits
# Mean over all TTA passes.
resnext_pred = resnext_pred / N_TTA_RUNS

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




# Model 2: EfficientNet Inference

In [13]:
# Test-time augmentation pipeline for the 300x300 EfficientNet model: resize,
# random perturbations (each with p=0.5), normalise with the `mean`/`std`
# defined earlier in the notebook, then convert to a tensor.
test_transform = alb.Compose([
    alb.SmallestMaxSize(max_size=350),
    alb.ShiftScaleRotate(p=0.5),
    alb.CenterCrop(height=300, width=300),
    alb.Flip(p=0.5),
    alb.CLAHE(p=0.5),
    alb.IAASharpen(p=0.5),
    alb.RandomBrightnessContrast(p=0.5),
    # alb.HueSaturationValue(p=0.5) is intentionally left disabled.
    alb.Normalize(mean=mean, std=std),
    ToTensorV2(),
])

In [14]:
class CassavaModel(pl.LightningModule):
    """timm ``tf_efficientnet_b3_ns`` backbone with a 5-way classification head.

    The backbone's ``classifier`` layer is replaced by
    ``Linear(1536, 256) -> BatchNorm1d -> ReLU -> Dropout(0.2) -> Linear(256, 5)``.
    Per-epoch loss/accuracy values are appended to plain Python lists that can
    be read back through :meth:`history`.

    NOTE(review): ``timm`` is referenced in ``__init__`` but no ``import timm``
    is visible in this notebook; constructing the class directly requires that
    import to be present.

    Args:
        data_dir: stored as ``self.data_dir``; not otherwise used by this class.
        learning_rate: learning rate handed to Adam in ``configure_optimizers``.
    """

    def __init__(self, data_dir='./train_images', learning_rate=1e-4):
        super().__init__()
        self.data_dir = data_dir
        self.learning_rate = learning_rate
        # Per-epoch metric history, filled by the *_epoch_end hooks.
        self.training_loss = []
        self.training_acc = []
        self.valid_loss = []
        self.valid_acc = []

        self.num_classes = 5
        self.model = timm.create_model('tf_efficientnet_b3_ns', pretrained=True)
        # Swap the stock classifier for a small MLP head.
        self.model.classifier = nn.Sequential(
            nn.Linear(1536, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, self.num_classes),
        )

    def forward(self, x):
        """Return the backbone's output (class logits) for a batch ``x``."""
        return self.model(x)

    def configure_optimizers(self):
        """Adam (``weight_decay=1e-6``) with cosine-annealing warm restarts."""
        optimizer = torch.optim.Adam(
            self.parameters(), lr=self.learning_rate, weight_decay=1e-6
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, eta_min=1e-6
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_loss',
        }

    def step(self, batch):
        """Shared train/validation step.

        Returns:
            Tuple ``(loss, y, y_hat)`` where ``y_hat`` is the arg-max class index.
        """
        x, y = batch
        logits = self(x)
        # `F` was never imported in this notebook; go through `nn.functional`.
        loss = nn.functional.cross_entropy(logits, y)
        y_hat = torch.argmax(logits, dim=1)
        return loss, y, y_hat

    def training_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def validation_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def training_epoch_end(self, outputs):
        """Print and record the epoch's mean training loss and accuracy."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print(f"Epoch: {self.current_epoch}   Train_Loss: {avg_loss:.4f} |   Train_acc: {acc:.4f}")
        self.training_loss.append(avg_loss)
        self.training_acc.append(acc)
        return None

    def validation_epoch_end(self, outputs):
        """Print, log (as ``val_loss``) and record the epoch's validation metrics."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print("\n*****************************************************")
        print(f"Epoch: {self.current_epoch}   Val_Loss: {avg_loss:.4f} |   Val_acc: {acc:.4f}")
        # 'val_loss' is the key named under 'monitor' in configure_optimizers.
        self.log('val_loss', avg_loss)
        self.valid_loss.append(avg_loss)
        self.valid_acc.append(acc)
        return {'loss': avg_loss}

    def test_step(self, batch, batch_idx):
        """Return the raw logits for the batch as a NumPy array under ``'y_hat'``."""
        x, _ = batch
        logits = self(x)
        return {'y_hat': logits.cpu().numpy()}

    def test_end(self, outputs):
        """Concatenate per-batch ``'y_hat'`` entries into one NumPy array.

        ``test_step`` emits NumPy arrays, which ``torch.cat`` rejects, so the
        concatenation is done in NumPy; tensors are still accepted and moved
        to the CPU first.
        """
        parts = [
            p.detach().cpu().numpy() if torch.is_tensor(p) else np.asarray(p)
            for p in (out['y_hat'] for out in outputs)
        ]
        return np.concatenate(parts)

    def get_auc(self, outputs):
        """Return classification accuracy over ``outputs``.

        Despite the name, this calls ``accuracy`` (not an AUC metric).
        """
        y = torch.cat([out['y'] for out in outputs])
        y_hat = torch.cat([out['y_hat'] for out in outputs])
        # Predictions first, targets second.
        return accuracy(y_hat, y)

    def history(self):
        """Return ``(training_loss, training_acc, valid_loss, valid_acc)`` lists."""
        return self.training_loss, self.training_acc, self.valid_loss, self.valid_acc

In [15]:
# Rebuild dataset and loader so they pick up the EfficientNet TTA transform;
# order is preserved (shuffle=False) so predictions line up with `test`.
test_data = CassavaImageDataset(test, transform=test_transform)
test_loader = DataLoader(
    test_data,
    batch_size=32,
    shuffle=False,
    pin_memory=True,
)

In [16]:
# Load the fully serialized EfficientNet model object (replaces the previous `model`).
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load checkpoints from a trusted source. Unpickling also needs
# the model's class definition to be available in this session.
model = torch.load("../input/cass-effnetb3-model/cassava_model_effnet_b3ns.pt")
model.eval()
# Fresh Trainer configured to run on a single GPU.
trainer= pl.Trainer(gpus=1)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


### Inference (TTA: test-time augmentation)

In [17]:
# Number of randomly-augmented passes over the test set. Kept in one place so
# the loop count and the averaging divisor cannot drift apart.
N_TTA_RUNS = 5

effnet_pred = None
for _ in range(N_TTA_RUNS):
    run_outputs = trainer.test(model, test_dataloaders=test_loader, verbose=False)
    # Stitch the per-entry 'y_hat' arrays into one array for this pass.
    run_logits = np.concatenate([out['y_hat'] for out in run_outputs])
    effnet_pred = run_logits if effnet_pred is None else effnet_pred + run_logits
# Mean over all TTA passes.
effnet_pred = effnet_pred / N_TTA_RUNS

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




# Model-3 RegNet Inference

In [18]:
# Test-time augmentation pipeline for the 300x300 RegNet model (same steps as
# the EfficientNet pipeline): resize, random perturbations (each with p=0.5),
# normalise with the `mean`/`std` defined earlier, then convert to a tensor.
test_transform = alb.Compose([
    alb.SmallestMaxSize(max_size=350),
    alb.ShiftScaleRotate(p=0.5),
    alb.CenterCrop(height=300, width=300),
    alb.Flip(p=0.5),
    alb.CLAHE(p=0.5),
    alb.IAASharpen(p=0.5),
    alb.RandomBrightnessContrast(p=0.5),
    # alb.HueSaturationValue(p=0.5) is intentionally left disabled.
    alb.Normalize(mean=mean, std=std),
    ToTensorV2(),
])

In [19]:
class CassavaRegNetModel(pl.LightningModule):
    """timm ``regnety_032`` backbone with a 5-way classification head.

    The backbone's ``head.fc`` layer is replaced by
    ``Linear(1512, 256) -> BatchNorm1d -> ReLU -> Dropout(0.2) -> Linear(256, 5)``.
    Per-epoch loss/accuracy values are appended to plain Python lists that can
    be read back through :meth:`history`.

    NOTE(review): ``timm`` is referenced in ``__init__`` but no ``import timm``
    is visible in this notebook; constructing the class directly requires that
    import to be present.

    Args:
        data_dir: stored as ``self.data_dir``; not otherwise used by this class.
        learning_rate: learning rate handed to Adam in ``configure_optimizers``.
    """

    def __init__(self, data_dir='./train_images', learning_rate=1e-4):
        super().__init__()
        self.data_dir = data_dir
        self.learning_rate = learning_rate
        # Per-epoch metric history, filled by the *_epoch_end hooks.
        self.training_loss = []
        self.training_acc = []
        self.valid_loss = []
        self.valid_acc = []

        self.num_classes = 5
        self.model = timm.create_model('regnety_032', pretrained=True)
        # Swap the stock classifier for a small MLP head.
        self.model.head.fc = nn.Sequential(
            nn.Linear(1512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, self.num_classes),
        )

    def forward(self, x):
        """Return the backbone's output (class logits) for a batch ``x``."""
        return self.model(x)

    def configure_optimizers(self):
        """Adam with cosine-annealing warm restarts (``T_0=10``, ``eta_min=1e-6``)."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, eta_min=1e-6
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_loss',
        }

    def step(self, batch):
        """Shared train/validation step.

        Returns:
            Tuple ``(loss, y, y_hat)`` where ``y_hat`` is the arg-max class index.
        """
        x, y = batch
        logits = self(x)
        # `F` was never imported in this notebook; go through `nn.functional`.
        loss = nn.functional.cross_entropy(logits, y)
        y_hat = torch.argmax(logits, dim=1)
        return loss, y, y_hat

    def training_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def validation_step(self, batch, batch_idx):
        loss, y, y_hat = self.step(batch)
        return {'loss': loss, 'y': y, 'y_hat': y_hat}

    def training_epoch_end(self, outputs):
        """Print and record the epoch's mean training loss and accuracy."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print(f"Epoch: {self.current_epoch}   Train_Loss: {avg_loss:.4f} |   Train_acc: {acc:.4f}")
        self.training_loss.append(avg_loss)
        self.training_acc.append(acc)
        return None

    def validation_epoch_end(self, outputs):
        """Print, log (as ``val_loss``) and record the epoch's validation metrics."""
        avg_loss = torch.stack([out['loss'] for out in outputs]).mean()
        acc = self.get_auc(outputs)
        print("\n*****************************************************")
        print(f"Epoch: {self.current_epoch}   Val_Loss: {avg_loss:.4f}   |   Val_acc: {acc:.4f}")
        # 'val_loss' is the key named under 'monitor' in configure_optimizers.
        self.log('val_loss', avg_loss)
        self.valid_loss.append(avg_loss)
        self.valid_acc.append(acc)
        return {'loss': avg_loss}

    def test_step(self, batch, batch_idx):
        """Return the raw logits for the batch as a NumPy array under ``'y_hat'``."""
        x, _ = batch
        logits = self(x)
        return {'y_hat': logits.cpu().numpy()}

    def test_end(self, outputs):
        """Concatenate per-batch ``'y_hat'`` entries into one NumPy array.

        ``test_step`` emits NumPy arrays, which ``torch.cat`` rejects, so the
        concatenation is done in NumPy; tensors are still accepted and moved
        to the CPU first.
        """
        parts = [
            p.detach().cpu().numpy() if torch.is_tensor(p) else np.asarray(p)
            for p in (out['y_hat'] for out in outputs)
        ]
        return np.concatenate(parts)

    def get_auc(self, outputs):
        """Return classification accuracy over ``outputs``.

        Despite the name, this calls ``accuracy`` (not an AUC metric).
        """
        y = torch.cat([out['y'] for out in outputs])
        y_hat = torch.cat([out['y_hat'] for out in outputs])
        # Predictions first, targets second.
        return accuracy(y_hat, y)

    def history(self):
        """Return ``(training_loss, training_acc, valid_loss, valid_acc)`` lists."""
        return self.training_loss, self.training_acc, self.valid_loss, self.valid_acc

In [20]:
# Rebuild dataset and loader so they pick up the RegNet TTA transform;
# order is preserved (shuffle=False) so predictions line up with `test`.
test_data = CassavaImageDataset(test, transform=test_transform)
test_loader = DataLoader(
    test_data,
    batch_size=32,
    shuffle=False,
    pin_memory=True,
)

In [21]:
# Load the fully serialized RegNet model object (replaces the previous `model`).
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load checkpoints from a trusted source. Unpickling also needs
# the model's class definition to be available in this session.
model = torch.load("../input/cass-regnet-model/cassava_model_regnet.pt")
model.eval()
# Fresh Trainer configured to run on a single GPU.
trainer= pl.Trainer(gpus=1)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


### Inference (TTA: test-time augmentation)

In [22]:
# Number of randomly-augmented passes over the test set. Kept in one place so
# the loop count and the averaging divisor cannot drift apart.
N_TTA_RUNS = 5

regnet_pred = None
for _ in range(N_TTA_RUNS):
    run_outputs = trainer.test(model, test_dataloaders=test_loader, verbose=False)
    # Stitch the per-entry 'y_hat' arrays into one array for this pass.
    run_logits = np.concatenate([out['y_hat'] for out in run_outputs])
    regnet_pred = run_logits if regnet_pred is None else regnet_pred + run_logits
# Mean over all TTA passes.
regnet_pred = regnet_pred / N_TTA_RUNS

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…




## Ensemble of Models 1, 2, and 3

In [23]:
# Weighted blend of the three models' TTA-averaged outputs (weights sum to 1.0).
# The inputs are the raw logits returned by each model's test_step; no softmax
# is applied before blending.
final_pred = 0.45*resnext_pred + 0.35*effnet_pred + 0.2*regnet_pred

In [24]:
# Predicted class = index of the largest blended score in each row.
test['label'] = final_pred.argmax(axis=1)
# Strip the directory prefix again so image_id holds bare file names.
test['image_id'] = test['image_id'].map(os.path.basename)
test

Unnamed: 0,image_id,label
0,2216849948.jpg,4


In [25]:
# Write the submission file (image_id, label) without the DataFrame index column.
test.to_csv('./submission.csv', index=False)

In [26]:
# Remove the ./lightning_logs directory from the working directory.
# ignore_errors=True keeps the cell from crashing when the directory is
# already gone (e.g. on a re-run).
shutil.rmtree("./lightning_logs", ignore_errors=True)