In [72]:
import numpy as np
import cv2
# import matplotlib.pyplot as plt
import math

import os
import multiprocessing as mp
from tqdm import tqdm
# import time

# For Perceptron
from sklearn.metrics import log_loss
from sklearn.preprocessing import OneHotEncoder


In [73]:
with open("./train.txt") as f:
    train_file_list = f.readlines()
with open("./val.txt") as f:
    val_file_list = f.readlines()
with open("./test.txt") as f:
    test_file_list = f.readlines()

train_file_list = [x.strip().split(sep=" ") for x in train_file_list]
val_file_list = [x.strip().split(sep=" ") for x in val_file_list]
test_file_list = [x.strip().split(sep=" ") for x in test_file_list]

train_label = [int(x[1]) for x in train_file_list]
val_label = [int(x[1]) for x in val_file_list]
test_label = [int(x[1]) for x in test_file_list]

train_file_list = [x[0] for x in train_file_list]
val_file_list = [x[0] for x in val_file_list]
test_file_list = [x[0] for x in test_file_list]

# print("# cpus: ", os.cpu_count())
NUM_PROCESSES = 8

def ReadImage(filePath):
    image = cv2.imread(filePath, cv2.IMREAD_COLOR)
    # image = cv2.imread(filePath, cv2.IMREAD_GRAYSCALE)
    # image = cv2.resize(image, (256, 256))
    return image

with mp.Pool(processes=NUM_PROCESSES) as pool:
    train_imgs = pool.map(ReadImage, tqdm(train_file_list))
    val_imgs = pool.map(ReadImage, tqdm(val_file_list))
    test_imgs = pool.map(ReadImage, tqdm(test_file_list))

# resize the images to 256x256
def ResizeImage(image):
    # resized_img = cv2.resize(image, (256, 256))
    resized_img = cv2.resize(image, (64, 64))
    return resized_img

with mp.Pool(processes=NUM_PROCESSES) as pool:
    resized_train_imgs = pool.map(ResizeImage, tqdm(train_imgs))
    resized_val_imgs = pool.map(ResizeImage, tqdm(val_imgs))
    resized_test_imgs = pool.map(ResizeImage, tqdm(test_imgs))


# Ref.: https://github.com/Ixiaohuihuihui/Extract-color-histogram-feature/blob/master/rgb_feature.py
# extract rgb features
def ExtractColorHistFeatures(image):
    features = []
    for channel in range(3):
        hist = cv2.calcHist(images=[image], channels=[channel], mask=None, histSize=[64], ranges=[0,256])
        hist = cv2.normalize(hist, hist)
        # features.extend(hist)
        features.append(hist)
    return features

with mp.Pool(processes=NUM_PROCESSES) as pool:
    ### tqdm returns an iterator
    # train_features = pool.map(ExtractFeatures, tqdm(resized_train_imgs))
    # val_features = pool.map(ExtractFeatures, tqdm(resized_val_imgs))
    # test_features = pool.map(ExtractFeatures, tqdm(resized_test_imgs))
    train_features = list(tqdm(pool.imap(ExtractColorHistFeatures, resized_train_imgs), total=len(resized_train_imgs)))
    val_features = list(tqdm(pool.imap(ExtractColorHistFeatures, resized_val_imgs), total=len(resized_val_imgs)))
    test_features = list(tqdm(pool.imap(ExtractColorHistFeatures, resized_test_imgs), total=len(resized_test_imgs)))


# flatten and reshape the features into (n_samples, n_features)
train_features = np.array(train_features)
val_features = np.array(val_features)
test_features = np.array(test_features)

def FlattenFeatures(feature):
    return feature.flatten()

with mp.Pool(processes=NUM_PROCESSES) as pool:
    train_features = np.array(pool.map(FlattenFeatures, tqdm(train_features)))
    val_features = np.array(pool.map(FlattenFeatures, tqdm(val_features)))
    test_features = np.array(pool.map(FlattenFeatures, tqdm(test_features)))

# # flatten and reshape the features into (n_samples, n_features)
# resized_train_imgs = np.array(resized_train_imgs)
# resized_val_imgs = np.array(resized_val_imgs)
# resized_test_imgs = np.array(resized_test_imgs)

# def FlattenImages(image):
#     return image.flatten()

# with mp.Pool(processes=NUM_PROCESSES) as pool:
#     resized_train_imgs = np.array(pool.map(FlattenImages, tqdm(resized_train_imgs)))
#     resized_val_imgs = np.array(pool.map(FlattenImages, tqdm(resized_val_imgs)))
#     resized_test_imgs = np.array(pool.map(FlattenImages, tqdm(resized_test_imgs)))

100%|██████████| 63325/63325 [00:58<00:00, 1078.67it/s]  
100%|██████████| 450/450 [00:00<00:00, 201112.07it/s]
100%|██████████| 450/450 [00:00<00:00, 222260.57it/s]
100%|██████████| 63325/63325 [01:06<00:00, 946.40it/s] 
100%|██████████| 450/450 [00:00<00:00, 1254.42it/s]
100%|██████████| 450/450 [00:00<00:00, 1476.69it/s]
100%|██████████| 63325/63325 [00:23<00:00, 2663.26it/s]
100%|██████████| 450/450 [00:00<00:00, 2418.60it/s]
100%|██████████| 450/450 [00:00<00:00, 2674.58it/s]
100%|██████████| 63325/63325 [00:00<00:00, 65869.41it/s]
100%|██████████| 450/450 [00:00<00:00, 28939.10it/s]
100%|██████████| 450/450 [00:00<00:00, 30556.87it/s]


In [74]:
def onehot(label, n_classes):
    enc = np.zeros(shape=(len(label), n_classes))
    for idx, val in enumerate(label):
        enc[idx, val] = 1
    return enc

