# RESNET-50
### Implementation experiment

In [1]:
# Mount Google Drive at /content/drive so the paths used further down
# (all rooted at /content/drive) can be resolved.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [9]:
#%% LIBRARIES #################################################################
import os
import random
import time
import numpy as np
from osgeo import gdal
import pandas as pd
from google.colab import drive
from PIL import Image
import tifffile as tiff
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.nn.functional import relu
from torchvision.models import resnet50
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from datetime import datetime

#%% SEED ######################################################################
# One seed value feeds all three random number generators used in this file:
# Python's `random`, NumPy and PyTorch.
seed = 1
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

#%% RESNET50 MODEL CLASS #################################################################
class ResNet50(nn.Module):
    """ResNet-50 feature extractor followed by a small two-layer head.

    Args:
        num_classes: Size of the output produced by the last linear layer.
        band: Number of input channels. If it differs from 3, the first
            convolution of the backbone is replaced by a newly created
            ``nn.Conv2d`` that accepts ``band`` channels.
        pt_value: Passed straight through as ``resnet50(pretrained=...)``.
    """

    def __init__(self, num_classes, band, pt_value):
        super().__init__()

        backbone = resnet50(pretrained=pt_value)

        # A replaced first convolution is freshly initialised, so it does not
        # carry over any weights loaded by resnet50().
        if band != 3:
            backbone.conv1 = nn.Conv2d(
                band,
                64,
                kernel_size=(7, 7),
                stride=(2, 2),
                padding=(3, 3),
                bias=False,
            )

        # Keep every child module except the last two, i.e. discard the
        # backbone's own classification head.
        backbone_children = list(backbone.children())
        self.features = nn.Sequential(*backbone_children[:-2])

        # Task-specific head: global average pooling, then 2048 -> 128 -> num_classes
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(2048, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Run the backbone and the head on ``x`` and return the head's output."""
        feature_map = self.features(x)
        pooled = self.pool(feature_map)
        # Collapse everything after the batch dimension into one vector per sample
        flattened = pooled.view(pooled.size(0), -1)
        hidden = relu(self.fc1(flattened))
        logits = self.fc2(hidden)
        print("Forward done")
        return logits

#%% DATASET CLASS ##############################################################
class CustomDataset(Dataset):
    """Dataset of TIFF images listed in a ``;``-separated CSV file.

    The CSV must provide the columns ``file`` (image file name) and ``label``;
    a ``datasplit`` column is required only when ``datasplit`` is given.

    Args:
        csv_file: Path of the CSV index file.
        Data_Folder: Folder that contains the image files named in the CSV.
        transform: Optional callable applied to the PIL image of each sample.
        datasplit: If truthy, only rows whose ``datasplit`` column equals this
            value are kept.
        bands: Index (list) used to select bands on the last image axis.
    """

    def __init__(self, csv_file, Data_Folder, transform=None, datasplit=None, bands=None):
        table = pd.read_csv(csv_file, sep=";")
        if datasplit:
            in_split = table["datasplit"] == datasplit
            table = table[in_split]
        # Renumber rows from 0 so that __getitem__ can look samples up by position
        self.data = table.reset_index(drop=True)
        self.image_paths = self.data['file']
        self.target_values = self.data['label']
        self.image_folder = Data_Folder
        self.transform = transform
        self.bands = bands

    def __len__(self):
        """Return the number of samples that remain after split filtering."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return it as an ``(image, label)`` pair."""
        file_name = self.image_paths[idx]
        full_path = os.path.join(self.image_folder, file_name)

        # Read the raster and keep only the requested band(s) of the last axis
        raster = tiff.imread(full_path)
        selected = raster[:, :, self.bands]
        # Squeezing axis 2 only succeeds when exactly one band was selected
        single_band = np.squeeze(selected, axis=2)

        # NOTE(review): scaling by 255 before the uint8 cast presumably assumes
        # values in [0, 1], while the division by 10000 below suggests another
        # raw range — confirm the intended normalisation.
        as_bytes = (single_band * 255).astype('uint8')
        image = Image.fromarray(as_bytes)
        if self.transform:
            image = self.transform(image)
        # Mohamed's normalization method
        # NOTE(review): this indexing presumably relies on `transform` having
        # produced a 3-D tensor — confirm behaviour when transform is None.
        image = image[:, :, :] / 10000

        # Label: CSV value -> int -> float32 array -> long tensor
        label_value = int(self.target_values[idx])
        label_array = np.array(label_value).astype('float32')
        label = torch.tensor(label_array, dtype=torch.long)
        return image, label

# Locations of the image folder, the CSV index and the experiment log file
Data_Folder = "/content/drive/MyDrive/Master's Thesis/DATA/dataset_10bands_inc"
csv_file = "/content/drive/MyDrive/Master's Thesis/DATA/new_dataset_s2.csv"
log_file_path = "/content/drive/MyDrive/Master's Thesis/DATA/results/experiment_log.txt"

# Transformation applied by the datasets to every image
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Set bands for experiments: one experiment is run per entry, and each entry
# is the list of band indices handed to the datasets for that experiment
bands = [[0]]

# Hyper-parameters shared by all experiments. The learning rate has a name so
# that the optimizer and the log file are guaranteed to report the same value.
learning_rate = 0.001
batch_size = 64
num_epochs = 1

for band in bands:
    print(band)

    #%% INITIATE DATASETS ###################################################################
    train_dataset = CustomDataset(csv_file, Data_Folder, transform=transform, datasplit="train", bands=band)
    test_dataset = CustomDataset(csv_file, Data_Folder, transform=transform, datasplit="test", bands=band)
    validation_dataset = CustomDataset(csv_file, Data_Folder, transform=transform, datasplit="val", bands=band)

    #%% CREATE MODEL & LOSS & OPTIMIZER ###################################################################
    # Set seed for Model Initialization
    torch.manual_seed(seed)
    # Create the instance of our Network
    model = ResNet50(4, len(band), True) # True = pre-trained; False = from scratch
    # Cross-entropy loss over the class scores returned by the model
    loss_fn = nn.CrossEntropyLoss()
    # Adam optimizer over all model parameters
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    #%% SETTING SEED, DATALOADER ###################################################################
    # Set seed for DataLoader (only the training loader shuffles)
    torch.manual_seed(seed)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    #%% TRAINING LOOP ###################################################################

    # Loss histories: one entry per epoch for training/validation, and the
    # final test loss of this experiment
    train_loss_values = []
    val_loss_values = []
    test_loss_values = []

    # The log is opened in append mode inside a `with` block so that it is
    # flushed and closed even if training or validation raises.
    with open(log_file_path, "a") as log_file:
        log_file.write("********************************************************\n")
        current_datetime = datetime.now()
        log_file.write(f"EXPERIMENT OF: {current_datetime}\n")
        log_file.write(f"############\nNOTES:\nBAND(S) CONSIDERED: {band}\nSEED: {seed}\nLEARNINGRATE: {learning_rate}\n")
        log_file.write("TRAINING\n")
        log_file.write("\n")

        #%% TRAINING ###################################################################
        torch.manual_seed(seed)
        for epoch in range(num_epochs):
            print("Epoch:", epoch+1)
            start_time = time.time()
            ############################# TRAINING STEP
            model.train()  # Set the model to training mode
            total_loss = 0
            for inputs, targets in train_loader:
                # Forward pass
                outputs = model(inputs)
                # Compute loss
                loss = loss_fn(outputs, targets)
                total_loss += loss.item()
                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                print("current batch completed")
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print("Current Time:", current_time)
            # Average of the per-batch losses for this epoch
            avg_loss = total_loss / len(train_loader)
            train_loss_values.append(avg_loss)
            print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_loss}\n")
            log_file.write(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_loss}\n")
            print("**********************************")
            print("training done")
            ############### VALIDATION STEP
            log_file.write("-----------\n")
            log_file.write("EVALUATION\n")
            # Set seed for validation loop
            torch.manual_seed(seed)
            model.eval()  # Set the model to evaluation mode
            total_val_loss = 0
            with torch.no_grad():  # Disable gradient calculation for validation
                for val_inputs, val_targets in val_loader:
                    # Forward pass for validation
                    val_outputs = model(val_inputs)
                    # Compute validation loss
                    val_loss = loss_fn(val_outputs, val_targets)
                    total_val_loss += val_loss.item()
            # Average of the per-batch validation losses
            avg_val_loss = total_val_loss / len(val_loader)
            val_loss_values.append(avg_val_loss)
            print(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {avg_val_loss}\n")
            log_file.write(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {avg_val_loss}\n")
            end_time = time.time()
            elapsed_time = end_time - start_time  # Calculate the elapsed time
            print(f"Time elapsed for epoch {epoch + 1}: {elapsed_time:.2f} seconds, or {elapsed_time/60:.2f} minutes")

    #%% TESTING ###################################################################

    # Re-open the same log in append mode for the test results
    with open(log_file_path, "a") as log_file:
        log_file.write("__________________\n")
        log_file.write("TESTING\n")
        # Set seed for testing loop
        torch.manual_seed(seed)
        model.eval()  # Set the model to evaluation mode
        loss_values_test = 0.0
        correct = 0
        with torch.no_grad():  # Disable gradient calculation for testing
            for images, labels in test_loader:
                # Forward pass
                outputs = model(images)
                # Accumulate the (batch-averaged) test loss
                loss = loss_fn(outputs, labels)
                loss_values_test += loss.item()
                # Count the predictions that match the labels
                _, predicted = outputs.max(1)
                correct += (predicted == labels).sum().item()
        # Each accumulated loss is already a per-batch mean, so the total is
        # divided by the number of batches (as done for training/validation),
        # not by the number of samples.
        avg_test_loss = loss_values_test / len(test_loader)
        test_loss_values.append(avg_test_loss)
        # Accuracy is a per-sample quantity, hence the dataset size here
        accuracy = correct / len(test_loader.dataset) * 100
        print(f'Testing Loss: {avg_test_loss:.4f}, Testing Accuracy: {accuracy:.2f}%')
        log_file.write(f"Loss: {avg_test_loss}, Accuracy: {accuracy:.2f}\n")

    # PLOT #####################################################
    plt.figure()
    plt.plot(range(1, num_epochs + 1), train_loss_values, marker='o', linestyle='-', color='blue', label='Training Loss')
    plt.plot(range(1, num_epochs + 1), val_loss_values, marker='o', linestyle='-', color='red', label='Validation Loss')
    plt.title('Training Loss vs Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.show()

Forward done
current batch completed
Current Time: 2023-12-07 16:23:47
Epoch [1/1], Training Loss: 0.5108140048881372

**********************************
training done


RuntimeError: ignored