In [1]:
# Imports
import os
import sys
import glob
import torch
import torchvision

import numpy as np
import datetime as dt
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset
from torch.autograd import Variable
from torch.optim import lr_scheduler

from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms, datasets, models
from os import listdir, makedirs, getcwd, remove
from os.path import isfile, join, abspath, exists, isdir, expanduser

%matplotlib inline


In [2]:
data_path = "../input/ammi-2021-convnets/"
train_path = join(data_path, "train/train")
test_path = join(data_path,"test/test")
extraimage_path = join(data_path, "extraimages/extraimages")

In [3]:
import os
os.environ['KAGGLE_USERNAME'] = 'muhirwasalomon'
os.environ['KAGGLE_KEY'] = 'f2e155be15e51da2a38317f327c0669b'
!kaggle competitions download -c ammi-2023-convnets --force
!unzip -q ammi-2023-convnets.zip -d ammi-2023-convnets


Downloading ammi-2023-convnets.zip to /kaggle/working
100%|█████████████████████████████████████▉| 2.30G/2.30G [00:58<00:00, 43.9MB/s]
100%|██████████████████████████████████████| 2.30G/2.30G [00:58<00:00, 42.4MB/s]


In [4]:
train_path = 'ammi-2023-convnets/train/train'
test_path = 'ammi-2023-convnets/test/test'


In [6]:
import torch
import torchvision.transforms as transforms
from sklearn.utils import class_weight
from collections import Counter
torch.random.seed()
# Define the transformations
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(size=224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(degrees=30),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize(255),
    transforms.RandomResizedCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Compute class weights
def compute_class_weights(labels):
    label_counts = Counter(labels)
    class_weights = [1.0 / label_counts[label] for label in labels]
    return torch.tensor(class_weights, dtype=torch.float)

class CassavaDataset(Dataset):
    def __init__(self, path, transform=None):
        self.classes = os.listdir(path)
        self.path = [f"{path}/{className}" for className in self.classes]
        self.file_list = [glob.glob(f"{x}/*") for x in self.path]
        self.transform = transform

        files = []
        for i, className in enumerate(self.classes):
            for fileName in self.file_list[i]:
                files.append([i, fileName])
        self.file_list = files

        # Compute class weights
        labels = [item[0] for item in self.file_list]
        self.class_weights = compute_class_weights(labels)

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        fileName = self.file_list[idx][1]
        classCategory = self.file_list[idx][0]
        im = Image.open(fileName)

        if self.transform:
            im = self.transform(im)

        return im, classCategory


In [7]:
train_data = CassavaDataset(train_path, transform=train_transform)
test_data = CassavaDataset(test_path, transform=test_transform)

In [8]:
import numpy as np
from sklearn.model_selection import train_test_split

validation_split = 0.2
random_seed = 42
shuffle_dataset = True  # Define shuffle_dataset variable here

# Creating data indices for training and validation splits
dataset_size = len(train_data)
indices = list(range(dataset_size))
split = int(np.floor(validation_split * dataset_size))

if shuffle_dataset:
    np.random.seed(random_seed)
    np.random.shuffle(indices)

train_indices, val_indices = indices[split:], indices[:split]


In [9]:
batch_size=8

In [10]:
from torch.utils.data import SubsetRandomSampler, DataLoader

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

train_loader = DataLoader(train_data, batch_size=batch_size, sampler=train_sampler)
val_loader = DataLoader(train_data, batch_size=batch_size, sampler=val_sampler)
test_loader = DataLoader(test_data, batch_size=batch_size)


In [11]:

# Device configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18

# Define the ResNet-based model
class ResNetClassifier(nn.Module):
    def __init__(self, num_classes, weight_decay=0.0001):
        super(ResNetClassifier, self).__init__()
        # Load the pre-trained ResNet50 model
        self.resnet = resnet18(pretrained=True)
        
        # Modify the last fully-connected layer for the desired number of classes
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_features, num_classes)
        
        # Define the weight decay (L2 regularization) for the parameters
        self.weight_decay = weight_decay

    def forward(self, input):
        x = self.resnet(input)
        return x

