# Blood Cell Classification

In [None]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import cv2, os
from PIL import Image
from tqdm import tqdm 

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.utils import shuffle
from sklearn import decomposition
from scipy.spatial import distance as dist

import albumentations
import pretrainedmodels


import torch
import torchvision
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
#from pytorchtools import EarlyStopping
#from pytorchtools import EarlyStopping
#from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from keras.callbacks import ModelCheckpoint
from torchvision import transforms, models
import torch.nn.functional as F
from torchvision import datasets
import torch.optim as optim

import warnings
warnings.filterwarnings("ignore")

### Visualization of Data Classes

In [None]:
# Folder whose class sub-directories are sampled for the preview figures.
train_dataset_path = "/kaggle/input/blood-cells/dataset2-master/dataset2-master/images/TEST"

CATEGORIES = ['EOSINOPHIL', 'LYMPHOCYTE', 'MONOCYTE', 'NEUTROPHIL']

# One figure per sample index; each figure shows the image at that index
# in the directory listing of every class.
for sample_idx in range(3):
    plt.figure(figsize=(25, 15))
    for slot, category in enumerate(CATEGORIES, start=1):
        plt.subplot(5, 5, slot)
        plt.yticks([])
        plt.xticks([])
        class_dir = f"{train_dataset_path}/{category}"
        file_names = os.listdir(class_dir)
        heading = plt.title(category, color='tomato')
        heading.set_size(15)
        plt.axis('off')
        bgr_image = cv2.imread(os.path.join(class_dir, file_names[sample_idx]))
        # OpenCV loads channels as BGR; reverse the last axis so matplotlib
        # receives RGB.
        plt.imshow(bgr_image[:, :, ::-1])

### Data Loading

In [None]:
# Directories whose class sub-folders are scanned for image files.
DATADIR = [
    '/kaggle/input/blood-cells/dataset2-master/dataset2-master/images/TRAIN',
    '/kaggle/input/blood-cells/dataset2-master/dataset2-master/images/TEST',
]
# Class names; a class's position in this list is used as its integer label.
CATEGORIES = ['EOSINOPHIL', 'LYMPHOCYTE', 'MONOCYTE', 'NEUTROPHIL']

In [None]:
# Accumulates one [image_path, class_index] pair per image file found.
image_label = []


def create_training_data():
    """Collect the path and class index of every image under ``DATADIR``.

    Each directory in ``DATADIR`` is expected to contain one sub-folder per
    entry of ``CATEGORIES``. For every file in those sub-folders a
    ``[image_path, class_index]`` pair is appended to the module-level
    ``image_label`` list, where ``class_index`` is the position of the
    category in ``CATEGORIES``.

    Raises:
        OSError: If a category folder is missing or cannot be listed.
    """
    for datadir in DATADIR:
        # enumerate yields the label directly instead of searching the list
        # again with CATEGORIES.index for every class.
        for n_class, category in enumerate(CATEGORIES):
            path = os.path.join(datadir, category)
            # No try/except here: joining a path and appending to a list
            # cannot fail in a way worth hiding, and a blanket
            # ``except Exception: pass`` would only mask real problems.
            for image_name in os.listdir(path):
                image_label.append([os.path.join(path, image_name), n_class])


create_training_data()


In [None]:
# Tabulate the collected (path, label) pairs and shuffle the row order.
df = (
    pd.DataFrame(image_label, columns=['image_name', 'label'])
    .sample(frac=1)
    .reset_index(drop=True)
)
df.head()

### Class Distribution Visualization

In [None]:
# Number of images per class label, drawn as a bar chart.
count = df['label'].value_counts()
sns.barplot(x=count.index, y=count.values)

### Dividing Dataset

In [None]:
# Hold out 20% of the rows for validation; the rest is used for training.
Xtrain, xvalid, Ytrain, yvalid = train_test_split(
    df['image_name'],
    df['label'],
    test_size=.20,
)
print(*(part.shape for part in (Xtrain, xvalid, Ytrain, yvalid)))

### Data Loading

