In [1]:
import os
from glob import glob
import numpy as np
import pandas as pd
from read_dataset import build_df
from utils import CFG2
from sklearn.model_selection import train_test_split
import torch
from torch.autograd import Variable
from models import *
from main import *
from tqdm import tqdm
from torch.utils.data import DataLoader
from datapreprocess import *

import seaborn as sns
from sklearn.metrics import *
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary
import torchvision


In [2]:
IMG_FILES = CFG2.img_path 
LABEL_FILES = CFG2.labels

In [3]:
def reader(IMG_FILES,LABEL_FILES):
    # x_train, x_valid, x_test, y_train, y_valid, y_test
    dir_names = ['train','valid','test']
    # images
    x_train, x_valid, x_test, y_train, y_valid, y_test=[],[],[],[],[],[]
    for dir in (os.listdir(IMG_FILES)):
        if dir in dir_names:
            for img_name in (os.listdir(os.path.join(IMG_FILES, dir))):
                img_path = os.path.join(IMG_FILES, dir, img_name)

                if dir  == 'train':
                    x_train.append(img_path)

                elif dir == 'valid':
                    x_valid.append(img_path)

                elif dir  == 'test':
                    x_test .append(img_path)
    
    for l_dir in (os.listdir(LABEL_FILES)):
        if l_dir in dir_names:
            for label in (os.listdir(os.path.join(LABEL_FILES, l_dir))):
                label_path = os.path.join(LABEL_FILES, l_dir,label)

                if l_dir  == 'train':
                    y_train.append(label_path)

                elif l_dir == 'valid':
                    y_valid.append(label_path)

                elif l_dir  == 'test':
                    y_test .append(label_path)

    train_data = [x_train,y_train]
    valid_data = [x_valid,y_valid]
    test_data = [x_test,y_test]
    return np.array(train_data), np.array(valid_data), np.array(test_data)

    # class labels


In [4]:
train_data, valid_data, test_data= reader(IMG_FILES, LABEL_FILES)


In [5]:
train_data[1][1]

'cell_datatset/labels/train/Opisthorchis viverrine_0938.txt'

In [6]:
class Custom_Dataset(torch.utils.data.Dataset):
    def __init__(self, data, transform=True,IMAGE_SIZE = 640):
        super(Custom_Dataset, self).__init__()
        # List of files
        self.IMAGE_SIZE = IMAGE_SIZE
        self.data_files = data[0]  # [DATA_FOLDER.format(id) for id in ids]
        self.label_files = data[1]  # [LABELS_FOLDER.format(id) for id in ids]
        self.class_labels = torch.LongTensor(self.get_class_labels()) 
        self.transform = transform
        # Sanity check : raise an error if some files do not exist
        for f in self.data_files:
            if not os.path.isfile(f):
                raise KeyError("{} is not a file !".format(f))

    def __len__(self):
        return len(self.data_files)  # the length of the used data
    
    def img_size(self,idx):
        self.img =np.asarray(io.imread(self.data_files[idx]))
        return (self.img).shape
    
    def get_class_labels(self):
        labels  = []
        for fname in self.label_files:
            with open(fname, "r") as file:
                labels.append(file.read().split(" ")[0])
        return np.array(labels,dtype=np.uint8)

    def original_img(self,idx):
        self.img =(
                   1/255
             *np.asarray(
                io.imread(self.data_files[idx], plugin="pil").transpose(
                    (2, 0, 1)
                ),
                dtype="float32",
            )
        )
        return self.img,self.labels[idx]

    def get_class_label(self,idx):
        return self.class_labels[idx]

    def resized_bbox(self,idx):
        image = Chitra(self.data_files[idx], self.labels[idx], self.class_labels[idx])
        # Chitra can rescale your bounding box automatically based on the new image size.
        image.resize_image_with_bbox((640, 640))

        return np.array([image.bboxes[0].x1_int,image.bboxes[0].y1_int,image.bboxes[0].x2_int,image.bboxes[0].y2_int])

    def __getitem__(self, idx):
        #         Pre-processing steps
        
        if self.transform is not None:

            self.data = (
                np.asarray(
                io.imread(self.data_files[idx], plugin="pil").transpose(
                    (2, 0, 1)
                ),
                dtype="float32",
            )
        )

            # convert the tensor to image
            tensor2image_transform = transforms.ToPILImage()

            # make the format avialble to resnet model
            data_p = torch.tensor(
                self.data 
            )  # tranform the image into ResNet format

            image = tensor2image_transform(data_p)
            data_p = self.transform(
                image
            )  # transform the image into ResNet format
            return data_p, self.class_labels[idx]


