In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import time
import copy
import glob

import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim

import matplotlib.pyplot as plt

In [None]:
import json

data_path = "/kaggle/input/cropped/cropped"

# Training-time pipeline: resize to the same 256x256 shape used for validation,
# then apply light geometric augmentation before converting to a tensor.
train_transform = transforms.Compose(
    [
        transforms.Resize([256, 256]),
        #transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),  # Safer scale for cropping
        transforms.RandomHorizontalFlip(),  # Safe and common for pre-trained models
        transforms.RandomRotation(degrees=10),  # Small rotation for robustness
        transforms.RandomAffine(degrees=0, shear=10),
        transforms.ToTensor(),
    ]
)

# Transformations for the validation dataset (deterministic, no augmentation)
val_transform = transforms.Compose(
    [
        transforms.Resize([256, 256]),
        #transforms.CenterCrop(224),
        transforms.ToTensor(),
    ]
)

# Transform-free view of the folder, used only to size and draw the split.
full_dataset = torchvision.datasets.ImageFolder(
    root=data_path,
    transform=None
)
train_size = int(0.80 * len(full_dataset))  # 80% for training
val_size = len(full_dataset) - train_size  # 20% for validation

# Seed the global RNG so the split is reproducible across runs.
torch.manual_seed(42)
train_split, val_split = torch.utils.data.random_split(full_dataset, [train_size, val_size])

# Save the indices
train_indices = list(train_split.indices)
val_indices = list(val_split.indices)

# The two subsets returned by random_split wrap the SAME ImageFolder object, so
# assigning `.dataset.transform` on one also changes the other (the last
# assignment wins). Give each split its own ImageFolder instance instead, so
# the training split is augmented and the validation split is not.
train_dataset = torch.utils.data.Subset(
    torchvision.datasets.ImageFolder(root=data_path, transform=train_transform),
    train_indices,
)
val_dataset = torch.utils.data.Subset(
    torchvision.datasets.ImageFolder(root=data_path, transform=val_transform),
    val_indices,
)

# Persist the split so it can be inspected or reused later.
with open('dataset_split.json', 'w') as f:
    json.dump({'train': train_indices, 'val': val_indices}, f)

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=32,
    num_workers=4,
    shuffle=True
)



In [None]:
# Pull one batch from the training loader and look at its first sample.
batch = next(iter(train_loader))
images, labels = batch[0], batch[1]
print(images.shape)
# imshow expects channels last, so move the channel axis to the end.
first_image = images[0].permute(1, 2, 0)
plt.imshow(first_image)
print(labels[0])


In [None]:
# Load ResNet-50 with torchvision's default pretrained weights.
resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

In [None]:
# Print the module tree of the pretrained network for inspection.
print(resnet50)

# # Feature Extracting a Pretrained Model

Since this pretrained model was trained on the ImageNet dataset, its output layer has 1000 nodes. We want to replace this last classifier layer with one that fits this dataset, which has 5 classes. Furthermore, in feature extraction we don't need to compute gradients for any layer except the new last layer that we initialize. To achieve this, we set `.requires_grad` to `False` on all pretrained parameters.

In [None]:
def set_parameter_requires_grad(model, feature_extracting=True):
    """Freeze every parameter of ``model`` when feature extracting.

    Args:
        model: Module whose ``parameters()`` are visited.
        feature_extracting: When true, ``requires_grad`` is set to ``False``
            on every parameter; when false, the model is left untouched.
    """
    if not feature_extracting:
        return
    for weight in model.parameters():
        weight.requires_grad = False
            
# Freeze all currently existing parameters of the pretrained network.
set_parameter_requires_grad(resnet50, feature_extracting=True)

In [None]:
# Replace the classifier head with a freshly initialised 5-way linear layer.
# The new layer's parameters are created with requires_grad=True by default.
num_features = resnet50.fc.in_features  # 2048 for ResNet-50
resnet50.fc = nn.Linear(num_features, 5)

