In [15]:
import os
from PIL import Image
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch
import torchvision.transforms as transforms
import numpy as np
from torchvision.io import read_image
from torch import nn
from tqdm import tqdm
import time
import random
from torch.utils.data import DataLoader, Subset
import csv
from torch.utils.data import Dataset


In [16]:
class CustomImageDataset(Dataset):
    def __init__(self, data_folder, transform=None):
        self.data_folder = data_folder
        self.image_files = [f for f in os.listdir(data_folder) if f.endswith(".png")]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = os.path.join(self.data_folder, self.image_files[idx])
        image = read_image(image_path)
        image = image[0:3].float()
        # image = image.reshape(1, 128, 128)
        
        txt_file = os.path.splitext(self.image_files[idx])[0] + ".txt"
        txt_path = os.path.join(self.data_folder, txt_file)
        with open(txt_path, "r") as f:
            first_line = f.readline().strip()
            class_label = int(first_line.split()[0])

        if self.transform:
            image = self.transform(image)

        return image, class_label
    

class CustomImageDataset_test(Dataset):
    def __init__(self, data_folder, transform=None):
        self.data_folder = data_folder
        self.image_files = [f for f in os.listdir(data_folder) if f.endswith(".png")]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = os.path.join(self.data_folder, self.image_files[idx])
        image_name = self.image_files[idx]
        image = read_image(image_path)
        image = image[0:3].float()

        if self.transform:
            image = self.transform(image)

        return image, image_name

In [17]:

def get_dataloaders(data_folder, transform, train_ratio, val_ratio, batch_size, shuffle_flag=True, train_data=True):
    if train_data:
        # Create a single merged dataset
        train_dataset = CustomImageDataset(data_folder, transform)
        val_dataset = CustomImageDataset(data_folder, transform)
        test_dataset = CustomImageDataset(data_folder, transform)
    else:
        train_dataset = CustomImageDataset_test(data_folder, transform)
        val_dataset = CustomImageDataset_test(data_folder, transform)
        test_dataset = CustomImageDataset_test(data_folder, transform)
    # obtain training indices that will be used for validation
    num_train = len(test_dataset)
    indices = list(range(num_train))
    print("--------- INDEX checking ---------")
    print(f"Original: {indices[:5]}")
    if shuffle_flag:
        random.shuffle(indices)
    print(f"Shuffled: {indices[:5]}")
    print("--------- INDEX shuffled ---------\n")

    split_train = int(np.floor(train_ratio * num_train))
    split_val = split_train + int(np.floor(val_ratio * (num_train-split_train)))
    train_idx, val_idx, test_idx = indices[0:split_train], indices[split_train:split_val], indices[split_val:]
    merge_dataset = Subset(train_dataset, train_idx)

    train_loader = DataLoader(merge_dataset, batch_size=batch_size)
    val_loader = DataLoader(Subset(val_dataset, val_idx), batch_size=batch_size)
    test_loader = DataLoader(Subset(test_dataset, test_idx), batch_size=batch_size)
    
    # check dataset
    print(f"Total number of samples: {num_train} datapoints")
    print(f"Number of train samples: {len(train_loader)} batches/ {len(train_loader.dataset)} datapoints")
    print(f"Number of val samples: {len(val_loader)} batches/ {len(val_loader.dataset)} datapoints")
    print(f"Number of test samples: {len(test_loader)} batches/ {len(test_loader.dataset)} datapoints")
    print(f"Data Transform: {transform}")
    print(f"")
    
    dataloaders = {
        "train": train_loader,
        "val": val_loader,
        "test": test_loader,
    }
    return dataloaders

In [18]:
def count_parameters(model):
    total_num = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            total_num += parameter.numel() 
    return total_num


In [19]:
def majority_voting(output_list):
    stacked_outputs = torch.stack(output_list)
    avg_probs = torch.mean(stacked_outputs, dim=0)
    
    return avg_probs