In [7]:
def get_default_device():
    """gets gpu for mac m1 or cuda, or cpu machine"""
    
    if torch.cuda.is_available():
        device = torch.device('cuda')
        print('Running on Cuda GPU')
        return device
        # x = torch.ones(1, device=mps_device)
        # print(x)
        
    
    elif torch.backends.mps.is_available():
        print('Running on the Mac GPU')
        mps_device = torch.device("mps")
        return mps_device
        
    else:
        # print("MPS device not found.")
        return torch.device('cpu')
        print('Code Running on a CPU')



def plot_model_history(training_accs, training_losses):# Get training and test loss histories

    # Create count of the number of epochs
    epoch_count = range(1, len(training_accs) + 1)

    # Visualize loss history
    plt.plot(epoch_count, training_losses, 'r--')
    plt.plot(epoch_count, training_accs, 'b-')
    plt.legend(['Training Loss', 'Training Accuracy'])
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy and Loss Curves')
    plt.show()


# testing/ validation of unet model
def validate_model(trained_model, test_dataloader, device=None):
    trained_model.eval() # switch off some layers such as dropout layer during validation
    with torch.no_grad():
        test_predictions = []
        test_labels = []

        for i, (x, y) in enumerate(test_dataloader):
            x  = x.to(device)
            y  = y.to(device)
            y_hat = trained_model(x)
            predicted_labels =  torch.argmax(y_hat, dim=1).detach().cpu().numpy().tolist()
            # y_hat  = y_hat.cpu()
            y = y.detach().cpu().numpy().tolist()
            test_predictions.extend(predicted_labels)
            test_labels.extend(y)
    
        return test_labels, test_predictions


def evalution_metrics(ground_truth, predictions):
    print(f"mean acc score = {accuracy_score(ground_truth, predictions)}")
    print(f"mean recall score = {recall_score(ground_truth, predictions, average='micro')}")
    print(f"precision score = {precision_score(ground_truth, predictions, average='micro')}")
    print(f"mean f1 score = {f1_score(ground_truth, predictions, average='micro')}")
    labels = np.unique(ground_truth).tolist()
    cm  = confusion_matrix(ground_truth, predictions, labels=labels) 
    report  = classification_report(ground_truth, predictions)
    print(report)

    sns.heatmap(cm)
    plt.show()

In [8]:
transform = preprocess_image()
train_dataset  = Custom_Dataset(train_data, transform=transform)
valid_dataset = Custom_Dataset(valid_data, transform=transform)
test_dataset = Custom_Dataset(test_data, transform=transform)

In [9]:
DEVICE = get_default_device()
loss_fn = nn.CrossEntropyLoss()
learning_rate = 0.001
EPOCHS = 20
# Parameters
params = {"batch_size": 4, "shuffle": True, "num_workers": 4}
NUM_CLASSES = 11

Running on Cuda GPU


In [10]:

# Dataloaders
train_dataloader = DataLoader(
    train_dataset, params["batch_size"], num_workers=params["num_workers"], shuffle=params["shuffle"],
)
validation_dataloader = torch.utils.data.DataLoader(valid_dataset, params['batch_size'],num_workers=params['num_workers'])
test_dataloader = DataLoader(
    test_dataset, params["batch_size"], num_workers=params["num_workers"]
)

In [11]:
class CNN(nn.Module):
    def __init__(self, input_channels=3, filters=32, num_classes=11):
        super(CNN, self).__init__()
        self.conv1  = nn.Conv2d(input_channels, filters, kernel_size=2) 
        self.conv2  = nn.Conv2d(filters, filters * 2, kernel_size=3)
        self.conv3 = nn.Conv2d(filters * 2, filters * 4, kernel_size=3)

        self.fc1  =  nn.Linear(in_features=  (filters * 4 * 28*28  + 1_000) , out_features=64)
        self.out = nn.Linear(64, out_features=num_classes)

        self.flatten = nn.Flatten(start_dim=1)

    def forward(self, x, x2):
        x  = self.conv1(x) # output = (B, C,224,224)
        x  = F.relu(x) 
        x  = F.max_pool2d(x, kernel_size=2, padding=1) #output = (B,C,112,112)
        x  = F.dropout(x, 0.2)

        x  = self.conv2(x) #output = (B,iC,112,112)
        x  = F.relu(x)
        x  = F.max_pool2d(x, kernel_size=2, padding=1) # #output = (B,iC,56,56)
        x  = F.dropout(x, 0.2)
        x  = self.conv3(x)
        x  = F.relu(x)
        x  = F.max_pool2d(x, kernel_size=2, padding=1) # output = (B, iC, 28 , 28)
        x  = F.dropout(x, 0.2)
        x  = self.flatten(x)
        # print("x2 size = ", x2.size())
        # print(
        #     'size of x  = ', x.size(),
        # )
        x  = torch.cat((x, x2), dim=1)
        # print("cat size = ", x.size())
        x  = self.fc1(x)
        x  = F.relu(x)
        x  = F.dropout(x, 0.25)
        x  = self.out(x)
        return x

