In [4]:
import copy
import os

import albumentations as A
import cv2
import pandas as pd
import torch
import torchvision.models as models
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import accuracy_score, classification_report
from torch.utils.data import DataLoader, Dataset
from torchvision.models.densenet import DenseNet161_Weights
from torchvision.models.inception import Inception_V3_Weights
from torchvision.models.resnet import ResNet50_Weights
from torchvision.models.resnet import ResNet101_Weights
from torchvision.models.resnet import ResNeXt50_32X4D_Weights
from torchvision.models.resnet import ResNeXt101_64X4D_Weights
from tqdm import tqdm

In [8]:
# Backbone key: the training cell branches on this value to pick the
# architecture and embeds it in the checkpoint file name.
MODEL = "resnet50"

In [2]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)

# Hyperparameters
LEARNING_RATE = 0.0005
TRAIN_BATCH_SIZE = 8
VALID_BATCH_SIZE = 8
TEST_BATCH_SIZE = 8

EPOCHS = 20
NUM_WORKERS = 0
PIN_MEMORY = False

# Dataset location. The default is the original author's local checkout; set
# the ISM_DATA_DIR environment variable to run the notebook on another machine
# without editing this cell. Trailing separators are stripped so the derived
# paths never contain a doubled slash.
DATA_DIR = os.environ.get(
    "ISM_DATA_DIR",
    "C:/Users/sanja/OneDrive/WiSe_23/ISM/ism_project_2023/phase2/ism_project_2023",
).rstrip("/\\")
TRAIN_IMG_DIR = f"{DATA_DIR}/train"
TRAIN_CSV = f"{DATA_DIR}/train.csv"
NUM_CLASSES = 4

# Augmentation parameters
IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224

cpu


