## Classification TBC

In [None]:
import numpy as np
import torch
import pandas as pd
import matplotlib.pyplot as plt
import pickle
from PIL import Image
import torch.nn as nn
import torchvision.models
from torchsummary import summary
import timm
from efficientnet_pytorch import EfficientNet
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
import time
from sklearn.metrics import confusion_matrix
import seaborn as sn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split

In [None]:
# Hyper-parameter grid and run-wide settings shared by every model below.
# NOTE(review): the Classification helpers also read args['model'],
# args['performa'] and args['performa_excel'], which this cell does not set -
# confirm they are defined elsewhere before training.
args = {
    'lr': [0.001, 0.0001, 0.00001],                    # learning rates to try
    'batch': [16, 32, 64],                             # batch sizes to try
    'optimizer': ['adam', 'rmsprop', 'sgdm', 'sgd'],   # names understood by Classification.compile
    'epoch': 20,                                       # epochs per configuration
    'class': ['Bakteri TBC', 'non TBC'],               # class names, indexed by label
}

In [None]:
def save_pickle(filename, data):
    """Pickle ``data`` into ``filename`` and print a confirmation.

    Args:
        filename: Destination path. It is opened in binary write mode, so an
            existing file is overwritten.
        data: Any picklable object.
    """
    with open(filename, 'wb') as handle:
        pickle.dump(data, handle)
    print(f'berhasil menyimpan pkl {filename}')

def write_pickle(filename):
    """Load and return the object pickled in ``filename``.

    Despite its name, this function reads a pickle; it does not write one.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        The unpickled object.
    """
    # NOTE: unpickling can execute arbitrary code - only load trusted files.
    with open(filename, 'rb') as handle:
        restored = pickle.load(handle)
    print(f'Berhasil Load pkl {filename}')
    return restored

class efficientNetC(nn.Module):
    """Pretrained EfficientNet whose final layer is resized for this task.

    Args:
        version: Suffix appended to ``'efficientnet-'`` to pick the variant,
            e.g. ``"b0"``.
    """

    def __init__(self, version):
        super().__init__()
        self.efficientnet = EfficientNet.from_pretrained('efficientnet-' + version)
        # Replace the pretrained head with one output per entry in args['class'].
        head_inputs = self.efficientnet._fc.in_features
        self.efficientnet._fc = nn.Linear(head_inputs, len(args['class']))
        self.version = version

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.efficientnet(x)

