# 1. Build your own convolutional neural network using PyTorch

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from PIL import Image
import os

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.utils.data import random_split

import torch.nn as nn
import torch.nn.functional as F
from torch import optim

import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torch.utils.data import ConcatDataset


# Use the GPU when one is available; otherwise fall back to the CPU so that
# later `.to(device)` calls do not fail on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# Per-channel normalization statistics handed to transforms.Normalize below
# (the values match the widely used ImageNet channel statistics).
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Preprocessing pipeline: fixed 224x224 size -> tensor -> per-channel normalize.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# ImageFolder derives one class per sub-directory of the root folder.
train_dataset = ImageFolder("./AugmentedTrain", transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)

In [3]:
class TestDataset(Dataset):
    """Unlabeled image dataset that yields ``(image, filename)`` pairs.

    Only files located directly inside ``root_dir`` are considered;
    sub-directories are not searched.

    Args:
        root_dir: Directory that contains the image files.
        transform: Optional callable applied to each loaded PIL image.
        extensions: Filename suffix (or iterable of suffixes) to accept.
            Matching is case-sensitive. Defaults to ``('.jpg',)``.
    """

    def __init__(self, root_dir, transform=None, extensions=('.jpg',)):
        self.root_dir = root_dir
        self.transform = transform

        # A bare string would otherwise be split into single characters.
        if isinstance(extensions, str):
            extensions = (extensions,)
        suffixes = tuple(extensions)

        # os.listdir returns entries in arbitrary order; sorting keeps sample
        # indices (and anything written out in dataset order) stable across
        # runs and machines.
        self.samples = [
            (os.path.join(root_dir, filename), filename)
            for filename in sorted(os.listdir(root_dir))
            if filename.endswith(suffixes)
        ]

    def __len__(self):
        """Return the number of image files found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load the ``idx``-th image and return ``(image, filename)``."""
        img_path, filename = self.samples[idx]
        # Force three channels so grayscale/RGBA/palette files still match a
        # 3-channel normalization; the context manager releases the file
        # handle once the pixel data has been read.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, filename


# Preprocessing pipeline: fixed 224x224 size -> tensor -> per-channel normalize
# using the module-level `mean` / `std` lists.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

In [None]:
# Hold out 20% of the labeled images for validation.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size

# Seed the split so train/validation membership is identical every time this
# cell runs; an unseeded split reshuffles on each execution, which makes
# validation numbers incomparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_subset, val_subset = random_split(
    train_dataset, [train_size, val_size], generator=split_generator
)

# drop_last avoids a trailing batch with a single sample, which BatchNorm1d
# rejects in training mode.
train_loader = DataLoader(train_subset, batch_size=4, shuffle=True, drop_last=True)
val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

# Unlabeled test images; each item is (image tensor, file name).
test_dataset = TestDataset(root_dir="./Test/Test/", transform=transform)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