In [5]:
class ColonCancerDataset(Dataset):
    """Image-classification dataset backed by an image folder and a label CSV.

    Every entry of ``image_dir`` is treated as one sample. Its label is looked
    up in the CSV by the file name without extension, using the ``name`` and
    ``label`` columns.

    Args:
        image_dir: Directory containing the image files.
        annotation_file: Path to a CSV file with ``name`` and ``label`` columns.
        transform: Optional callable invoked as ``transform(image=array)`` that
            returns a mapping holding the result under ``"image"``. A falsy
            value disables the transform step.
    """

    def __init__(self, image_dir, annotation_file, transform):
        self.image_dir = image_dir
        self.transform = transform
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs, which seeded splits need.
        self.images = sorted(os.listdir(image_dir))
        self.csv = pd.read_csv(annotation_file)
        # Build the name -> label lookup once instead of scanning the whole
        # frame on every __getitem__ call. keep="first" mirrors the earlier
        # `.values[0]` behaviour when a name is listed more than once.
        unique_rows = self.csv.drop_duplicates(subset="name", keep="first")
        self._label_by_name = dict(
            zip(unique_rows["name"].to_numpy(), unique_rows["label"].to_numpy())
        )

    def __len__(self):
        """Return the number of entries found in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``.

        The image is read with OpenCV and converted from BGR to RGB. When a
        transform is configured, the returned image is
        ``transform(image=image)["image"]``.

        Raises:
            OSError: If OpenCV cannot read the file.
            KeyError: If the CSV has no row whose ``name`` equals the file's
                base name (extension removed).
        """
        image_path = os.path.join(self.image_dir, self.images[index])

        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising;
            # fail here with the path rather than later inside cvtColor.
            raise OSError(f"Could not read image file: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Labels are keyed by the file name without its extension.
        image_basename = os.path.splitext(os.path.basename(image_path))[0]
        try:
            label = self._label_by_name[image_basename]
        except KeyError:
            raise KeyError(
                f"No label found for image '{image_basename}' in the annotation file"
            ) from None

        # applying augmentations
        if self.transform:
            augmentations = self.transform(image=image)
            image = augmentations['image']

        return image, label

In [7]:
class Train():
    """Runs single epochs of training or validation and reports the mean batch loss."""

    def __init__(self) -> None:
        print("Inside Training Loop")

    @staticmethod
    def _resolve_device(model, device):
        """Return ``device``, or the device of ``model``'s first parameter when it is None."""
        if device is not None:
            return device
        first_param = next(model.parameters(), None)
        return first_param.device if first_param is not None else torch.device("cpu")

    @staticmethod
    def _require_batches(loader, arg_name):
        """Raise ValueError when ``loader`` has no batches, since no mean loss can be computed."""
        if len(loader) == 0:
            raise ValueError(f"{arg_name} yields no batches; cannot compute a mean loss")

    def train(self, trainloader, model, optimizer, loss_fn, DEVICE=None):
        """Train ``model`` for one pass over ``trainloader``.

        Args:
            trainloader: Sized iterable yielding ``(image, targets)`` batches.
            model: Module to optimise; switched to training mode.
            optimizer: Optimizer stepped once per batch.
            loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
            DEVICE: Device the batches are moved to. When omitted, the device of
                the model's parameters is used, so the call also works on hosts
                without CUDA. Passing ``'cuda'`` explicitly behaves as before.

        Returns:
            Sum of the per-batch losses divided by the number of batches.

        Raises:
            ValueError: If ``trainloader`` has no batches.
        """
        device = self._resolve_device(model, DEVICE)
        self._require_batches(trainloader, "trainloader")

        progress = tqdm(trainloader, leave=True)
        running_loss = 0.0
        model.train()

        for image, targets in progress:
            image = image.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            predictions = model(image)
            loss = loss_fn(predictions, targets)
            loss.backward()
            optimizer.step()

            # Read the scalar once and reuse it for the progress bar and the sum.
            batch_loss = loss.item()
            progress.set_postfix(loss=batch_loss)
            running_loss += batch_loss

        return running_loss / len(trainloader)

    def validation(self, validloader, model, loss_fn, DEVICE=None):
        """Evaluate ``model`` on ``validloader`` without updating its weights.

        Args:
            validloader: Sized iterable yielding ``(image, targets)`` batches.
            model: Module to evaluate; switched to eval mode.
            loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
            DEVICE: Device the batches are moved to. When omitted, the device of
                the model's parameters is used.

        Returns:
            Sum of the per-batch losses divided by the number of batches.

        Raises:
            ValueError: If ``validloader`` has no batches.
        """
        device = self._resolve_device(model, DEVICE)
        self._require_batches(validloader, "validloader")

        progress = tqdm(validloader, leave=True)
        model.eval()
        valid_loss = 0.0

        # Gradients are not needed for evaluation.
        with torch.no_grad():
            for image, targets in progress:
                image = image.to(device)
                targets = targets.to(device)

                preds = model(image)
                batch_loss = loss_fn(preds, targets).item()
                progress.set_postfix(loss=batch_loss)
                valid_loss += batch_loss

        return valid_loss / len(validloader)

In [None]:

# ---------------------------------------------------------------------------
# Run configuration local to this cell
# ---------------------------------------------------------------------------
SEED = 42                                    # seeds the data split and PyTorch's global RNG
DROPOUT_P = 0.3                              # dropout probability in front of the new classifier
CHECKPOINT_PATH = f"best_{MODEL}_model.pth"  # single source of truth for save AND load
torch.manual_seed(SEED)

# ---------------------------------------------------------------------------
# Augmentations
# ---------------------------------------------------------------------------
# Training pipeline: resize, random horizontal flip, ImageNet normalisation.
transform = A.Compose(
    [
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),  # always resize to reduce computation
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet mean and std
        ToTensorV2(),  # converts the HWC image array to a CHW torch tensor
    ]
)
# Evaluation pipeline: identical but without the random flip, so validation and
# test metrics do not vary with the augmentation RNG.
eval_transform = A.Compose(
    [
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
)
print("INFO: Augmentations applied")

# ---------------------------------------------------------------------------
# Load the images and labels
# ---------------------------------------------------------------------------
dataset = ColonCancerDataset(
    image_dir=TRAIN_IMG_DIR,
    annotation_file=TRAIN_CSV,
    transform=transform,
)
# A shallow copy shares the file list and label table with `dataset`, so an
# index refers to the same sample in both views; only the transform differs.
eval_dataset = copy.copy(dataset)
eval_dataset.transform = eval_transform

print("Dataset loaded")

# ---------------------------------------------------------------------------
# Split: 90% train, 5% validation, 5% test (disjoint, seeded)
# ---------------------------------------------------------------------------
n_total = len(dataset)
train_size = int(0.9 * n_total)
valid_size = (n_total - train_size) // 2
test_size = n_total - train_size - valid_size

split_rng = torch.Generator().manual_seed(SEED)
shuffled = torch.randperm(n_total, generator=split_rng).tolist()
train_idx = shuffled[:train_size]
valid_idx = shuffled[train_size:train_size + valid_size]
test_idx = shuffled[train_size + valid_size:]

trainset = torch.utils.data.Subset(dataset, train_idx)
valid_dataset = torch.utils.data.Subset(eval_dataset, valid_idx)
test_dataset = torch.utils.data.Subset(eval_dataset, test_idx)

# The three subsets must partition the data: no sample may be shared between them.
assert len(set(train_idx) | set(valid_idx) | set(test_idx)) == n_total
print(f"INFO: Training data split, TRAIN: {train_size}, VALID: {valid_size},TEST: {test_size}")

# ---------------------------------------------------------------------------
# Dataloaders
# ---------------------------------------------------------------------------
train_loader = DataLoader(
    dataset=trainset,
    batch_size=TRAIN_BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    shuffle=True,
)

# Uses the validation subset only. Feeding the whole hold-out here would put
# the test samples into model selection.
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=VALID_BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    shuffle=False,
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=TEST_BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    shuffle=False,
)

