In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torchvision import models
from torchvision.models import resnet18, ResNet18_Weights
from torch.utils.data import DataLoader, Subset

import numpy as np
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score
from sklearn.tree import DecisionTreeClassifier

In [2]:
#feature extraction indexing for classes
def select_class_index(labels, n):
    labels = np.array(labels)
    selected = []

    for c in range(10):
        idx = np.where(labels == c)[0][:n]
        selected.extend(idx)

    return selected

In [3]:
# load in cifar-10 dataset
train_set = torchvision.datasets.CIFAR10(root="F:/CIFAR10_Project/data", train=True, download=True)
test_set = torchvision.datasets.CIFAR10(root="F:/CIFAR10_Project/data", train=False, download=True)

# select the first 500 train images per class
train_indices = select_class_index(train_set.targets, 500)
train_subset = torch.utils.data.Subset(train_set, train_indices)

# select the first 100 test images
test_indices = select_class_index(test_set.targets, 100)
test_subset = torch.utils.data.Subset(test_set, test_indices)

print(len(train_subset))  # Should be 500 * 10 = 5000
print(len(test_subset))   # Should be 1000

print(len(train_indices))  # 5000
print(set([train_set.targets[i] for i in train_indices])) # check how many classes ( should be 0-9 )

5000
1000
5000
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}


In [4]:
#resize to 224x224x3
transform_resnet = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406], 
        std=[0.229, 0.224, 0.225]
    )
])

In [5]:
# use dataloaders to hold the resized images

train_subset.dataset.transform = transform_resnet
test_subset.dataset.transform = transform_resnet

train_loader = DataLoader(train_subset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_subset, batch_size=64, shuffle=False)

In [6]:
# load RestNet-18 and remove the last layer

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

weights = ResNet18_Weights.DEFAULT
resnet = resnet18(weights=weights)
resnet.fc = nn.Identity()
resnet = resnet.to(device)
resnet.eval()


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [7]:
# resnet-18 feature extractor
#def extract_features(dataset):
   # loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False)
def extract_features(dataset, batch_size=64):
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    feats = []
    labels = []

    with torch.no_grad():
        for images, lbls in loader:
            images = images.to(device)
            f = resnet(images)
            feats.append(f.cpu())
            labels.append(lbls)

    return torch.cat(feats), torch.cat(labels)

In [8]:
#train_features, train_labels = extract_features(train_subset.dataset)
#test_features, test_labels = extract_features(test_subset.dataset)
train_features, train_labels = extract_features(train_subset)
test_features, test_labels = extract_features(test_subset)

In [9]:
# TypeError: can't convert cuda:0 device type tensor to numpy. 
# Use Tensor.cpu() to copy the tensor to host memory first.

train_features_np = train_features.cpu().numpy()
test_features_np = test_features.cpu().numpy()

pca = PCA(n_components=50)
train_pca = pca.fit_transform(train_features_np)
test_pca  = pca.transform(test_features_np)

print(train_pca.shape, test_pca.shape)

(5000, 50) (1000, 50)