In [8]:
class ConvNet(nn.Module):
    """Convolutional image classifier: eight conv stages plus a dense head.

    Each conv stage is ``Conv2d -> BatchNorm2d -> ReLU``; the first two stages
    are followed by a 3x3/stride-2 max-pool. For 224x224 inputs the
    convolutional stack produces a 512x7x7 feature map (25088 features). An
    adaptive average pool to 7x7 sits in front of the dense head: it is the
    identity for 224x224 inputs and lets other input resolutions pass through
    the same ``Linear(25088, ...)`` layer instead of failing on a shape
    mismatch. The pool has no parameters, so ``state_dict`` keys are unchanged.

    Args:
        num_classes: Number of output classes.

    Note:
        ``forward`` returns log-probabilities (``log_softmax`` over classes).
        ``nn.NLLLoss`` is the direct match for that output; feeding it to
        ``nn.CrossEntropyLoss`` gives the same loss value because a second
        ``log_softmax`` leaves log-probabilities unchanged.
    """

    def __init__(self, num_classes=4):
        super().__init__()

        stage = self._conv_bn_relu
        # The stages are unpacked into one flat Sequential so that layer
        # indices (and therefore checkpoint keys) stay as they were.
        self.conv_block = nn.Sequential(
            *stage(3, 64, kernel_size=3, stride=1),
            nn.MaxPool2d(kernel_size=3, stride=2),

            *stage(64, 128, kernel_size=5, stride=1),
            nn.MaxPool2d(kernel_size=3, stride=2),

            *stage(128, 256, kernel_size=5, stride=2),
            *stage(256, 384, kernel_size=5, stride=1),
            *stage(384, 512, kernel_size=5, stride=2),
            *stage(512, 1024, kernel_size=5, stride=1),
            *stage(1024, 2048, kernel_size=5, stride=1),
            *stage(2048, 512, kernel_size=5, stride=2),
        )

        # Fixes the spatial size seen by the dense head at 7x7.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc_block = nn.Sequential(
            nn.Linear(512 * 7 * 7, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(512, num_classes)
        )

    @staticmethod
    def _conv_bn_relu(in_channels, out_channels, kernel_size, stride, padding=2):
        """Return the ``[Conv2d, BatchNorm2d, ReLU]`` layers of one conv stage."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size,
                      stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]

    def forward(self, x):
        """Map a ``(batch, 3, H, W)`` image batch to per-class log-probabilities.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        x = self.conv_block(x)
        x = self.avgpool(x)
        # flatten (unlike view) does not require a contiguous memory layout.
        x = torch.flatten(x, 1)
        x = self.fc_block(x)
        return F.log_softmax(x, dim=1)


# 2. Train your model using the cow teat datasets (you may need to use Google Colab or Kaggle with a GPU to train your model)

### (1) use torchvision.datasets.ImageFolder for the training dataset
### (2) use custom dataloader for test dataset (return image tensor and file name)

In [6]:
# 1. Build the network, the loss function, and the optimizer
model = ConvNet(num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# How many full passes to make over the training data
num_epochs = 30

for epoch in range(num_epochs):
    # ---- 2. Training phase: one pass over train_loader ----
    model.train()
    correct_train, total_train = 0, 0
    for step, (images, labels) in enumerate(train_loader, start=1):
        images = images.to(device)
        labels = labels.to(device)

        # Gradients accumulate by default, so clear them before each step.
        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, labels)

        # Running tally for the epoch's training accuracy.
        predicted_train = outputs.max(1)[1]
        correct_train += (predicted_train == labels).sum().item()
        total_train += labels.size(0)

        loss.backward()
        optimizer.step()

        # Progress line every 100 batches.
        if step % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{step}/{len(train_loader)}], Loss: {loss.item():.4f}")

    train_accuracy = 100. * correct_train / total_train
    print(f"Epoch [{epoch+1}/{num_epochs}], Training Accuracy: {train_accuracy:.2f}%")

    # ---- Validation phase: no gradient tracking, eval-mode layers ----
    model.eval()
    total_val_loss, correct = 0, 0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            total_val_loss += criterion(outputs, labels).item()
            predicted = outputs.max(1)[1]
            correct += (predicted == labels).sum().item()

    # Loss is averaged over batches; accuracy over individual samples.
    avg_val_loss = total_val_loss / len(val_loader)
    val_accuracy = 100. * correct / len(val_subset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")


Epoch [1/30], Step [100/230], Loss: 1.1656
Epoch [1/30], Step [200/230], Loss: 0.9608
Epoch [1/30], Training Accuracy: 51.58%
Epoch [1/30], Validation Loss: 1.0533, Validation Accuracy: 51.30%
Epoch [2/30], Step [100/230], Loss: 1.3847
Epoch [2/30], Step [200/230], Loss: 0.8401
Epoch [2/30], Training Accuracy: 51.14%
Epoch [2/30], Validation Loss: 1.0071, Validation Accuracy: 63.91%
Epoch [3/30], Step [100/230], Loss: 0.5602
Epoch [3/30], Step [200/230], Loss: 0.8500
Epoch [3/30], Training Accuracy: 51.47%
Epoch [3/30], Validation Loss: 1.1005, Validation Accuracy: 55.22%
Epoch [4/30], Step [100/230], Loss: 1.1984
Epoch [4/30], Step [200/230], Loss: 0.5826
Epoch [4/30], Training Accuracy: 50.71%
Epoch [4/30], Validation Loss: 2.7109, Validation Accuracy: 43.91%
Epoch [5/30], Step [100/230], Loss: 0.7540
Epoch [5/30], Step [200/230], Loss: 0.7690
Epoch [5/30], Training Accuracy: 49.95%
Epoch [5/30], Validation Loss: 1.0394, Validation Accuracy: 43.04%
Epoch [6/30], Step [100/230], Loss:

In [4]:
class TensorLabelDataset(Dataset):
    """Wrap a ``(data, label)`` dataset so labels come back as long tensors.

    Items of ``original_dataset`` must unpack into exactly two values. The
    data element is passed through untouched; the label is returned as a
    ``torch.long`` tensor whether it was a Python int or already a tensor.

    Args:
        original_dataset: Any indexable, sized dataset of ``(data, label)``.
    """

    def __init__(self, original_dataset):
        self.original_dataset = original_dataset

    def __len__(self):
        """Return the number of items in the wrapped dataset."""
        return len(self.original_dataset)

    def __getitem__(self, idx):
        """Return ``(data, label)`` with ``label`` as a ``torch.long`` tensor."""
        data, label = self.original_dataset[idx]
        # as_tensor accepts ints and tensors alike; torch.tensor() would emit
        # a copy-construct UserWarning for labels that are already tensors.
        return data, torch.as_tensor(label, dtype=torch.long)

In [5]:
# Hold out 20% of the labeled images for validation.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size

# Seed the split so train/validation membership is identical every time this
# cell runs; an unseeded split reshuffles on each execution, which makes
# validation numbers incomparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_subset, val_subset = random_split(
    train_dataset, [train_size, val_size], generator=split_generator
)