In [12]:
def get_pretrained_model():
    
    resnet152 = torchvision.models.resnet152(pretrained=True)
    resnet152.eval()
    # remove the last layer
    # new_model = torch.nn.Sequential(*(list(resnet152.children())[:-1]))
    # freeze the model
    for param in resnet152.parameters():
        param.requires_grad = False

    # if device is not None:
    #     resnet152.to(device)

    return resnet152




def trainCustomModel(resnetModel,yoloDetectModel,customCNNModel, train_dataloader, optimizer,loss_fn,in_channels=3,num_classes=11, epochs=30, learning_rate=0.001, device='cpu'):
    """Accepts feature from resnet 
    and yolo object detection cropped iamge(s) 
    as features to train an accurate cnn classifier.
    """

    training_losses = []
    training_accs = []

    for epoch in range(1, epochs+1):
        number_of_batches = 0
        epoch_loss_values = 0.0
        epoch_accs = 0.0
        for index, train_batch in enumerate(tqdm(train_dataloader)):
            # X,  y = X.to(device), y.to(device)  # make the tensors in gpu state
            X   = train_batch[0]
            y = train_batch[1]
            X  = Variable(X, requires_grad=True).to(device)
            y  = torch.LongTensor(y).to(device)

            # X_valid  = Variable(valid_batch[0], requires_grad=False).to(device)
            # y_valid = torch.LongTensor(valid_batch[1]).to(device)

            # resnet processing
            # resnet_X = Variable(resnet_preprocess((X.cpu())), requires_grad=True).to(device)
            print('Before Resnt')
            resnet_X  = resnetModel(X)
            print('After Resnt')

            # extracted_features = resnetModel(resnet_X) # (Bacth, 1000) tensor is returned
            # X2  = Variable(torch.randn(size= (X.size()[0],1000)), requires_grad=True).to(device)
            preds = customCNNModel(X, resnet_X)
            print('After customCNNModel')
        

            loss = loss_fn(preds, y).to(device)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            probs = torch.log_softmax(preds, dim=1)
            predicted_labels = torch.argmax(probs, dim=1)

            # acc
            epoch_accs += accuracy_score(y.detach().cpu(),predicted_labels.detach().cpu())
            epoch_loss_values += loss.item()

            number_of_batches += 1

        # compute average of batch loss and accuracy
        batch_acc, batch_loss = epoch_accs / \
            number_of_batches, epoch_loss_values / number_of_batches
        training_losses.append(batch_loss)
        training_accs.append(batch_acc)

        print("Epoch:{}/{}, acc={:.3f}%, loss={:.3f}".format(epoch,epochs, batch_acc*100, batch_loss))

    print("Learning Finished!")

    return training_accs, training_losses

In [13]:
INPUT_CHANNELS  = 3
NUM_FEATURES  = 16
customCNNModel = CNN(INPUT_CHANNELS, NUM_FEATURES, NUM_CLASSES).to(DEVICE) 
optimizer = torch.optim.Adam(
        params=customCNNModel.parameters(), lr=learning_rate)

In [14]:
resnetModel  = get_pretrained_model().to(DEVICE)

In [15]:
training_accs, training_losses = trainCustomModel(resnetModel, None,customCNNModel, train_dataloader, optimizer,loss_fn, in_channels=3,num_classes=11, epochs=50, learning_rate=0.001, device=DEVICE)

100%|██████████| 1965/1965 [04:59<00:00,  6.57it/s]


Epoch:1/50, acc=8.982%, loss=2.412


 31%|███       | 605/1965 [01:31<03:25,  6.63it/s]


KeyboardInterrupt: 

In [None]:
plot_model_history(training_accs, training_losses)