class SEBlock(nn.Module):
    """Squeeze-and-excitation gate that rescales each channel of its input.

    Args:
        in_channels: Number of channels of the incoming feature map.
        reduction_ratio: Divisor applied to ``in_channels`` to size the hidden
            layer of the gating network.
    """

    def __init__(self, in_channels, reduction_ratio=16):
        super().__init__()
        hidden = in_channels // reduction_ratio
        # Squeeze: average every channel down to a single value.
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        # Excitation: two linear layers ending in a sigmoid, one gate per channel.
        self.excitation = nn.Sequential(
            nn.Linear(in_channels, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, in_channels),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` multiplied channel-wise by its learned gates."""
        batch = x.size(0)
        pooled = self.squeeze(x).view(batch, -1)
        # Reshape to (batch, channels, 1, 1) so the gates broadcast over H and W.
        gates = self.excitation(pooled).view(batch, -1, 1, 1)
        return x * gates

class SEResNetBlock(nn.Module):
    """Residual block of two 3x3 convolutions followed by an SE gate.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the first convolution (and of the shortcut
            projection when one is needed).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.se_block = SEBlock(out_channels)

        # The shortcut needs a 1x1 projection only when the main path changes
        # the spatial size or channel count; otherwise it is a pass-through.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.downsample = nn.Sequential()

    def forward(self, x):
        """Return ``relu(se(main_path(x)) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.se_block(out)

        shortcut = self.downsample(x)

        out += shortcut
        return self.relu(out)

class SEResNet18(nn.Module):
    """Network with a ResNet-18 layout (four stages of two blocks) built from SEResNetBlock.

    Args:
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Channel count entering the next stage; updated by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (output channels, stride of the first block) for layer1..layer4.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for number, (width, first_stride) in enumerate(stage_specs, start=1):
            stage = self._make_layer(SEResNetBlock, width, 2, stride=first_stride)
            setattr(self, f'layer{number}', stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, channels, blocks, stride=1):
        """Stack ``blocks`` instances of ``block``; only the first uses ``stride``."""
        first = block(self.in_channels, channels, stride)
        self.in_channels = channels
        remaining = [block(channels, channels, stride=1) for _ in range(1, blocks)]
        return nn.Sequential(first, *remaining)

    def forward(self, x):
        """Return the ``fc`` layer's output for the image batch ``x``."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = self.avgpool(x)
        flat = x.view(x.size(0), -1)
        return self.fc(flat)

class GoogLeNet(nn.Module):
    """torchvision GoogLeNet with ImageNet weights and a resized ``fc`` head.

    Args:
        num_classes: Size of the replacement ``fc`` layer's output.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True` flag (which maps
        # to these same IMAGENET1K_V1 weights) and matches how ResNet152V2 in
        # this file requests its weights.
        self.model = torchvision.models.googlenet(weights='IMAGENET1K_V1')
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.model(x)


class MobileNetV2(nn.Module):
    """torchvision MobileNetV2 built with ``num_classes`` outputs.

    NOTE(review): unlike the other torchvision wrappers in this file, no
    pretrained weights are requested here - confirm that is intended.

    Args:
        num_classes: Number of outputs of the classifier.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Forward the argument; it used to be ignored in favour of a literal 2.
        self.model = torchvision.models.mobilenet_v2(num_classes=num_classes)

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.model(x)

class ResNet152V2(nn.Module):
    """torchvision ResNet-152 loaded with 'IMAGENET1K_V2' weights and a resized ``fc`` head.

    Args:
        num_classes: Size of the replacement ``fc`` layer's output.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        backbone = torchvision.models.resnet152(weights='IMAGENET1K_V2')
        head_inputs = backbone.fc.in_features
        backbone.fc = nn.Linear(head_inputs, num_classes)
        self.model = backbone

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.model(x)

class Classification:
    def __init__(self, str_model):
        """Remember which model to build and set up the shared training settings.

        Args:
            str_model: Model key later passed to ``create_model``.
        """
        self.modelname = str_model
        # Name of the best configuration; stays None until fit() picks one or
        # the caller assigns it by hand.
        self.best_model = None
        self.epochs = args['epoch']
        self.criterion = nn.CrossEntropyLoss()

    def create_model(self, model_name):
        if model_name == 'efficientNet':
          return  efficientNetC("b0")
        elif model_name == 'se':
          return SEResNet18()
        elif model_name == 'googlenet':
          return GoogLeNet()
        elif model_name == 'mobilenetv2':
          return MobileNetV2()
        elif model_name == 'resnet152v2':
          return ResNet152V2()

    def compile(self, model, optim, lr):
        if optim == "rmsprop":
          self.model_optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
        elif optim == "adam":
          self.model_optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        elif optim == "sgdm":
          self.model_optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.001)
        elif optim == "sgd":
          self.model_optimizer = torch.optim.SGD(model.parameters(), lr=lr)


    def preprocessing(self, x, y, xval, yval, batch):
        if self.Train == True:
          self.data_x_train, self.data_y_train = x.permute(0,3,1,2)/255.0, y
          self.data_x_val, self.data_y_val = xval.permute(0,3,1,2)/255.0, yval

          self.train_loader = DataLoader(TensorDataset(self.data_x_train, self.data_y_train), batch_size = batch, shuffle=True)
          self.val_loader = DataLoader(TensorDataset(self.data_x_val, self.data_y_val), batch_size = batch, shuffle=True)

        else:
          self.data_x_test, self.data_y_test = x.permute(0,3,1,2)/255.0, y
          self.test_loader = DataLoader(TensorDataset(self.data_x_test, self.data_y_test), batch_size = batch, shuffle=True)

    def save_model(self, model, filename):
        """Save ``model``'s state dict to ``args['model'] + filename + ".pth"``."""
        weights = model.state_dict()
        checkpoint_path = args['model'] + filename + ".pth"
        torch.save(weights, checkpoint_path)

    def load_model(self, model, filename, device):
        """Return the object stored at ``args['model'] + filename + ".pth"``.

        Args:
            model: Accepted for symmetry with ``save_model``; not used here.
            filename: Checkpoint name without directory prefix or extension.
            device: Passed to ``torch.load`` as ``map_location``.
        """
        checkpoint_path = args['model'] + filename + ".pth"
        return torch.load(checkpoint_path, map_location=device)

    def train(self, model, optimizer, criterion, train_loader):
        model.train()
        running_loss = 0
        correct = 0
        total = 0
        alpha, beta = 1e-5, 1e-3
        for id_batch, (images, labels) in enumerate(train_loader):
            images = images.to(self.device)
            labels = labels.to(self.device)

            optimizer.zero_grad()
            outputs = model(images)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predict = torch.max(outputs.data, 1)
            correct += (predict == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / len(train_loader)
        train_acc = correct / total

        return model, train_loss, train_acc

    def valid(self, model, criterion, valid_loader):
        """Evaluate ``model`` on ``valid_loader`` without gradient tracking.

        Args:
            model: Network to evaluate; switched to eval mode.
            criterion: Loss called as ``criterion(outputs, labels)``.
            valid_loader: Iterable of ``(images, labels)`` batches.

        Returns:
            Tuple ``(model, val_loss, val_acc, report)``: the same model
            object, the mean of the per-batch losses, the fraction of samples
            classified correctly, and the result of ``self.evaluate`` on all
            predictions and targets.
        """
        model.eval()
        loss_sum, hits, seen = 0, 0, 0
        predicted_all, target_all = [], []

        with torch.no_grad():
            for images, labels in valid_loader:
                images = images.to(self.device)
                labels = labels.to(self.device)

                logits = model(images)
                loss_sum += criterion(logits, labels).item()

                _, guess = torch.max(logits.data, 1)
                hits += (guess == labels).sum().item()
                seen += labels.size(0)

                # Collect everything on the CPU for the confusion matrix.
                predicted_all.extend(guess.cpu().numpy())
                target_all.extend(labels.cpu().numpy())

        val_loss = loss_sum / len(valid_loader)
        val_acc = hits / seen
        report = self.evaluate(predicted_all, target_all)

        return model, val_loss, val_acc, report

    def fit(self, X, y, xval, yval, device):
        """Grid-search learning rate, batch size and optimizer for this model.

        For every combination in ``args['lr']`` x ``args['batch']`` x
        ``args['optimizer']`` a fresh model is trained for ``self.epochs``
        epochs. Weights are saved whenever the validation loss improves,
        per-epoch curves are written to an Excel file and a pickle, and the
        combination whose saved checkpoint has the highest validation
        accuracy is stored in ``self.best_model``.

        The checkpoint used to be chosen by training loss while combinations
        were ranked by last-epoch validation accuracy, so the ranked numbers
        could belong to weights other than the ones saved (and later loaded by
        ``predict``). Both now refer to the same epoch.

        Args:
            X: Training images, passed to ``preprocessing``.
            y: Training labels.
            xval: Validation images.
            yval: Validation labels.
            device: Device that models and batches are moved to, e.g. ``'cuda'``.
        """
        self.Train = True
        self.device = device
        # Loop-invariant: the loss module only needs to be moved once.
        self.criterion = self.criterion.to(device)

        best_acc_tunning = 0
        # Search space: lr, batch, optimizer.
        for lr_use in args['lr']:
            for batch_use in args['batch']:
                # The loaders depend only on the batch size, so build them once
                # per batch size instead of once per optimizer.
                self.preprocessing(X, y, xval, yval, batch_use)

                for optimizer_use in args['optimizer']:
                    model_use = self.create_model(self.modelname)
                    model_use = model_use.to(self.device)
                    model_stats = {}
                    loss_train, acc_train, loss_val, acc_val, perform = [], [], [], [], []
                    best_val_loss = float('inf')
                    checkpoint_acc = 0  # validation accuracy of the saved weights
                    time_start = time.time()

                    proses_name = f"model_{self.modelname}_lr_{lr_use}_batch_{batch_use}_optimizer_{optimizer_use}"
                    print(f"Tunning {proses_name}")

                    self.compile(model_use, optimizer_use, lr_use)

                    for epoch in range(self.epochs):
                        model_use, train_loss, train_acc = self.train(model_use, self.model_optimizer, self.criterion, self.train_loader)
                        model_use, val_loss, val_acc, epoch_report = self.valid(model_use, self.criterion, self.val_loader)

                        loss_train.append(train_loss)
                        acc_train.append(train_acc)
                        loss_val.append(val_loss)
                        acc_val.append(val_acc)
                        perform.append(epoch_report)

                        print(f"Epoch : {epoch+1} train loss : {train_loss} train acc : {train_acc} val loss : {val_loss} val acc : {val_acc} ")
                        # Save the weights that generalise best so far.
                        if val_loss < best_val_loss:
                            print("Menyimpan Model")
                            self.save_model(model_use, proses_name)
                            best_val_loss = val_loss
                            checkpoint_acc = val_acc
                    time_stop = time.time()

                    # Store the per-epoch curves of this configuration.
                    model_stats['loss_training'], model_stats['acc_training'] = loss_train, acc_train
                    model_stats['loss_validasi'], model_stats['acc_validasi'] = loss_val, acc_val

                    df = pd.DataFrame(model_stats)
                    df.to_excel(args['performa_excel']+proses_name+".xlsx", index=False)

                    # Added after the DataFrame is built, so these entries
                    # appear only in the pickle, not in the Excel sheet.
                    model_stats['timer'] = (time_stop-time_start)
                    model_stats['perform'] = perform
                    save_pickle(args['performa']+f"perform_{proses_name}.pkl", model_stats)
                    print(f"with time : {model_stats['timer']} loss : {loss_val[-1]} acc : {acc_val[-1]} precision : {model_stats['perform'][-1][1]} recall : {model_stats['perform'][-1][2]} f1 : {model_stats['perform'][-1][3]}")
                    # Rank by the accuracy of the weights actually on disk.
                    if checkpoint_acc > best_acc_tunning:
                        best_acc_tunning = checkpoint_acc
                        self.best_model = proses_name
                    # Drop the reference before the next configuration is built.
                    model_use = None
        print(f"FINAL BEST MODEL WITH ACC : {best_acc_tunning}")
        print(self.best_model)

    def best_model_graph(self):
      best_path = args['performa']+"perform_"+self.best_model+".pkl"
      best_perform = write_pickle(best_path)
      acc_train, acc_val, loss_train, loss_val, timer = best_perform['acc_training'], best_perform['acc_validasi'], best_perform['loss_training'], best_perform['loss_validasi'], best_perform['timer']

      print(f"Training Time for {timer}")
      plt.figure(figsize=(12, 5))
      plt.plot(loss_train, label='Training Loss')
      plt.plot(loss_val, label='Validasi Loss')
      plt.xlabel('Epoch')
      plt.ylabel('Loss')
      plt.title(f'loss {self.best_model}')
      plt.legend()
      plt.show()

      plt.figure(figsize=(12, 5))
      plt.plot(acc_train, label='Training Accuracy')
      plt.plot(acc_val, label='Validation Accuracu')
      plt.xlabel('Epoch')
      plt.ylabel('Accuracy')
      plt.title(f'Accuracy Of {self.best_model}')
      plt.legend()
      plt.show()

    def predict(self,  X,y, device):
      self.Train = False
      self.device = device
      model = self.create_model(self.modelname)
      # Split the string by underscores
      tokens = self.best_model.split('_')
      batch_index = tokens.index('batch')
      batch_size = int(tokens[batch_index + 1])

      model.load_state_dict(self.load_model(model, self.best_model, self.device) ,strict=False)
      self.preprocessing(X,y, None, None, batch_size)

      # testing phase
      _, test_loss, test_acc, perform = self.valid(model, self.criterion, self.test_loader)

      print(f"Hasil Testing")

      plt.figure(figsize=(6, 6))
      sn.heatmap(perform[4], annot=True, fmt='d', cmap='Blues', xticklabels=args['class'], yticklabels=args['class'])
      plt.xlabel('Predicted')
      plt.ylabel('Target')
      plt.title('Confusion Matrix')
      plt.show()

      print(f"Loss : {test_loss}")
      print(f"Accuracy : {test_acc}")
      print(f"Precision : {perform[1]}")
      print(f"Recall : {perform[2]}")
      print(f"F1-score : {perform[3]}")

    def evaluate(self, predict, target):
        """Compute per-class and macro-averaged metrics from predictions.

        Args:
            predict: Sequence of predicted class indices.
            target: Sequence of true class indices, same length as ``predict``.

        Returns:
            List ``[accuracy, precision, recall, f1, conf_matrix,
            metrics_per_class]``: the first four are unweighted means over
            the classes, ``conf_matrix`` is a torch tensor (rows = target,
            columns = prediction) and ``metrics_per_class`` maps each name in
            ``args['class']`` to its own ``acc``/``precision``/``recall``/``f1``.
        """
        class_names = args['class']
        # Fix the label set so the matrix always has one row and column per
        # class, even when a class is absent from this batch of results;
        # otherwise it shrinks and rows no longer line up with class_names.
        conf_matrix = torch.from_numpy(
            confusion_matrix(target, predict, labels=list(range(len(class_names))))
        )
        n_classes = conf_matrix.size(0)
        n_samples = conf_matrix.sum().item()

        def ratio(numerator, denominator):
            # A zero denominator (e.g. a class that was never predicted) counts
            # as 0.0 instead of producing NaN that poisons the averages.
            return numerator / denominator if denominator else 0.0

        metrics_per_class = {}
        for i in range(n_classes):
            TP = conf_matrix[i, i].item()
            FP = conf_matrix[:, i].sum().item() - TP
            FN = conf_matrix[i, :].sum().item() - TP
            TN = n_samples - TP - FP - FN

            precision = ratio(TP, TP + FP)
            recall = ratio(TP, TP + FN)
            metrics_per_class[class_names[i]] = {
                'acc': ratio(TP + TN, n_samples),
                'precision': precision,
                'recall': recall,
                'f1': ratio(2 * precision * recall, precision + recall),
            }

        def macro(key):
            return sum(metric[key] for metric in metrics_per_class.values()) / n_classes

        return [macro('acc'), macro('precision'), macro('recall'), macro('f1'), conf_matrix, metrics_per_class]

In [None]:
# Hold out 20% of the data as the test set, stratified by class label and
# with a fixed seed so the split is reproducible.
# NOTE(review): `dataset` is not defined in the visible cells - presumably a
# dict with 'images' and 'class' tensors loaded earlier; confirm.
xtrain, xtest, ytrain, ytest = train_test_split(dataset['images'], dataset['class'], test_size=0.2, random_state=42, stratify=dataset['class'])
print(f"xtrain : {xtrain.shape}")
print(f"xtest : {xtest.shape}")
print(f"ytrain : {ytrain.shape}")
print(f"ytest : {ytest.shape}")

xtrain : torch.Size([1012, 224, 224, 3])
xtest : torch.Size([254, 224, 224, 3])
ytrain : torch.Size([1012])
ytest : torch.Size([254])


In [None]:
# Split the training portion again: 20% becomes the validation set used for
# hyper-parameter tuning, stratified by label with the same fixed seed.
xtrain_val, xval, ytrain_val, yval = train_test_split(xtrain, ytrain, test_size=0.2, random_state=42, stratify=ytrain)
print(f"xtrain_val : {xtrain_val.shape}")
print(f"xval : {xval.shape}")
print(f"ytrain_val : {ytrain_val.shape}")
print(f"yval : {yval.shape}")

xtrain_val : torch.Size([809, 224, 224, 3])
xval : torch.Size([203, 224, 224, 3])
ytrain_val : torch.Size([809])
yval : torch.Size([203])


## Training CNN
- EfficientNet
- Squeeze and Excitation
- GoogleNet
- ResNet152V2
- MobileNetV2

## A. EfficientNet

In [None]:
# Grid-search EfficientNet on the train/validation split, on the GPU.
classifier = Classification("efficientNet")
classifier.fit(xtrain_val, ytrain_val, xval, yval, 'cuda')

In [None]:
# Plot the loss/accuracy curves of the best configuration found by fit().
classifier.best_model_graph()

In [None]:
# Evaluate a saved EfficientNet checkpoint on the held-out test set, on the CPU.
# A new Classification is created and best_model is set by hand, so this cell
# does not depend on the result of the fit() cell.
classifier = Classification("efficientNet")
classifier.best_model = 'model_efficientNet_lr_0.001_batch_32_optimizer_adam'
classifier.predict(xtest, ytest, 'cpu')

## B. SE

In [None]:
# Grid-search the SE-ResNet model on the train/validation split, on the GPU.
classifier = Classification("se")
classifier.fit(xtrain_val, ytrain_val, xval, yval, 'cuda')

In [None]:
# Plot the loss/accuracy curves of the best SE-ResNet configuration.
classifier.best_model_graph()

In [None]:
# Evaluate the best SE-ResNet configuration on the test set, on the CPU.
classifier.predict(xtest, ytest, 'cpu')

## C. GoogleNet

In [None]:
# Grid-search GoogLeNet on the train/validation split, on the GPU.
classifier = Classification("googlenet")
classifier.fit(xtrain_val, ytrain_val, xval, yval, 'cuda')

In [None]:
# Plot the loss/accuracy curves of the best GoogLeNet configuration.
classifier.best_model_graph()

In [None]:
# Evaluate the best GoogLeNet configuration on the test set, on the CPU.
classifier.predict(xtest, ytest, 'cpu')

## D. ResNet 152 V2


In [None]:
# Grid-search ResNet-152 on the train/validation split, on the GPU.
classifier = Classification("resnet152v2")
classifier.fit(xtrain_val, ytrain_val, xval, yval, 'cuda')

In [None]:
# Plot the loss/accuracy curves of the best ResNet-152 configuration.
classifier.best_model_graph()

In [None]:
# Evaluate the best ResNet-152 configuration on the test set, on the CPU.
classifier.predict(xtest, ytest, 'cpu')

## E. MobileNetV2

In [None]:
# Grid-search MobileNetV2 on the train/validation split, on the GPU.
classifier = Classification("mobilenetv2")
classifier.fit(xtrain_val, ytrain_val, xval, yval, 'cuda')

In [None]:
# Plot the loss/accuracy curves of the best MobileNetV2 configuration.
classifier.best_model_graph()

In [None]:
# Evaluate the best MobileNetV2 configuration on the test set, on the CPU.
classifier.predict(xtest, ytest, 'cpu')