# Wrap train_subset so its labels are returned as long tensors.
tensor_label_train_subset = TensorLabelDataset(train_subset)
# drop_last avoids a trailing batch with a single sample, which BatchNorm1d
# rejects in training mode.
train_loader = DataLoader(tensor_label_train_subset, batch_size=4, shuffle=True, drop_last=True)

val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

# Unlabeled test images; each item is (image tensor, file name).
test_dataset = TestDataset(root_dir="./Test/Test/", transform=transform)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

## Pseudo labeling

In [12]:
class PseudoLabeledDataset(Dataset):
    """Dataset backed by an in-memory list of ``(data, label)`` pairs.

    Args:
        data_list: Sequence whose items unpack into ``(data, label)``.
    """

    def __init__(self, data_list):
        self.data_list = data_list

    def __len__(self):
        """Number of stored pairs."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Return the ``idx``-th pair with its label as a ``torch.long`` tensor."""
        sample, raw_label = self.data_list[idx]
        label_tensor = torch.tensor(raw_label, dtype=torch.long)
        return sample, label_tensor

# Initialize the Model, Loss, and Optimizer
model = ConvNet(num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Number of training epochs
num_epochs = 30

# Minimum predicted probability for a test image to receive a pseudo-label
threshold = 0.95

for epoch in range(num_epochs):
    correct_train = 0
    total_train = 0

    # Pseudo-labeling: label the test images with the current model, keeping
    # only predictions whose top probability exceeds the threshold.
    model.eval()
    pseudo_labels = []
    with torch.no_grad():
        for data, _ in test_loader:
            data = data.to(device)
            probs = F.softmax(model(data), dim=1)
            confidence, predicted = probs.max(1)
            keep = confidence > threshold
            # Boolean indexing copies just the selected rows, so only the
            # retained images are moved to and stored on the CPU.
            pseudo_labels.extend(zip(data[keep].cpu(), predicted[keep].tolist()))

    # Concatenate pseudo-labeled data with the original training data
    pseudo_dataset = PseudoLabeledDataset(pseudo_labels)
    combined_dataset = ConcatDataset([tensor_label_train_subset, pseudo_dataset])

    # The combined size changes every epoch; drop_last prevents a trailing
    # single-sample batch, which BatchNorm1d rejects in training mode.
    combined_loader = DataLoader(combined_dataset, batch_size=32, shuffle=True, drop_last=True)

    # Training Loop with pseudo-labeled data
    model.train()
    for batch_idx, (data, target) in enumerate(combined_loader):
        data, target = data.to(device), target.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass. Targets arrive already 1-D from collation; squeezing
        # them would turn a one-element batch into a 0-dim tensor.
        outputs = model(data)
        loss = criterion(outputs, target)

        _, predicted_train = outputs.max(1)
        correct_train += predicted_train.eq(target).sum().item()
        total_train += target.size(0)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        if (batch_idx + 1) % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(combined_loader)}], Loss: {loss.item():.4f}")

    # max(..., 1) guards the division when no full batch was available.
    train_accuracy = 100. * correct_train / max(total_train, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Training Accuracy: {train_accuracy:.2f}%")

    # Validation Loop
    model.eval()
    total_val_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            loss = criterion(outputs, target)
            total_val_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(target).sum().item()

    # Loss is averaged over batches; accuracy over individual samples.
    avg_val_loss = total_val_loss / len(val_loader)

    val_accuracy = 100. * correct / len(val_subset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")


Epoch [1/30], Training Accuracy: 53.75%
Epoch [1/30], Validation Loss: 1.0243, Validation Accuracy: 51.52%
Epoch [2/30], Training Accuracy: 57.29%
Epoch [2/30], Validation Loss: 0.9903, Validation Accuracy: 59.78%
Epoch [3/30], Training Accuracy: 57.48%
Epoch [3/30], Validation Loss: 1.0893, Validation Accuracy: 60.22%
Epoch [4/30], Training Accuracy: 57.65%
Epoch [4/30], Validation Loss: 1.0196, Validation Accuracy: 57.17%
Epoch [5/30], Training Accuracy: 58.04%
Epoch [5/30], Validation Loss: 1.0189, Validation Accuracy: 57.17%
Epoch [6/30], Training Accuracy: 58.77%
Epoch [6/30], Validation Loss: 0.9970, Validation Accuracy: 57.39%
Epoch [7/30], Training Accuracy: 57.62%
Epoch [7/30], Validation Loss: 1.0772, Validation Accuracy: 57.83%
Epoch [8/30], Training Accuracy: 58.37%
Epoch [8/30], Validation Loss: 0.9723, Validation Accuracy: 59.57%
Epoch [9/30], Training Accuracy: 59.31%
Epoch [9/30], Validation Loss: 1.0242, Validation Accuracy: 52.83%
Epoch [10/30], Training Accuracy: 58.

# 3. Evaluate your model using the developed software

In [13]:
import pandas as pd

# Prefer the first GPU but fall back to the CPU so inference still runs on
# machines without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # inference behavior for dropout / batch-norm layers

filenames = []
predictions = []

with torch.no_grad():
    for data, filename in test_loader:
        data = data.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)  # index of the highest-scoring class per sample
        filenames.extend(filename)
        predictions.extend(predicted.cpu().tolist())

model.train()  # Set the model back to training mode

# One row per test image: file name, predicted class index (no header row).
df_output = pd.DataFrame({
    'Filename': filenames,
    'Prediction': predictions
})

df_output.to_csv('conv_predictions.csv', index=False, header=False)

# 4. Compare results with [SCTL paper](https://www.mdpi.com/2076-2615/12/7/886/htm). Requirement: performance is better than VGG16: 66.8%

# 5. Write a four-page paper report using the shared LaTeX template. Upload your paper to ResearchGate or arXiv, and put your paper link here.

# 6. Grading rubric

(1). Code ------- 20 points (you also need to upload your final model as a pt file)

(2). Grammar ---- 20 points

(3). Introduction & related work --- 10 points


(4). Method  ---- 20 points

(5). Results ---- 20 points

     >= 66.8% --> 10 points
     < 40% --> 0 points
     >= 40% & < 66.8% --> 0.3731 points per percentage point
     

(6). Discussion - 10 points