train_label = onehot(np.array(train_label), 50)
val_label = onehot(np.array(val_label), 50)
test_label = onehot(np.array(test_label), 50)

In [75]:
# # normalize the input of perceptron
# normalized_resized_train_imgs = resized_train_imgs / 255
# normalized_resized_val_imgs = resized_val_imgs / 255
# normalized_resized_test_imgs = resized_test_imgs / 255

In [76]:
class DataLoader:
    def __init__(self, data, labels, batch_size=32, shuffle=True):
        self.data = data
        self.labels = labels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_samples = data.shape[0]
        self.num_batches = int(np.ceil(self.num_samples / self.batch_size))
        self.indices = np.arange(self.num_samples)
        self.current_batch = 0
        
        if self.shuffle:
            np.random.shuffle(self.indices)
        
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.current_batch >= self.num_batches:
            self.current_batch = 0
            if self.shuffle:
                np.random.shuffle(self.indices)
            raise StopIteration
            
        batch_indices = self.indices[self.current_batch*self.batch_size : (self.current_batch+1)*self.batch_size]
        batch_data = self.data[batch_indices]
        batch_labels = self.labels[batch_indices]
        
        self.current_batch += 1
        
        return batch_data, batch_labels
    


train_dataloader = DataLoader(train_features, train_label, shuffle=True)
val_dataloader = DataLoader(val_features, val_label, shuffle=True)
test_dataloader = DataLoader(test_features, test_label, shuffle=True)

In [77]:
class TwoLayerPerceptron:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        
        np.random.seed(42)
        self.limit = 1 / math.sqrt(input_size)
        # self.weights1 = np.random.randn(self.input_size, self.hidden_size)
        self.weights1 = np.random.uniform(low=-self.limit, high=self.limit, size=(self.input_size, self.hidden_size))
        self.biases1 = np.zeros((1, self.hidden_size))
        # self.weights2 = np.random.randn(self.hidden_size, self.output_size)
        self.weights2 = np.random.uniform(low=-self.limit, high=self.limit, size=(self.hidden_size, self.output_size))
        self.biases2 = np.zeros((1, self.output_size))
        
    def forward(self, X):
        # Layer 1
        self.z1 = np.dot(X, self.weights1) + self.biases1
        self.a1 = np.exp(self.z1) / np.sum(np.exp(self.z1), axis=1, keepdims=True)
        
        # Layer 2
        self.z2 = np.dot(self.a1, self.weights2) + self.biases2
        self.a2 = np.exp(self.z2) / np.sum(np.exp(self.z2), axis=1, keepdims=True)
        
        return self.a2
    
    def backward(self, X, y, learning_rate):
        # Compute error
        delta3 = self.a2 - y
        delta2 = np.dot(delta3, self.weights2.T) * (self.a1 * (1 - self.a1))
        
        # Compute gradients
        d_weights2 = np.dot(self.a1.T, delta3)
        d_biases2 = np.sum(delta3, axis=0, keepdims=True)
        d_weights1 = np.dot(X.T, delta2)
        d_biases1 = np.sum(delta2, axis=0)
        
        # Update weights and biases
        self.weights2 -= learning_rate * d_weights2
        self.biases2 -= learning_rate * d_biases2
        self.weights1 -= learning_rate * d_weights1
        self.biases1 -= learning_rate * d_biases1
        
    def train(self, dataloader, learning_rate, epochs):
        for epoch in range(epochs):
            for X, y in dataloader:
                # Forward pass
                output = self.forward(X)
                
                # Backward pass
                self.backward(X, y, learning_rate)
                
                # Compute loss
                loss = np.mean(-np.sum(y * np.log(output), axis=1))
                
            # Print progress
            if epoch % 1 == 0:
                print(f"Epoch {epoch + 1}, Train Loss {loss:.4f}")
    
    def predict(self, X):
        # Forward pass
        output = self.forward(X)
        
        # Return predicted class
        return np.argmax(output, axis=1)


In [83]:
perceptron = TwoLayerPerceptron(input_size=train_features.shape[1],
                                hidden_size=32,
                                output_size=50)

# Train model
perceptron.train(dataloader=train_dataloader, learning_rate=1e-3, epochs=30)

Epoch 1, Train Loss 3.9048
Epoch 2, Train Loss 3.9259
Epoch 3, Train Loss 3.9019
Epoch 4, Train Loss 3.9008
Epoch 5, Train Loss 3.9080
Epoch 6, Train Loss 3.8994
Epoch 7, Train Loss 3.9112
Epoch 8, Train Loss 3.9075
Epoch 9, Train Loss 3.9000
Epoch 10, Train Loss 3.9007
Epoch 11, Train Loss 3.9060
Epoch 12, Train Loss 3.9196
Epoch 13, Train Loss 3.9068
Epoch 14, Train Loss 3.9061
Epoch 15, Train Loss 3.9014
Epoch 16, Train Loss 3.9168
Epoch 17, Train Loss 3.9061
Epoch 18, Train Loss 3.9000
Epoch 19, Train Loss 3.9301
Epoch 20, Train Loss 3.9454
Epoch 21, Train Loss 3.9016
Epoch 22, Train Loss 3.9008
Epoch 23, Train Loss 3.9001
Epoch 24, Train Loss 3.9021
Epoch 25, Train Loss 3.9017
Epoch 26, Train Loss 3.9144
Epoch 27, Train Loss 3.9181
Epoch 28, Train Loss 3.9003
Epoch 29, Train Loss 3.8949
Epoch 30, Train Loss 3.9205


In [84]:
# Evaluate model
y_pred = perceptron.predict(test_features)
accuracy = np.mean(y_pred == np.argmax(test_label, axis=1))
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.0311