In [None]:
# Show every parameter that will still receive gradients, with its values.
trainable = (
    (layer_name, weight)
    for layer_name, weight in resnet50.named_parameters()
    if weight.requires_grad
)
for layer_name, weight in trainable:
    print(layer_name, weight.data)

In [None]:
def train_model(model, dataloaders, criterion, optimizer, device, num_epochs=50, is_train=True,
                checkpoint_dir='/kaggle/working/'):
    """Run ``num_epochs`` passes over ``dataloaders`` and record per-epoch metrics.

    Args:
        model: Network to optimise; it is moved to ``device``.
        dataloaders: A single iterable yielding ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss.
        optimizer: Optimizer stepped once per batch (unused when ``is_train`` is
            false).
        device: Device the model and every batch are moved to.
        num_epochs: Number of passes over ``dataloaders``.
        is_train: When true (default) the model is put in training mode, the
            optimizer is stepped on every batch and a checkpoint is written
            after every epoch. When false the model is put in eval mode and only
            forward passes are run: no gradients, no optimizer steps, no
            checkpoints.
        checkpoint_dir: Directory that receives one ``NN.pth`` state dict per
            epoch (``NN`` is the zero-padded epoch index).

    Returns:
        ``(acc_history, loss_history)``: two lists of Python floats with one
        entry per epoch.

    Raises:
        ValueError: If an epoch yields no samples.
    """
    since = time.time()

    acc_history = []
    loss_history = []

    best_acc = 0.0

    # Move the model once, not on every batch.
    model.to(device)
    if is_train:
        os.makedirs(checkpoint_dir, exist_ok=True)

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Set the mode explicitly: the model may have been left in eval mode by
        # an earlier evaluation run in the same session.
        model.train(is_train)

        running_loss = 0.0
        running_corrects = 0
        num_samples = 0

        # Iterate over data.
        for inputs, labels in dataloaders:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if is_train:
                # zero the parameter gradients
                optimizer.zero_grad()

            # forward (autograd is only recorded when we are going to step)
            with torch.set_grad_enabled(is_train):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            _, preds = torch.max(outputs, 1)

            if is_train:
                # backward
                loss.backward()
                optimizer.step()

            # statistics: loss is assumed to be a per-batch mean, so weight it
            # by the batch size to get a correct per-sample epoch average.
            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            running_corrects += (preds == labels).sum().item()
            num_samples += batch_size

        if num_samples == 0:
            raise ValueError('dataloaders yielded no samples; cannot compute epoch statistics')

        # Divide by the samples actually seen so the averages stay correct even
        # if the loader drops or resamples items.
        epoch_loss = running_loss / num_samples
        epoch_acc = running_corrects / num_samples

        print('Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

        if epoch_acc > best_acc:
            best_acc = epoch_acc

        acc_history.append(epoch_acc)
        loss_history.append(epoch_loss)

        if is_train:
            torch.save(model.state_dict(), os.path.join(checkpoint_dir, '{0:0=2d}.pth'.format(epoch)))

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best Acc: {:.4f}'.format(best_acc))

    return acc_history, loss_history

In [None]:
# Only parameters that still require gradients are handed to the optimizer.
trainable_named = [
    (layer_name, weight)
    for layer_name, weight in resnet50.named_parameters()
    if weight.requires_grad
]
for layer_name, _ in trainable_named:
    print("\t", layer_name)

params_to_update = [weight for _, weight in trainable_named]

# Adam with its default hyperparameters.
optimizer = optim.Adam(params_to_update)

In [None]:
# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

import torch
import torch.nn as nn

import torch
import torch.nn as nn

class FocalLoss(nn.Module):
    """Multi-class focal loss built on top of softmax cross-entropy.

    For each sample the loss is ``alpha[target] * (1 - p_t) ** gamma * CE``,
    where ``CE`` is the unreduced cross-entropy and ``p_t = exp(-CE)`` is the
    softmax probability the model assigns to the target class.

    Args:
        alpha: Optional per-class weights, indexed by class id. May be a tensor
            or any sequence accepted by ``torch.tensor``. ``None`` disables
            class weighting.
        gamma: Focusing exponent; ``0`` reduces the loss to plain cross-entropy.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample losses.
    """

    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super().__init__()
        if alpha is not None and not torch.is_tensor(alpha):
            alpha = torch.tensor(alpha, dtype=torch.float32)
        # Registered as a buffer so that `.to(device)` / `.cuda()` on the loss
        # module moves the class weights together with it.
        self.register_buffer('alpha', alpha)
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the focal loss.

        Args:
            inputs: Raw, unnormalised class scores accepted by
                ``cross_entropy`` (one row of scores per sample).
            targets: Integer class indices, one per sample.

        Returns:
            A scalar tensor for ``'mean'``/``'sum'`` reduction, otherwise a
            tensor with one loss value per sample.
        """
        # Per-sample cross-entropy (softmax based, not binary).
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')

        # CE = -log(p_t), so exp(-CE) recovers the target-class probability.
        pt = torch.exp(-ce_loss)

        # Down-weight well-classified samples (p_t close to 1).
        focal = (1 - pt) ** self.gamma * ce_loss

        if self.alpha is not None:
            # Pick the weight of each sample's target class; the device move is
            # a no-op when the weights already live next to the inputs.
            alpha_t = self.alpha.to(inputs.device)[targets]
            focal = alpha_t * focal

        # Apply reduction method
        if self.reduction == 'mean':
            return focal.mean()
        if self.reduction == 'sum':
            return focal.sum()
        return focal


# Per-class alpha weights for the focal loss, indexed by class id.
weights = [8.4147, 12, 5, 2.7538, 15]
class_weights = torch.tensor(weights, device=device)

criterion = FocalLoss(alpha=class_weights)
criterion = criterion.to(device)

# Train model
train_acc_hist, train_loss_hist = train_model(
    model=resnet50,
    dataloaders=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
)

In [None]:
#test_path = r"C:\Users\HP\Desktop\IISC\Sem 1\Machine Learning for Cyber Physical Systems\Project 1\ML4CPS-Project1-\Test_Data"

# Loader over the held-out split; order is kept fixed (no shuffling).
loader_options = dict(batch_size=32, num_workers=1, shuffle=False)
test_loader = torch.utils.data.DataLoader(val_dataset, **loader_options)

# Number of batches per pass.
num_batches = len(test_loader)
print(num_batches)

In [None]:
def eval_model(model, dataloaders, device, checkpoint_dir='/kaggle/working/'):
    """Evaluate every ``*.pth`` checkpoint in ``checkpoint_dir`` on ``dataloaders``.

    Each checkpoint's state dict is loaded into ``model`` in turn, so after the
    call ``model`` holds the weights of the last checkpoint evaluated and is
    left in eval mode.

    Args:
        model: Network whose architecture matches the saved state dicts.
        dataloaders: A single iterable yielding ``(inputs, labels)`` batches.
        device: Device used for the model, the batches and checkpoint loading.
        checkpoint_dir: Directory searched (non-recursively) for ``*.pth`` files.

    Returns:
        List of accuracies (Python floats), one per checkpoint, in evaluation
        order.

    Raises:
        ValueError: If ``dataloaders`` yields no samples.
    """
    since = time.time()

    acc_history = []
    best_acc = 0.0

    def _epoch_order(path):
        # Purely numeric file names are ordered by value so that e.g. 100.pth
        # comes after 11.pth; anything else follows in lexicographic order.
        stem = os.path.splitext(os.path.basename(path))[0]
        return (0, int(stem), '') if stem.isdecimal() else (1, 0, stem)

    pattern = os.path.join(glob.escape(checkpoint_dir), '*.pth')
    saved_models = sorted(glob.glob(pattern), key=_epoch_order)
    print('saved_model', saved_models)

    model.to(device)

    for model_path in saved_models:
        print('Loading model', model_path)

        # map_location lets checkpoints written on a GPU load on a CPU-only host.
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.eval()

        running_corrects = 0
        num_samples = 0
        # Iterate over data.
        with torch.no_grad():
            for inputs, labels in dataloaders:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)

                _, preds = torch.max(outputs, 1)
                running_corrects += (preds == labels).sum().item()
                num_samples += inputs.size(0)

        if num_samples == 0:
            raise ValueError('dataloaders yielded no samples; cannot compute accuracy')

        epoch_acc = running_corrects / num_samples

        print('Acc: {:.4f}'.format(epoch_acc))

        if epoch_acc > best_acc:
            best_acc = epoch_acc

        acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Validation complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best Acc: {:.4f}'.format(best_acc))

    return acc_history

In [None]:
# Score every saved checkpoint on the held-out loader.
val_acc_hist = eval_model(model=resnet50, dataloaders=test_loader, device=device)

In [None]:
# Training accuracy per epoch next to the accuracy of each saved checkpoint on
# the held-out loader (one checkpoint is written per training epoch).
fig, ax = plt.subplots()
ax.plot(train_acc_hist, label="train")
ax.plot(val_acc_hist, label="validation")
ax.set_xlabel("Epoch")
ax.set_ylabel("Accuracy")
ax.set_title("Accuracy per epoch")
ax.legend()
plt.show()

In [None]:
# Scratch cell: prints a fixed string and reads no notebook state.
print("Hi")

In [None]:
# No validation-loss history is produced anywhere in this notebook (eval_model
# returns accuracies only), so plot the loss recorded during training.
fig, ax = plt.subplots()
ax.plot(train_loss_hist, label="train")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training loss per epoch")
ax.legend()
plt.show()

In [None]:
import os
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Directory holding the unlabeled test images. It is passed as `root_dir` to
# CustomImageDataset below, which lists the image files directly inside it.
test_data_path = "/kaggle/input/faltu-dataset/Project 1 Data/Test_Data"


# Custom dataset class for loading images from a single folder
class CustomImageDataset(Dataset):
    """Unlabeled image dataset over the files directly inside one folder.

    Only entries whose name ends in one of ``IMAGE_EXTENSIONS`` (compared
    case-insensitively) are kept. File names are stored sorted in
    ``image_files`` so that index ``i`` always refers to the same file and
    model outputs can be matched back to file names.

    Args:
        root_dir: Folder to list.
        transform: Optional callable applied to each loaded RGB ``PIL`` image.
    """

    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order, so sort for determinism.
        self.image_files = sorted(
            entry for entry in os.listdir(root_dir)
            if entry.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it, transformed if requested."""
        img_name = os.path.join(self.root_dir, self.image_files[idx])
        # convert() returns a fully loaded copy, so the file handle can be
        # released as soon as the context manager exits.
        with Image.open(img_name) as raw:
            image = raw.convert('RGB')  # Ensure image is in RGB format
        if self.transform is not None:
            image = self.transform(image)
        return image

# Create an instance of the custom dataset
test_dataset = CustomImageDataset(root_dir=test_data_path, transform=val_transform)

# Create DataLoader for the test dataset
test1_loader = DataLoader(
    test_dataset,
    batch_size=32,
    num_workers=4,
    shuffle=False  # No need to shuffle test data
)

# Example of making predictions
# Load your trained model
# model = YourModel()
# model.load_state_dict(torch.load('your_model.pth'))
# model.eval()  # Set the model to evaluation mode
model=resnet50
# Device configuration
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model.to(device)

# Make predictions
predictions = []
with torch.no_grad():  # Disable gradient calculation
    for images in test1_loader:
        images = images.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.cpu().numpy())

print(predictions)