In [None]:
class Custom_Dataset:
    """Map-style dataset that reads an image from disk and pairs it with its label.

    Every image is resized to 128x128 and normalized with the ImageNet
    channel statistics. With ``train_data=True`` random geometric
    augmentations (shift/scale/rotate, 90-degree rotation, flips) are applied
    between the resize and the normalization.

    Args:
        images: Indexable collection of image file paths.
        targets: Indexable collection of integer class labels, aligned with
            ``images``.
        train_data: Enable the random augmentations when true.
    """

    def __init__(self, images, targets, train_data=False):
        self.features = images
        self.targets = targets

        # p=1.0 is used instead of always_apply=True: both make the transform
        # unconditional, but newer albumentations releases deprecate
        # always_apply.
        if train_data:
            self.aug = albumentations.Compose([
                albumentations.Resize(128, 128, p=1.0),
                albumentations.ShiftScaleRotate(shift_limit=0.0625,
                                                scale_limit=0.1,
                                                rotate_limit=5,
                                                p=0.9),
                albumentations.RandomRotate90(),
                albumentations.HorizontalFlip(),
                albumentations.VerticalFlip(),
                albumentations.Normalize(mean=(0.485, 0.456, 0.406),
                                         std=(0.229, 0.224, 0.225),
                                         p=1.0),
            ])
        else:
            self.aug = albumentations.Compose([
                albumentations.Resize(128, 128, p=1.0),
                albumentations.Normalize(mean=(0.485, 0.456, 0.406),
                                         std=(0.229, 0.224, 0.225),
                                         p=1.0),
            ])

    def __len__(self):
        """Return the number of samples."""
        return len(self.targets)

    def __getitem__(self, idx):
        """Load, augment and tensorize the sample at position ``idx``.

        Returns:
            dict with ``'image'`` (float tensor in channel-first layout) and
            ``'label'`` (long tensor holding the class index).

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        image_path = self.features[idx]
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError("Could not read image: {}".format(image_path))

        # OpenCV decodes to BGR, while the normalization statistics above are
        # given in RGB order, so convert before normalizing.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128)).astype(float)
        image = self.aug(image=image)['image']
        # HWC -> CHW. np.float32 replaces the np.float alias, which has been
        # removed from NumPy.
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)

        return {
            'image': torch.tensor(image, dtype=torch.float),
            'label': torch.tensor(self.targets[idx], dtype=torch.long)
        }

In [None]:
# Training split: built with train_data=True so augmentations are enabled.
train_feature_label = Custom_Dataset(Xtrain.values, Ytrain.values, train_data=True)

# Validation split: built with train_data=False (resize + normalize only).
valid_feature_label = Custom_Dataset(xvalid.values, yvalid.values, train_data=False)


In [None]:
# Batches of 32; only the training loader shuffles and uses worker processes.
train_loader = DataLoader(train_feature_label, batch_size=32, shuffle=True, num_workers=4)
valid_loader = DataLoader(valid_feature_label, batch_size=32, shuffle=False)


### Model

In [None]:
class Resnet34(nn.Module):
    """ResNet-34 feature extractor followed by a linear classification head.

    Args:
        num_classes: Number of output logits. Defaults to 4, the number of
            blood-cell categories used in this notebook.
        pretrained: Forwarded to the ``pretrainedmodels`` resnet34 factory.
            Defaults to ``'imagenet'``. Passing ``None`` is expected to build
            the backbone without loading pretrained weights (useful when a
            state dict is loaded right afterwards) -- confirm against the
            ``pretrainedmodels`` documentation.
    """

    def __init__(self, num_classes=4, pretrained='imagenet'):
        super(Resnet34, self).__init__()
        # Attribute names `model` and `l0` are part of the saved state-dict
        # keys and must not be renamed.
        self.model = pretrainedmodels.__dict__['resnet34'](pretrained=pretrained)
        self.l0 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        x = self.model.features(x)
        # Global average pool to 1x1, then drop the spatial dimensions.
        x = F.adaptive_avg_pool2d(x, 1).flatten(1)
        return self.l0(x)


In [None]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model = Resnet34()
model.to(device)

### Optimizer, Criterion, Scheduler

In [None]:
# Cross-entropy over the class logits, optimized with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# mode='max': the monitored metric is one that should increase; the learning
# rate is multiplied by 0.3 after 5 epochs without improvement.
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.3,
    patience=5,
    verbose=True,
)

In [None]:
def train(Xtrain, data_loader, model, device, optimizer, criterion, scheduler, epoch=None):
    """Run one training epoch and report accuracy and mean per-sample loss.

    Args:
        Xtrain: Unused; kept so existing call sites keep working.
        data_loader: Iterable of batches, each a dict with ``'image'`` and
            ``'label'`` tensors.
        model: Network to optimize.
        device: Device the batches are moved to.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss function; assumed to return the batch mean (the
            ``nn.CrossEntropyLoss`` default).
        scheduler: Unused here; accepted for call-site compatibility.
        epoch: Zero-based epoch index, used only in the log line. When
            omitted, the module-level ``epoch`` variable is used if it
            exists (the original behaviour), otherwise 0.

    Returns:
        Tuple ``(train_acc, train_loss)``: fraction of correctly classified
        samples and the mean loss per sample.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.train()

    if epoch is None:
        # Older callers rely on the notebook's loop variable being visible as
        # a global; fall back to it instead of raising NameError.
        epoch = globals().get('epoch', 0)

    total = 0
    train_loss = 0.0
    correct = 0

    # tqdm takes its total from len(data_loader), which also counts a final
    # partial batch (the previous floor division did not).
    for data in tqdm(data_loader):
        image = data['image'].to(device, dtype=torch.float)
        targets = data['label'].to(device, dtype=torch.long)

        optimizer.zero_grad()
        outputs = model(image)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        batch_size = targets.size(0)
        # The criterion returns a batch mean; weight it by the batch size so
        # that dividing by the sample count below yields a true per-sample
        # mean instead of a value shrunk by roughly the batch size.
        train_loss += loss.item() * batch_size
        _, predicted = outputs.max(1)
        total += batch_size
        correct += predicted.eq(targets).sum().item()

    if total == 0:
        raise ValueError("data_loader yielded no samples")

    train_acc = correct / total
    train_loss = train_loss / total

    print("Epoch: {}  \tTraining Acc: {:.6f}  \tTraining Loss: {:.6f}".format(epoch+1, train_acc, train_loss))
    return train_acc, train_loss