model = ResNetClassifier(num_classes=5)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.0002, weight_decay=model.weight_decay)


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 228MB/s]


In [13]:
# #weights initialization
# def initalize_weights(m):
#     classname = m.__class__.__name__
#     if classname.find('Conv') != -1:
#         nn.init.normal_(m.weight.data, 0.0, 0.02) #Initialize the weights to mean=0, sd=0.02
#     elif classname.find('BatchNorm') != -1:
#         nn.init.normal_(m.weight.data, 1.0, 0.02)
#         nn.init.constant_(m.bias.data, 0)

In [14]:
def reset_model_parameters(model):
    """Reset the parameters of a PyTorch model."""
    for param in model.parameters():
        if param.requires_grad:
            param.data.uniform_(-0.1, 0.1)


In [15]:
# from sklearn.model_selection import KFold
# def train(model, criterion, train_data, optimizer, num_epochs, num_folds):
#     """Train a model using cross-validation."""
#     kf = KFold(n_splits=num_folds, shuffle=True)

#     print('----- Cross-validation Loop -----')
#     # Loop over folds.
#     for fold, (train_indices, val_indices) in enumerate(kf.split(train_data)):
#         print(f'Fold: {fold + 1}')

# #         # Reset model parameters for each fold.
# #         model = ResNetClassifier(5) 
# #         model.reset_model_parameters()

#         # Exponential moving average of the loss.
#         ema_loss = None

#         print('----- Training Loop -----')

#         # Get the training data and targets for the current fold.
#         fold_train_data = [train_data[idx] for idx in train_indices]

#         # Create a dataloader for the training data.
#         fold_train_loader = DataLoader(fold_train_data, batch_size=batch_size, shuffle=True)

#         # Loop over epochs.
#         for epoch in range(num_epochs):
#             # Training phase
#             model.train()

#             # Loop over training data.
#             for batch_idx, (features, target) in enumerate(fold_train_loader):
#                 # Move data to the device.
#                 features = features.to(device)
#                 target = target.to(device)

#                 # Forward pass.
#                 output = model(features)
#                 loss = criterion(output, target)

#                 # Backward pass.
#                 optimizer.zero_grad()
#                 loss.backward()
#                 optimizer.step()

#                 # Update exponential moving average of the loss.
#                 if ema_loss is None:
#                     ema_loss = loss.item()
#                 else:
#                     ema_loss += (loss.item() - ema_loss) * 0.01

#             # Print training progress at the end of the epoch.
#             print('Fold: {} \tEpoch: {} \tTraining Loss: {:.3f}'.format(fold + 1, epoch, ema_loss))


In [16]:
from sklearn.model_selection import KFold
from copy import deepcopy

def train(model, criterion, train_data, optimizer, num_epochs, num_folds, patience):
    """Train a model using cross-validation."""
    kf = KFold(n_splits=num_folds, shuffle=True)
    
    print('----- Cross-validation Loop -----')
    # Loop over folds.
    for fold, (train_indices, val_indices) in enumerate(kf.split(train_data)):
        print(f'Fold: {fold + 1}')

        # Reset model parameters for each fold.
        #model.apply(reset_parameters)
        
        # Exponential moving average of the loss.
        ema_loss = None
        
        best_loss = float('inf')
        counter = 0

        print('----- Training Loop -----')

        # Get the training and validation data for the current fold.
        fold_train_data = [train_data[idx] for idx in train_indices]
        fold_val_data = [train_data[idx] for idx in val_indices]

        # Create data loaders for training and validation data.
        fold_train_loader = DataLoader(fold_train_data, batch_size=batch_size, shuffle=True)
        fold_val_loader = DataLoader(fold_val_data, batch_size=batch_size, shuffle=False)

        # Loop over epochs.
        for epoch in range(num_epochs):
            # Training phase
            model.train()

            # Loop over training data.
            for batch_idx, (features, target) in enumerate(fold_train_loader):
                # Move data to the device.
                features = features.to(device)
                target = target.to(device)

                # Forward pass.
                output = model(features)
                loss = criterion(output, target)

                # Backward pass.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Update exponential moving average of the loss.
                if ema_loss is None:
                    ema_loss = loss.item()
                else:
                    ema_loss += (loss.item() - ema_loss) * 0.01

            # Print training progress at the end of the epoch.
            print('Fold: {} \tEpoch: {} \tTraining Loss: {:.3f}'.format(fold + 1, epoch, ema_loss))
            
            # Validation phase
            model.eval()
            val_loss = 0.0
            
            with torch.no_grad():
                for val_features, val_target in fold_val_loader:
                    val_features = val_features.to(device)
                    val_target = val_target.to(device)
                    
                    val_output = model(val_features)
                    val_loss += criterion(val_output, val_target).item()
            
            val_loss /= len(fold_val_loader)
            
            # Check if the current validation loss is the best so far
            if val_loss < best_loss:
                best_loss = val_loss
                best_model = deepcopy(model)
                counter = 0
            else:
                counter += 1
            
            # Check if the training should be stopped early