print("Train, Val, Test Loaded")

# ---------------------------------------------------------------------------
# Model and optimisation setup
# ---------------------------------------------------------------------------
# Each key builds the architecture it names. "resnet50" used to construct a
# ResNeXt-50; that network remains available under "resnext50".
# TODO: implement Inceptionv3
MODEL_BUILDERS = {
    "resnet50": lambda: models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1),
    "resnext50": lambda: models.resnext50_32x4d(weights=ResNeXt50_32X4D_Weights.IMAGENET1K_V1),
    "resnet101": lambda: models.resnet101(weights=ResNet101_Weights.IMAGENET1K_V1),
    "densenet161": lambda: models.densenet161(weights=DenseNet161_Weights.IMAGENET1K_V1),
}
if MODEL not in MODEL_BUILDERS:
    raise ValueError(f"Unknown MODEL {MODEL!r}; expected one of {sorted(MODEL_BUILDERS)}")
model = MODEL_BUILDERS[MODEL]()

# Replace the ImageNet classifier with a NUM_CLASSES head. The ResNet family
# exposes it as `fc`, DenseNet as `classifier`; the input width is read from the
# existing layer instead of being hard-coded. Dropout is placed inside the head
# because merely assigning `model.dropout` never puts it on the forward path.
head_attr = "fc" if hasattr(model, "fc") else "classifier"
in_features = getattr(model, head_attr).in_features
setattr(
    model,
    head_attr,
    torch.nn.Sequential(
        torch.nn.Dropout(p=DROPOUT_P),  # TODO: check dropout
        torch.nn.Linear(in_features=in_features, out_features=NUM_CLASSES),
    ),
)
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
loss_fn = torch.nn.CrossEntropyLoss()  # TODO: focal loss implementation

# Start from +inf so the first finite validation loss always yields a checkpoint.
best_loss = float("inf")
best_epoch = 0

print(model)
utils = Train()
print("INFO: Training started")
for epoch in range(1, EPOCHS + 1):
    print(f"Epoch: {epoch}/{EPOCHS}")  # TODO: modify learning rate after 10 epochs
    # DEVICE is passed explicitly so batches always land on the same device as
    # the model, independent of the helper's default.
    train_loss = utils.train(train_loader, model, optimizer, loss_fn, DEVICE)
    valid_loss = utils.validation(valid_loader, model, loss_fn, DEVICE)

    print(f"INFO: Training loss: {train_loss:.3f}")
    print(f"INFO: Validation loss {valid_loss:.3f}")

    # saving best model based on losses
    if valid_loss < best_loss:
        best_loss = valid_loss
        best_epoch = epoch
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        print(f"Best model saved at epoch {best_epoch} with loss: {best_loss:.3f}")

print("INFO: Training completed")

# ---------------------------------------------------------------------------
# Test the best checkpoint
# ---------------------------------------------------------------------------
print("Testing Started!")
if best_epoch == 0:
    # Nothing was saved in this run (e.g. EPOCHS == 0 or every loss was NaN);
    # refuse to evaluate a stale file left over from an earlier run.
    raise RuntimeError(f"No checkpoint was written to {CHECKPOINT_PATH} during training")
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
model.to(DEVICE)
model.eval()

test_predictions = []
test_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.to(DEVICE))
        predicted = outputs.argmax(dim=1)

        test_predictions.extend(predicted.cpu().tolist())
        test_labels.extend(labels.tolist())

accuracy = accuracy_score(test_labels, test_predictions)
classification_report_result = classification_report(test_labels, test_predictions)

print(f"Accuracy: {accuracy:.4f}")  # TODO: write output to text file
print("Classification Report:")
print(classification_report_result)