def evaluation(xvalid, data_loader, model, device, criterion, epoch=None):
    """Evaluate the model on one pass over ``data_loader``.

    Args:
        xvalid: Unused; kept so existing call sites keep working.
        data_loader: Iterable of batches, each a dict with ``'image'`` and
            ``'label'`` tensors.
        model: Network to evaluate; switched to eval mode.
        device: Device the batches are moved to.
        criterion: Loss function; assumed to return the batch mean (the
            ``nn.CrossEntropyLoss`` default).
        epoch: Zero-based epoch index, used only in the log line. When
            omitted, the module-level ``epoch`` variable is used if it
            exists (the original behaviour), otherwise 0.

    Returns:
        Tuple ``(valid_acc, valid_loss)``: fraction of correctly classified
        samples and the mean loss per sample.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()

    if epoch is None:
        # Older callers rely on the notebook's loop variable being visible as
        # a global; fall back to it instead of raising NameError.
        epoch = globals().get('epoch', 0)

    total = 0
    valid_loss = 0.0
    correct = 0

    # No gradients are needed for evaluation; disabling them avoids building
    # the autograd graph for every batch.
    with torch.no_grad():
        # tqdm takes its total from len(data_loader), which also counts a
        # final partial batch (the previous floor division did not).
        for data in tqdm(data_loader):
            image = data['image'].to(device, dtype=torch.float)
            targets = data['label'].to(device, dtype=torch.long)

            outputs = model(image)
            loss = criterion(outputs, targets)

            batch_size = targets.size(0)
            # Weight the batch-mean loss by the batch size so the division by
            # the sample count below gives a true per-sample mean.
            valid_loss += loss.item() * batch_size
            _, predicted = outputs.max(1)
            total += batch_size
            correct += predicted.eq(targets).sum().item()

    if total == 0:
        raise ValueError("data_loader yielded no samples")

    valid_acc = correct / total
    valid_loss = valid_loss / total

    print("Epoch: {} \tValidation Acc: {:.6f}  \tValidation Loss: {:.6f}".format(epoch+1, valid_acc, valid_loss))
    return valid_acc, valid_loss

### Model Training

In [None]:
# Training the model
# Per-epoch history of the metrics returned by train() and evaluation().
train_accuracy = []
train_losses = []
valid_accuracy = []
valid_losses = []

# NOTE: the loop variable must stay named `epoch`; train() and evaluation()
# read it for their log lines.
for epoch in range(10):
    train_acc, train_loss = train(Xtrain, train_loader, model, device, optimizer, criterion, scheduler)
    valid_acc, valid_loss = evaluation(xvalid, valid_loader, model, device, criterion)
    train_accuracy.append(train_acc)
    train_losses.append(train_loss)
    valid_accuracy.append(valid_acc)
    valid_losses.append(valid_loss)

    # ReduceLROnPlateau only acts when it is stepped with the monitored
    # metric. The scheduler was created with mode='max', so feed it the
    # validation accuracy; without this call the learning rate never changes.
    scheduler.step(valid_acc)

# Report the metrics of the last epoch, which is what "Final" refers to
# (previously the mean over all epochs was printed under this label).
print("Final train accuracy is :", train_accuracy[-1])
print("Final train loss is :", train_losses[-1])
print("Final valid accuracy is :", valid_accuracy[-1])
print("Final valid loss is :", valid_losses[-1])


### Saving Model

In [None]:
# Persist only the learned parameters (state dict), not the whole module.
torch.save(obj=model.state_dict(), f='Blood_Classification.pth')

### Test Data Preprocessing 

In [None]:
# From here on DATADIR is a single folder (it was a list of folders earlier
# in the notebook): the held-out TEST_SIMPLE images.
DATADIR = "/kaggle/input/blood-cells/dataset2-master/dataset2-master/images/TEST_SIMPLE"
CATEGORIES = ['EOSINOPHIL','LYMPHOCYTE', 'MONOCYTE', 'NEUTROPHIL']

# Accumulates one [image_path, class_index] pair per test image file found.
test_image_label = []


def create_test_data():
    """Collect the path and class index of every image under ``DATADIR``.

    ``DATADIR`` is expected to contain one sub-folder per entry of
    ``CATEGORIES``. For every file in those sub-folders a
    ``[image_path, class_index]`` pair is appended to the module-level
    ``test_image_label`` list, where ``class_index`` is the position of the
    category in ``CATEGORIES``.

    Raises:
        OSError: If a category folder is missing or cannot be listed.
    """
    # enumerate yields the label directly instead of searching the list again
    # with CATEGORIES.index for every class.
    for n_class, category in enumerate(CATEGORIES):
        path = os.path.join(DATADIR, category)
        # No try/except here: joining a path and appending to a list cannot
        # fail in a way worth hiding, and a blanket ``except Exception: pass``
        # would only mask real problems.
        for image_name in os.listdir(path):
            test_image_label.append([os.path.join(path, image_name), n_class])


create_test_data()


In [None]:
# Tabulate the collected test (path, label) pairs and shuffle the row order.
test_df = (
    pd.DataFrame(test_image_label, columns=['image_name', 'label'])
    .sample(frac=1)
    .reset_index(drop=True)
)
test_df.head()

### Test Data Loading

In [None]:
# Test split: built with train_data=False, so no random augmentation.
test_feature_label = Custom_Dataset(
    test_df['image_name'].values,
    test_df['label'].values,
    train_data=False,
)

In [None]:
# Sequential batches of 10 over the test dataset.
test_loader = DataLoader(test_feature_label, batch_size=10, shuffle=False)

### Model Loading

In [None]:
# Loading Model
model_trained = Resnet34()
# map_location lets weights saved from a GPU run load on whatever device is
# in use now (including CPU-only machines).
model_trained.load_state_dict(
    torch.load('./Blood_Classification.pth', map_location=device)
)
# Inputs are moved to `device` before inference, so the model must live there
# too; eval mode freezes batch-norm statistics and disables dropout.
model_trained = model_trained.to(device).eval()


In [None]:
# Evaluate the reloaded model (model_trained), not the in-memory training
# model: it must be on the same device as the inputs and in eval mode.
model_trained.to(device)
model_trained.eval()

total = 0
correct = 0
# Was previously used without being initialized, which raised NameError on
# the first batch.
test_loss = 0.0

with torch.no_grad():
    for data in test_loader:
        images = data['image'].to(device, dtype=torch.float)
        targets = data['label'].to(device, dtype=torch.long)

        outputs = model_trained(images)
        loss = criterion(outputs, targets)

        batch_size = targets.size(0)
        # The criterion returns a batch mean; weight by batch size so the
        # division below yields the mean loss per sample.
        test_loss += loss.item() * batch_size
        _, predicted = outputs.max(1)
        total += batch_size
        correct += predicted.eq(targets).sum().item()

test_loss = test_loss / total

print("Test Accuracy of The Model is :", correct/total)
print("Test Loss of The Model is :", test_loss)
        