#             if counter >= patience:
#                 print('Early stopping! No improvement in validation loss for {} epochs.'.format(patience))
#                 break
        
        # Set the best model as the final model for this fold
        model = best_model

def reset_parameters(module):
    """Reset the parameters of a module."""
    if hasattr(module, 'reset_parameters'):
        module.reset_parameters()



In [17]:
def test(model, data_loader):
    """Measures the accuracy of a model on a data set."""
    # Make sure the model is in evaluation mode.
    model.eval()
    correct = 0
    print('----- Model Evaluation -----')
    # We do not need to maintain intermediate activations while testing.
    with torch.no_grad():

        # Loop over test data.
        for features, target in data_loader:

            # Move data to the device.
            features = features.to(device)
            target = target.to(device)

            # Forward pass.
            output = model(features)

            # Get the label corresponding to the highest predicted probability.
            pred = output.argmax(dim=1, keepdim=True)

            # Move tensors to the same device as the model.
            pred = pred.to(device)
            target = target.to(device)

            # Count number of correct predictions.
            correct += pred.eq(target.view_as(pred)).sum().item()

    # Print test accuracy.
    percent = 100. * correct / len(val_sampler)
    print(f'Test accuracy: {correct} / {len(data_loader.dataset)} ({percent:.0f}%)')
    torch.save(model.state_dict(), 'model.ckpt')
    return percent


In [18]:
# def test(model, data_loader):
#     """Measures the accuracy of a model on a data set."""
#     # Make sure the model is in evaluation mode.
#     model.eval()
#     correct = 0
#     total = 0

#     print('----- Model Evaluation -----')
#     # We do not need to maintain intermediate activations while testing.
#     with torch.no_grad():
#         # Loop over test data.
#         for features, target in data_loader:
#             # Move data to the device.
#             features = features.to(device)
#             target = target.to(device)

#             # Forward pass.
#             output = model(features)

#             # Get the label corresponding to the highest predicted probability.
#             _, predicted = torch.max(output.data, 1)

#             # Count number of correct predictions.
#             total += target.size(0)
#             correct += (predicted == target).sum().item()

#     # Calculate test accuracy.
#     accuracy = 100 * correct / total
#     print(f'Test accuracy: {correct} / {total} ({accuracy:.2f}%)')

#     return accuracy


In [None]:
# Define the number of epochs and patience
num_epochs = 15
patience = 3

# Train the model using cross-validation
train(model, criterion, train_data, optimizer, num_epochs, num_folds=5, patience=patience)

# Evaluate the model on the validation set
val_accuracy = test(model, val_loader)


