# In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
from pathlib import Path
import re
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import torchvision
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torchvision.models as models


# Seed every RNG this script draws from so runs are repeatable. Seeding NumPy
# alone is not enough: the train/test split, weight initialisation, dropout and
# DataLoader shuffling all draw from torch's generator.
np.random.seed(42)
torch.manual_seed(42)

# Set path to directory containing images
path_imgs = Path("../wetransfer-ce6b2b/all_images")

# List the image file names with the standard library rather than an IPython
# shell escape (`! ls ...`), which only works inside a notebook on a system
# that has `ls`. Hidden entries are skipped and the names are sorted, which is
# what a plain `ls` prints (exact ordering of `ls` is locale dependent; the
# order only determines the row order of the DataFrame below).
img_names = sorted(
    entry for entry in os.listdir(path_imgs) if not entry.startswith(".")
)

# The label is the character at index 7 of each file name.
img_labels = [x[7] for x in img_names]

# Create a pandas dataframe containing filenames and corresponding labels
labels_df = pd.DataFrame({"filenames": img_names, "labels": img_labels})

# Drop the rows labelled "c", then store the remaining labels as int8.
# `.copy()` makes the filtered frame independent of the original, so the
# column assignment below is not a chained assignment on a slice.
labels_df = labels_df[labels_df["labels"] != "c"].copy()
labels_df["labels"] = labels_df["labels"].astype(np.int8)

# Dataset that serves (image, label) pairs described by a DataFrame
class CustomImageDataset(Dataset):
    """Map-style dataset backed by a DataFrame of file names and labels.

    Each image is opened from ``root_dir`` on access, converted to RGB and,
    when a transform was supplied, passed through it.
    """

    def __init__(self, dataframe, root_dir, transform=None):
        """Store the sample table, the image directory and the transform.

        Args:
            dataframe (pd.DataFrame): File names in column 0, labels in column 1.
            root_dir (str): Directory the file names are joined onto.
            transform (callable, optional): Applied to each loaded image when truthy.
        """
        self.transform = transform
        self.root_dir = root_dir
        self.dataframe = dataframe

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        ``idx`` may be a plain integer or a tensor holding one. Rows are
        addressed by position (``iloc``), not by index label. The label is
        returned as a ``torch.long`` tensor.
        """
        # Tensors are unwrapped first; everything is then coerced to a plain int.
        row = int(idx.tolist() if torch.is_tensor(idx) else idx)

        file_name = self.dataframe.iloc[row, 0]
        picture = Image.open(os.path.join(self.root_dir, file_name)).convert('RGB')
        target = torch.tensor(self.dataframe.iloc[row, 1], dtype=torch.long)

        if not self.transform:
            return picture, target
        return self.transform(picture), target

# Set the directory containing the images
root_dir = str(path_imgs)

# Preprocessing applied to every image: resize to 256, take a 128-pixel centre
# crop, convert to a tensor and normalise each channel with the mean / std
# below (these are the values commonly used for ImageNet-trained models).
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(128),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create an instance of the custom dataset
dataset = CustomImageDataset(dataframe=labels_df, root_dir=root_dir, transform=transform)

# Split dataset into training and testing sets
train_ratio = 0.8  # Fraction of the samples that goes to the training set
total_samples = len(dataset)
train_size = int(train_ratio * total_samples)
test_size = total_samples - train_size  # remainder, so the two sizes always sum to the total

# random_split draws from torch's RNG. A dedicated, seeded generator makes the
# partition identical on every run, independent of whatever else has consumed
# the global RNG before this point.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

# Set batch size and number of workers for dataloader
batch_size = 32
shuffle = True
num_workers = 4

# Create data loaders for training and testing sets. Only the training loader
# is shuffled; the test loader keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

class ResNet18WithDropout(nn.Module):
    """Backbone with ResNet-18 style sub-modules, then dropout and a 2-way linear head.

    The wrapped model's own ``fc`` layer is only consulted for its
    ``in_features``; the forward pass ends in this module's ``fc`` instead.
    """

    # Sub-modules of the wrapped model, applied in this order before flattening.
    _STAGES = (
        "conv1", "bn1", "relu", "maxpool",
        "layer1", "layer2", "layer3", "layer4",
        "avgpool",
    )

    def __init__(self, resnet18_model, dropout_prob=0.5):
        """Wrap ``resnet18_model`` and create the dropout layer and new head.

        Args:
            resnet18_model (nn.Module): Model exposing the sub-modules listed in
                ``_STAGES`` plus an ``fc`` layer with an ``in_features`` attribute.
            dropout_prob (float): Probability handed to ``nn.Dropout``.
        """
        super().__init__()
        self.resnet18 = resnet18_model
        self.dropout = nn.Dropout(dropout_prob)
        # Head sized from the wrapped model's final layer; always 2 outputs.
        self.fc = nn.Linear(resnet18_model.fc.in_features, 2)

    def forward(self, x):
        """Run the backbone stages, flatten, apply dropout, return the head's output."""
        for stage in self._STAGES:
            x = getattr(self.resnet18, stage)(x)
        flat = torch.flatten(x, 1)  # keep dim 0, merge everything after it
        return self.fc(self.dropout(flat))