In [33]:
def eval(model_list, data_loader):
    for model in model_list:
        model.cuda()
        model.eval()
    criterion = nn.CrossEntropyLoss()
    start = time.time()
    num_class = 10
    running_loss = 0.0
    correct_predictions = 0
    correct_top3_predictions = 0
    total_samples = 0
    confus = torch.zeros(num_class, num_class,dtype=int)            
    for images, labels in tqdm(data_loader): # Iterate over data.
        images, labels = images.cuda(), labels.cuda()
        model_outputs = []
        for model in model_list:
            with torch.no_grad():
                outputs = model(images)
                model_outputs.append(outputs)
        vote_outputs = majority_voting(model_outputs)
        loss = criterion(vote_outputs, labels)
        running_loss += loss.item()
        _, predicted = torch.max(vote_outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        
        _, top3_preds = vote_outputs.topk(3, 1, True, True)
        correct_top3_predictions += sum([labels[i] in top3_preds[i] for i in range(labels.size(0))])

        total_samples += labels.size(0)
        for ii in range(len(predicted)):
            confus[ labels.data[ii] ][ predicted[ii] ]+=1

    avg_loss = running_loss / total_samples
    top1_accuracy = correct_predictions / total_samples * 100
    top3_accuracy = correct_top3_predictions / total_samples * 100
    print(f"samples: {total_samples}, Loss: {avg_loss:.4f}, Top-1 Accuracy: {top1_accuracy:.2f}%, Top-3 Accuracy: {top3_accuracy:.2f}%")
    for ii in range(num_class) :
        print(f"class {ii}:{confus.numpy()[ii]}")
        # pprint(confus)

    end = time.time()
    duration = end - start
    print(f"Elapsed time: {duration} seconds")

# Main Program

In [34]:
path = "D:\Casper\OTHER\Data\MNIST2\HW2_MNIST_train"
dir_list = os.listdir(path)

transform = transforms.Compose([])
data_folder = path
custom_dataset = CustomImageDataset(data_folder, transform)

# HW2-1-1

In [35]:
loaders2_1_1 = get_dataloaders(data_folder, transform, 0, 1, 64)

path = "D:\\Casper\\aML\\HW2\\"
path2 = "D:\\Casper\\aML\\HW2\\aML\\"
path = "D:\\Casper\\OTHER\\Weight\\aML\\"
model_list = [
    torch.jit.load(f'{path}resnet152_hw1_99.87268470984435.pt'),
    torch.jit.load(f'{path}resnet152_hw1_99.7576902542199.pt'),
    torch.jit.load(f'{path}resnet152_hw1_99.8398291510945.pt'),
    torch.jit.load(f'{path}resnet152_hw1_99.79362602160253.pt'),
    torch.jit.load(f'{path}resnet152_hw1_99.81621421824305.pt'),
    
    torch.jit.load(f'{path}resnet152.pt'),
    torch.jit.load(f'{path}resnet101.pt'),
    torch.jit.load(f'{path}resnet50.pt'),
    torch.jit.load(f'{path}resnet18.pt'),
]

eval(model_list, loaders2_1_1['val'])

--------- INDEX checking ---------
Original: [0, 1, 2, 3, 4]
Shuffled: [41016, 92222, 13339, 16125, 7390]
--------- INDEX shuffled ---------

Total number of samples: 97396 datapoints
Number of train samples: 0 batches/ 0 datapoints
Number of val samples: 1522 batches/ 97396 datapoints
Number of test samples: 0 batches/ 0 datapoints
Data Transform: Compose(
)



100%|██████████| 1522/1522 [06:59<00:00,  3.63it/s]

samples: 97396, Loss: 0.0000, Top-1 Accuracy: 99.96%, Top-3 Accuracy: 100.00%
class 0:[10785     0     0     0     0     0     0     0     0     0]
class 1:[    0 10830     0     0     0     0     0     5     0     0]
class 2:[   0    0 9529    0    0    0    0    2    0    0]
class 3:[   0    0    0 9810    0    1    0    1    0    0]
class 4:[   0    0    0    0 9311    0    0    1    0    2]
class 5:[   2    0    0    3    0 8680    1    0    1    0]
class 6:[   0    0    0    0    2    0 9472    0    0    0]
class 7:[    0     0     0     0     0     0     0 10007     0     0]
class 8:[   0    0    1    1    0    2    0    0 9422    1]
class 9:[   0    0    0    0    9    1    0    5    0 9509]
Elapsed time: 419.2192018032074 seconds





# HW2-1-2

In [68]:

loader2_1_2 = get_dataloaders(data_folder, transform, 1, 0.5, 1)
model2_1_2 = torch.jit.load(f'D:\\Casper\\aML\\HW2\\OneParaModel.pt')

def evaluation(model, data_loader):
    correct = 0
    total = 0
    num_of_para = count_parameters(model)
    print(f"Number of parameter: {num_of_para}")
    model.eval()
    with torch.no_grad():
        for inputs, labels in tqdm(data_loader):
            outputs = model(inputs)
            total += labels.size(0)
            correct += (outputs == labels).sum().item()
    accuracy = correct / total
    print(f"Total accuracy: {accuracy*100:.2f}%")
    print(f"score = {accuracy*100/num_of_para}")
evaluation(model2_1_2, loader2_1_2['train'])

--------- INDEX checking ---------
Original: [0, 1, 2, 3, 4]
Shuffled: [57467, 32380, 11020, 93807, 59050]
--------- INDEX shuffled ---------

Total number of samples: 97396 datapoints
Number of train samples: 97396 batches/ 97396 datapoints
Number of val samples: 0 batches/ 0 datapoints
Number of test samples: 0 batches/ 0 datapoints
Data Transform: Compose(
)

Number of parameter: 1


100%|██████████| 97396/97396 [01:04<00:00, 1511.92it/s]

Total accuracy: 20.61%
score = 20.61275617068463





# HW2-1-3


In [71]:
def predict_images(data_loader, model, output_csv_path):
    results = []
    
    for model in model_list:
        model.cuda()
        model.eval()

    with torch.no_grad():
        for inputs, image_name in tqdm(data_loader):
            inputs = inputs.cuda()
            model_outputs = []
            for model in model_list:
                with torch.no_grad():
                    outputs = model(inputs)
                    model_outputs.append(outputs)
            vote_outputs = majority_voting(model_outputs)
            _, predicted = torch.max(vote_outputs, 1)
            results.append([image_name[0], predicted.item()])
            # break


    with open(output_csv_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['image', 'class'])  # Write the header
        writer.writerows(results)

folder_path = 'D:\\Casper\\OTHER\\Data\\MNIST2\\HW2_MNIST_test'
output_csv_path = 'predictions.csv'

path = "D:\\Casper\\aML\\HW2\\"
path2 = "D:\\Casper\\aML\\HW2\\aML\\"
model_list = [
    torch.jit.load(f'{path}resnet152.pt'),
    torch.jit.load(f'{path}resnet101.pt'),
    torch.jit.load(f'{path}resnet50.pt'),
    torch.jit.load(f'{path}resnet18.pt'),

    torch.jit.load(f'{path2}R6_btnk_ch8(hw2).pt'),
    torch.jit.load(f'{path2}R6_btnk_8ch4n8.pt'),
]

custom_dataset = CustomImageDataset_test(folder_path, transform)
loader2_1_3 = get_dataloaders(folder_path, transform, 0, 1, 1, False, False)
predict_images(loader2_1_3['val'], model_list, output_csv_path)

--------- INDEX checking ---------
Original: [0, 1, 2, 3, 4]
Shuffled: [0, 1, 2, 3, 4]
--------- INDEX shuffled ---------

Total number of samples: 24350 datapoints
Number of train samples: 0 batches/ 0 datapoints
Number of val samples: 24350 batches/ 24350 datapoints
Number of test samples: 0 batches/ 0 datapoints
Data Transform: Compose(
)



100%|██████████| 24350/24350 [11:24<00:00, 35.55it/s]