In [10]:
class BasicDecisionTree:
    def __init__(self, max_depth=50, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    def gini(self, y):
        classes, counts = np.unique(y, return_counts=True)
        prob = counts / counts.sum()
        return 1 - np.sum(prob ** 2)

    def split(self, X_column, threshold):
        left_index = np.where(X_column <= threshold)[0]
        right_index = np.where(X_column > threshold)[0]
        return left_index, right_index

    def best_split(self, X, y):
        m, n = X.shape
        if m <= 1:
            return None, None

        parent_gini = self.gini(y)
        best_gini = 1
        best_index, best_thr = None, None

        for feature_index in range(n):
            thresholds = np.unique(X[:, feature_index])
            for thr in thresholds:
                left_index, right_index = self.split(X[:, feature_index], thr)
                if len(left_index) == 0 or len(right_index) == 0:
                    continue

                gini_left = self.gini(y[left_index])
                gini_right = self.gini(y[right_index])
                weighted_gini = (len(left_index) * gini_left + len(right_index) * gini_right) / m

                if weighted_gini < best_gini:
                    best_gini = weighted_gini
                    best_index = feature_index
                    best_thr = thr

        #print(f"Best split: feature {best_index}, threshold {best_thr}, gini {best_gini}")
        if best_gini >= parent_gini:
            return None, None

        return best_index, best_thr

    def build_tree(self, X, y, depth=0):
        #num_samples_per_class = [np.sum(y == c) for c in np.unique(y)]
        #predicted_class = np.argmax(num_samples_per_class)
        unique_classes = np.unique(y)
        counts = [np.sum(y == c) for c in unique_classes]
        predicted_class = unique_classes[np.argmax(counts)]

        node = {
            'depth': depth,
            'num_samples': len(y),
            'predicted_class': predicted_class
        }

        if depth < self.max_depth and len(y) >= self.min_samples_split:
            feature_index, threshold = self.best_split(X, y)
            if feature_index is not None:
                #print(f"Depth {depth}: trying to split, samples: {len(y)}")
                left_index, right_index = self.split(X[:, feature_index], threshold)
                node['feature_index'] = feature_index
                node['threshold'] = threshold
                node['left'] = self.build_tree(X[left_index], y[left_index], depth + 1)
                node['right'] = self.build_tree(X[right_index], y[right_index], depth + 1)

                

        return node

    def fit(self, X, y):
        self.tree = self.build_tree(X, y)

    def predict_sample(self, node, x):
        if 'feature_index' not in node:
            return node['predicted_class']

        if x[node['feature_index']] <= node['threshold']:
            return self.predict_sample(node['left'], x)
        else:
            return self.predict_sample(node['right'], x)

    def predict(self, X):
        return np.array([self.predict_sample(self.tree, x) for x in X])

In [11]:
# training the model
tree = BasicDecisionTree(max_depth=50)
tree.fit(train_pca, train_labels.numpy())

In [20]:
# training small depth model
small_tree = BasicDecisionTree(max_depth=5)
small_tree.fit(train_pca, train_labels.numpy())

In [12]:
# test on test samples
tree_prediction = tree.predict(test_pca)

In [None]:
tree_prediction = small_tree.predict(test_pca)

In [14]:
print("train_pca shape:", train_pca.shape)
print("train_labels shape:", train_labels.shape)
print("Unique train labels:", np.unique(train_labels.numpy()))

train_pca shape: (5000, 50)
train_labels shape: torch.Size([5000])
Unique train labels: [0 1 2 3 4 5 6 7 8 9]


In [15]:
# Scikit's decision tree

dt_sklearn = DecisionTreeClassifier(max_depth=50, random_state=42)

X_train = train_pca
y_train = train_labels.numpy()
X_test = test_pca
y_test = test_labels.numpy()

# Train using features 
dt_sklearn.fit(train_pca, y_train)

# Predict on test features
y_pred_sklearn = dt_sklearn.predict(test_pca)

In [16]:
# save the model

import pickle

with open('basic_decision_tree.pkl', 'wb') as f:
    pickle.dump(tree, f)

In [21]:
# save the small tree model

import pickle

with open('depth5_decision_tree.pkl', 'wb') as f:
    pickle.dump(small_tree, f)

In [17]:
# to load the model

#with open('basic_decision_tree.pkl', 'rb') as f:
    #loaded_tree = pickle.load(f)

# Use loaded_tree to predict
#y_pred = loaded_tree.predict(test_pca)

In [22]:
print("Evaluation of 5-depth Decision Tree:")
print("on training set")
train_prediction = small_tree.predict(train_pca) # replace by loaded_tree.predict(...) for the saved model
evaluate_model(train_labels.numpy(), train_prediction)

print("on testing set")
tree_prediction = small_tree.predict(test_pca)
evaluate_model(test_labels.numpy(), tree_prediction)

Evaluation of 5-depth Decision Tree:
on training set
Accuracy: 0.6148
Precision: 0.6238588991336521
Recall : 0.6148
F1-score : 0.6093440825420627
Confusion Matrix:
 [[360  23   9   5   6   0   8  16  54  19]
 [ 10 404   1   1   1   0   5   4  12  62]
 [ 87   3 160  58  78  12  59  39   4   0]
 [ 24  16   2 264  13  70  84  13  13   1]
 [ 31   2  10  28 296  18  36  76   3   0]
 [ 10   4   9 182  13 217  21  39   5   0]
 [ 33   3  40  22  22   8 365   5   2   0]
 [ 16   6  11  74  48  22   7 296  20   0]
 [ 73  20   1   3   2   4  11  11 344  31]
 [  8  84   2   0   2   0   2   4  30 368]]
on testing set
Accuracy: 0.547
Precision: 0.5635160327395776
Recall : 0.547
F1-score : 0.5392929132145003
Confusion Matrix:
 [[66  6  1  0  0  1  4  3 15  4]
 [ 6 66  1  1  0  0  1  3  2 20]
 [21  0 22 18 12  4 19  4  0  0]
 [ 6  1  5 53  1 16 11  5  2  0]
 [ 5  0  1 12 49  4  9 19  1  0]
 [ 6  0  1 38  1 37  7  7  3  0]
 [ 8  0  3  3  2  4 79  0  1  0]
 [ 2  2  1 20 11 10  0 50  4  0]
 [22  2  1  1  

In [19]:
# i've already made an evaluation script from scratch for Naive bayes.
# I will now use the integrated evaluation functions from sklearn

def evaluate_model(y_true, y_pred):
    print("Accuracy:", accuracy_score(y_true, y_pred))
    print("Precision:", precision_score(y_true, y_pred, average='macro'))
    print("Recall :", recall_score(y_true, y_pred, average='macro'))
    print("F1-score :", f1_score(y_true, y_pred, average='macro'))
    print("Confusion Matrix:\n", confusion_matrix(y_true, y_pred))

print("Evaluation of Basic Decision Tree:")
print("on training set")
train_prediction = tree.predict(train_pca) # replace by loaded_tree.predict(...) for the saved model
evaluate_model(train_labels.numpy(), train_prediction)

print("on testing set")
tree_prediction = tree.predict(test_pca)
evaluate_model(test_labels.numpy(), tree_prediction)

      
print("\nEvaluation of Scikit's Decision Tree:")
print("on training set")
y_pred_train_sklearn = dt_sklearn.predict(X_train)
evaluate_model(y_train, y_pred_train_sklearn)
print("on testing set")
evaluate_model(y_test, y_pred_sklearn)

Evaluation of Basic Decision Tree:
on training set
Accuracy: 1.0
Precision: 1.0
Recall : 1.0
F1-score : 1.0
Confusion Matrix:
 [[500   0   0   0   0   0   0   0   0   0]
 [  0 500   0   0   0   0   0   0   0   0]
 [  0   0 500   0   0   0   0   0   0   0]
 [  0   0   0 500   0   0   0   0   0   0]
 [  0   0   0   0 500   0   0   0   0   0]
 [  0   0   0   0   0 500   0   0   0   0]
 [  0   0   0   0   0   0 500   0   0   0]
 [  0   0   0   0   0   0   0 500   0   0]
 [  0   0   0   0   0   0   0   0 500   0]
 [  0   0   0   0   0   0   0   0   0 500]]
on testing set
Accuracy: 0.599
Precision: 0.5999590094660118
Recall : 0.599
F1-score : 0.5979631970770243
Confusion Matrix:
 [[55  4 10  1  1  3  3  1 15  7]
 [ 4 76  2  1  0  0  0  0  3 14]
 [ 5  0 48 10  9  5 14  5  4  0]
 [ 1  1 13 42  6 21 11  2  1  2]
 [ 7  0 10  7 50  7  3 15  0  1]
 [ 0  0  8 15  7 61  4  3  1  1]
 [ 3  0  5  9  4  5 73  0  1  0]
 [ 1  0  2 10 17 15  0 53  1  1]
 [20  5  1  0  0  0  1  2 63  8]
 [ 4  8  0  2  0  0 