# Model: Convolutional Neural Network (CNN)


In [1]:
import os
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
import shutil
from PIL import Image  # Import the Image module from PIL



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 1 - Data Loading and Preprocessing:
# Set up data loaders to load and preprocess your image data:

# Define data directories
train_data_dir = 'Dataset/train'
test_data_dir = 'Dataset/test'

# Define image transformations
transform = transforms.Compose([
    transforms.Resize((96, 96)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create datasets
train_dataset = ImageFolder(root=train_data_dir, transform=transform)
test_dataset = ImageFolder(root=test_data_dir, transform=transform)

# Create data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Define the number of classes
num_classes = len(train_dataset.classes)


# CNN Model

In [4]:
# 2 - Define the CNN Model:


import torch.nn as nn

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(32 * 24 * 24, 128)  # Updated input size for 96x96 images
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))

        # Calculate the size of x after the last pooling layer and update fc1 input size accordingly
        x_size = x.size(0)
        x = x.view(x_size, -1)  # Reshape to flatten

        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x



# Initialize the model
model = CNN()



In [5]:
#Define Loss and Optimizer:
#Set up the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 3 - Training the Model:
# Train the model using your training data:

# Set device (GPU if available, otherwise CPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader)}")

print("Training finished.")


Epoch 1, Loss: 1.749460876367654
Epoch 2, Loss: 1.2306226560264637
Epoch 3, Loss: 0.899979644899915
Epoch 4, Loss: 0.5591124827694741
Epoch 5, Loss: 0.2586393271377132
Epoch 6, Loss: 0.10127061706890536
Epoch 7, Loss: 0.043396361508839736
Epoch 8, Loss: 0.027331266019813668
Epoch 9, Loss: 0.030167767076581052
Epoch 10, Loss: 0.012630276117254357
Training finished.


In [6]:
# 4 - Define Loss and Optimizer:
#Set up the loss function and optimizer#

correct = 0
total = 0
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy on test data: {100 * correct / total:.2f}%")


Accuracy on test data: 54.81%


# ResNet-50 model

In [3]:
# Load a pre-trained ResNet-50 model
model = torchvision.models.resnet50(pretrained=True)

# Freeze all layers except the final classification layer
for param in model.parameters():
    param.requires_grad = False
model.fc.requires_grad = True


# Modify the final classification layer to match your number of classes
num_classes = len(train_dataset.classes)
model.fc = nn.Linear(2048, num_classes)  # 2048 is the output size of ResNet-50


# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

# Train the model
num_epochs = 10 #20 - Loss: 0.2588 -acc:83.78%
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader)}")

print("Training finished.")


Epoch 1, Loss: 0.8246983416900513
Epoch 2, Loss: 0.5164229089667083
Epoch 3, Loss: 0.4584004627481388
Epoch 4, Loss: 0.4451314907544737
Epoch 5, Loss: 0.40161887765120546
Epoch 6, Loss: 0.38537737152948504
Epoch 7, Loss: 0.37954091095620657
Epoch 8, Loss: 0.34064749841857106
Epoch 9, Loss: 0.32879225620228775
Epoch 10, Loss: 0.3347152703127284
Epoch 11, Loss: 0.31465372334050523
Epoch 12, Loss: 0.3141958956505842
Epoch 13, Loss: 0.3062680433178024
Epoch 14, Loss: 0.3026787608292452
Epoch 15, Loss: 0.2958309228775228
Epoch 16, Loss: 0.2972824718257424
Epoch 17, Loss: 0.27643585924035424
Epoch 18, Loss: 0.2809820374248514
Epoch 19, Loss: 0.25495578049664286
Epoch 20, Loss: 0.25881806537983526
Training finished.


In [4]:
# Evaluate the model on the test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Accuracy on test data: {accuracy:.2f}%")

if accuracy>84.15:
# Save the trained model to a file
    torch.save(model, 'Models/ResNet-50.model.pth')

Accuracy on test data: 83.78%


# Classify Unlabeled images

In [3]:
# Define a custom dataset class for loading unlabeled images
from torch.utils.data import Dataset, DataLoader

class CustomUnlabeledDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_files = [f for f in os.listdir(data_dir) if f.endswith('.png')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = os.path.join(self.data_dir, self.image_files[idx])
        image = Image.open(img_name).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image

    def get_image_paths(self,batch_size):
        # Returns a list of image file paths in the dataset
        return [os.path.join(self.data_dir, f) for f in self.image_files]
    

    def get_image_paths_batch(self, start_idx, end_idx):
        # Returns a list of image file paths within the specified range [start_idx, end_idx]
        return [os.path.join(self.data_dir, self.image_files[i]) for i in range(start_idx, end_idx)]

In [4]:
import torch
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import numpy as np


# Create datasets
unlabeled_data_dir = 'Dataset/unlabelled'

# Create the custom unlabeled dataset
custom_unlabeled_dataset = CustomUnlabeledDataset(unlabeled_data_dir, transform=transform)

# Create a data loader for the custom unlabeled dataset
batch_size = 32  # Adjust this batch size as needed
unlabeled_loader = DataLoader(custom_unlabeled_dataset, batch_size=batch_size, shuffle=False)#,num_workers=4)





In [5]:
# Load the trained model (replace 'model.pth' with the path to your trained model)
model = torch.load('Models/ResNet-50.model.pth')
# Set the model in evaluation mode
model.eval()


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [6]:

# Check if folder existes else creates it
def create_folder_if_not_exists(folder_path):

    if not os.path.exists(folder_path):
        try:
            os.makedirs(folder_path)
            print(f"Folder '{folder_path}' created.")
        except OSError as e:
            print(f"Error creating folder '{folder_path}': {e}")

# Move images for the new folder path of the classification

def move_file(source_file_path,classifation):
    

    destination_folder_path="Dataset/Unlabelled_Predicted/"+str(classifation)+"/"
    
    #check folder if exists
    create_folder_if_not_exists(destination_folder_path)

    try:
        # Move the file to the destination folder
        shutil.move(source_file_path, destination_folder_path)
        #print(f"File moved from '{source_file_path}' to '{destination_folder_path}'")
    except Exception as e:
        print(f"Error moving the file: {e}")    


# iterate all list of images and for each image and label use the "move_file"

def save_classification(predictions,image_paths):
    for prediction, image_path in zip(predictions, image_paths):
        move_file(image_path,prediction)
    



In [9]:
def predict_category_of_images(unlabeled_loader,custom_unlabeled_dataset,number_of_batches=1):

    # Initialize lists to store predictions and image file paths
    predictions = []
    image_paths = []

    # Device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Iterate through the unlabeled data and make predictions
    count_batches=0
    with torch.no_grad():
        for batch_idx, inputs in enumerate(unlabeled_loader):  
            try:
                inputs = inputs.to(device)  # Move data to the device (CPU or GPU)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                
                predictions.extend(predicted.cpu().numpy())  # Store predictions
                
                # Get the image paths for the current batch and extend the image_paths list
                start_idx = batch_idx * batch_size
                end_idx = start_idx + len(inputs)
                batch_image_paths = custom_unlabeled_dataset.get_image_paths_batch(start_idx, end_idx)
                image_paths.extend(batch_image_paths)

                if count_batches==number_of_batches:
                    print(f"Number of batchs predicted'{number_of_batches}' and a total of  '{len(predictions)}' images")
                    save_classification(predictions,image_paths)
                    break
                else:
                    count_batches+=1
            except Exception as e:
                print(f"Error processing batch {batch_idx}: {e}")


# while there are files in the folder unlabeled_data_dir iterate batches
def run_folder():
    # Create datasets
    unlabeled_data_dir = 'Dataset/unlabelled'
    custom_unlabeled_dataset = CustomUnlabeledDataset(unlabeled_data_dir, transform=transform)
    while len(custom_unlabeled_dataset)!=0:
        # Create the custom unlabeled dataset\
        custom_unlabeled_dataset = CustomUnlabeledDataset(unlabeled_data_dir, transform=transform)
        print(f"Images left in the folder: '{len(custom_unlabeled_dataset)}'")
        # Create a data loader for the custom unlabeled dataset
        batch_size = 32  # Adjust this batch size as needed
        unlabeled_loader = DataLoader(custom_unlabeled_dataset, batch_size=batch_size, shuffle=False)#,num_workers=4)

        predict_category_of_images(unlabeled_loader,custom_unlabeled_dataset,1)



In [10]:
run_folder()

Images left in the folder: '64'
Number of batchs predicted'1' and a total of  '64' images
Images left in the folder: '0'