# Build the backbone that the wrapper below consumes.
# NOTE(review): ImageNet-pretrained weights are an assumption, made because the
# input transform normalises with the usual ImageNet mean/std. Pass
# `weights=None` to train from scratch. The `weights=` argument needs
# torchvision >= 0.13 (older releases use `pretrained=True`).
resnet18 = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Wrap the backbone. The wrapper's constructor already creates the 2-class `fc`
# head sized from `resnet18.fc.in_features`, so the head is not re-assigned here.
resnet18_with_dropout = ResNet18WithDropout(resnet18, dropout_prob=0.5)

# Move the model to the device (GPU or CPU) for training. `nn.Module.to` moves
# sub-modules in place, so the wrapped backbone is moved along with the wrapper.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
resnet18_with_dropout = resnet18_with_dropout.to(device)

# Define the loss function, optimizer, and learning rate scheduler.
# The optimizer must be built from the wrapper's parameters: the wrapper's `fc`
# head produces the logits and is not part of `resnet18.parameters()`, so
# optimising only the backbone would leave the head at its random initialisation.
# NOTE(review): lr=0.1 is 100x Adam's default of 1e-3 -- confirm it is intended.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(resnet18_with_dropout.parameters(), lr=0.1)
# Cut the learning rate by 10x once the monitored value (the validation loss
# passed to `scheduler.step`) has not decreased for 5 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.1)

def train(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module to train; switched to train mode first.
        dataloader: Iterable of ``(images, labels)`` batches with a ``dataset``
            attribute whose length is used as the denominator.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device each batch is moved to.

    Returns:
        tuple: ``(epoch_loss, epoch_acc)``. ``epoch_loss`` is the
        batch-size-weighted sum of batch losses and ``epoch_acc`` the number
        of correct predictions, each divided by ``len(dataloader.dataset)``.
        ``epoch_acc`` is a double tensor.
    """
    model.train()
    loss_sum = 0.0
    hits = 0

    for batch_x, batch_y in dataloader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Fresh gradients for this batch, then the forward pass.
        optimizer.zero_grad()
        logits = model(batch_x)

        # Predicted class = index of the largest output along dim 1.
        predicted = torch.max(logits, 1).indices
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so a differently sized last
        # batch does not skew the per-sample average.
        loss_sum += batch_loss.item() * batch_x.size(0)
        hits += (predicted == batch_y.data).sum()

    n_samples = len(dataloader.dataset)
    return loss_sum / n_samples, hits.double() / n_samples

def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Module to evaluate; switched to eval mode first.
        dataloader: Iterable of ``(images, labels)`` batches with a ``dataset``
            attribute whose length is used as the denominator.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device each batch is moved to.

    Returns:
        tuple: ``(epoch_loss, epoch_acc)``. ``epoch_loss`` is the
        batch-size-weighted sum of batch losses and ``epoch_acc`` the number
        of correct predictions, each divided by ``len(dataloader.dataset)``.
        ``epoch_acc`` is a double tensor.
    """
    model.eval()
    loss_sum = 0.0
    hits = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            # Predicted class = index of the largest output along dim 1.
            predicted = torch.max(logits, 1).indices
            batch_loss = criterion(logits, batch_y)

            # Weight by batch size so the final figure is a per-sample average.
            loss_sum += batch_loss.item() * batch_x.size(0)
            hits += (predicted == batch_y.data).sum()

    n_samples = len(dataloader.dataset)
    return loss_sum / n_samples, hits.double() / n_samples


def plot_learning_curves(train_losses, test_losses):
    """Draw both loss series on one figure, label the axes, and show it.

    Args:
        train_losses: Sequence of loss values plotted as 'Train Loss'.
        test_losses: Sequence of loss values plotted as 'Test Loss'.

    NOTE(review): relies on a module-level ``plt`` (presumably
    ``matplotlib.pyplot``) that is not imported in the visible part of this
    file -- confirm it is in scope before this function is called.
    """
    curves = (
        (train_losses, 'Train Loss'),
        (test_losses, 'Test Loss'),
    )
    for series, legend_name in curves:
        plt.plot(series, label=legend_name)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Set the number of epochs
num_epochs = 10

# Per-epoch loss history, one entry appended per epoch
train_losses = []
val_losses = []

# Loop through epochs
for epoch in range(num_epochs):
    # One optimisation pass over the training loader, then an evaluation pass
    # over the test loader, which serves as the validation set here.
    train_loss, train_acc = train(resnet18_with_dropout, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(resnet18_with_dropout, test_loader, criterion, device)

    # The scheduler monitors the validation loss to decide when to lower the
    # learning rate.
    scheduler.step(val_loss)

    # Append train and validation losses to respective lists
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    # Print epoch results. The message is built from adjacent f-strings instead
    # of a backslash continuation inside one literal, which would embed the
    # next source line's indentation in the printed text.
    print(
        f'Epoch {epoch}/{num_epochs - 1}, Train Loss: {train_loss:.4f}, '
        f'Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}'
    )