----- Cross-validation Loop -----
Fold: 1
----- Training Loop -----
Fold: 1 	Epoch: 0 	Training Loss: 1.060
Fold: 1 	Epoch: 1 	Training Loss: 0.862
Fold: 1 	Epoch: 2 	Training Loss: 0.719
Fold: 1 	Epoch: 3 	Training Loss: 0.589
Fold: 1 	Epoch: 4 	Training Loss: 0.440
Fold: 1 	Epoch: 5 	Training Loss: 0.358
Fold: 1 	Epoch: 6 	Training Loss: 0.259
Fold: 1 	Epoch: 7 	Training Loss: 0.224
Fold: 1 	Epoch: 8 	Training Loss: 0.185
Fold: 1 	Epoch: 9 	Training Loss: 0.173
Fold: 1 	Epoch: 10 	Training Loss: 0.159
Fold: 1 	Epoch: 11 	Training Loss: 0.133
Fold: 1 	Epoch: 12 	Training Loss: 0.105
Fold: 1 	Epoch: 13 	Training Loss: 0.110
Fold: 1 	Epoch: 14 	Training Loss: 0.096
Fold: 2
----- Training Loop -----
Fold: 2 	Epoch: 0 	Training Loss: 0.837
Fold: 2 	Epoch: 1 	Training Loss: 0.816
Fold: 2 	Epoch: 2 	Training Loss: 0.838
Fold: 2 	Epoch: 3 	Training Loss: 0.847
Fold: 2 	Epoch: 4 	Training Loss: 0.861
Fold: 2 	Epoch: 5 	Training Loss: 0.824
Fold: 2 	Epoch: 6 	Training Loss: 0.795
Fold: 2 	Epoc

In [None]:

# test(model, test_loader)

In [None]:
import os
from os import listdir
from PIL import Image
import torch
import torch.nn.functional as F
from torchvision import transforms

# Step 1
test_directory = "ammi-2023-convnets/test/test/0"
predictions, test_image_fileName = [], []

# Step 2
# Process image
def preprocess_image(image):
    image = image.resize((224, 224))
    image = transforms.ToTensor()(image)
    image = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])(image)
    return image

# Step 3
def predict(image, model):
    # Pass the image through the model
    output = model(image.to(device))
    # Reverse the log function of the output
    output = torch.exp(output)
    # Get the predicted class
    _, predicted_class = output.max(1)
    return predicted_class.item()

# Step 4
# Get the list of class names
class_names = train_loader.dataset.classes

# Step 5
try:
    # Iterate over the test images
    test_images = listdir(test_directory)
    for image_name in test_images:
        # Check if the item is not a directory
        if not os.path.isdir(os.path.join(test_directory, image_name)):
            # Get the image path
            image_path = os.path.join(test_directory, image_name)
            print(f"Preprocess image: {image_path}")
            
            # Open and preprocess the image
            image = Image.open(image_path)
            image = preprocess_image(image)
            image = image.unsqueeze(0).to(device)
            
            # Perform the prediction on the image
            with torch.no_grad():
                top_class = predict(image, model)
                # Add the predicted class and image name to the list
                predictions.append(class_names[top_class])
                test_image_fileName.append(image_name)
                # Print the prediction result for the image
                print(f"Prediction for image {image_path}: {class_names[top_class]}")

    # Sort predictions and image filenames based on predictions
    sorted_predictions, sorted_filenames = zip(*sorted(zip(predictions, test_image_fileName)))
    predictions = list(sorted_predictions)
    test_image_fileName = list(sorted_filenames)

except Exception as e:
    # Handle any exceptions that occur during the process
    print(e)


In [None]:
# Make submission here

In [None]:
import pandas as pd

submission_data = {"Category": predictions, "Id": test_image_fileName}
submission_data_frame = pd.DataFrame(submission_data)

# Save the DataFrame to CSV
submission_data_frame.to_csv('/kaggle/working/submission_file5.csv', index=False)


In [None]:
import pandas as pd

submission_data = {"Category": predictions, "Id": test_image_fileName}
submission_data_frame = pd.DataFrame(submission_data)

# Save the DataFrame to CSV
submission_data_frame.to_csv('/kaggle/working/submission_file4.csv', index=False)
