In [1]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from scipy.special import softmax

In [2]:
# Pick the compute device. GPU index 5 is preferred, but that index only exists
# on hosts with at least six visible GPUs; on smaller CUDA machines fall back to
# GPU 0 instead of failing on the first `.to(device)`. Without CUDA, use the CPU.
if torch.cuda.is_available():
    gpu_index = 5 if torch.cuda.device_count() > 5 else 0
    device = torch.device('cuda:{}'.format(gpu_index))
else:
    device = torch.device('cpu')

# Load the train/test arrays from ./data (paths are relative to the working directory).
trainX = np.load("./data/train_X.npy")
trainY = np.load("./data/train_y.npy")
testX = np.load("./data/test_X.npy")
testY = np.load("./data/test_y.npy")

def partition_data():
    """Flatten the training images and split them 80/20 into train/validation.

    Reads the module-level `trainX` / `trainY` arrays. Each image is reshaped to
    a 28*28 vector before splitting with a fixed `random_state` of 42.

    Returns:
        list: [train_images, val_images, train_labels, val_labels]
    """
    flat_images = trainX.reshape(len(trainX), 28*28)
    pieces = train_test_split(flat_images, trainY, test_size=0.2, random_state=42)
    images_tr, images_va, labels_tr, labels_va = pieces
    return [images_tr, images_va, labels_tr, labels_va]

# Materialise the train/validation split once; the training and validation
# helpers below read these module-level arrays.
x_train, x_val, y_train, y_val = partition_data()

In [3]:
class MLP2(nn.Module):
    """Two-layer perceptron: 784 -> 128 -> 10 with a selectable hidden activation.

    Args:
        activation: name of the hidden non-linearity; one of "relu", "sigmoid"
            or "tanh". Any other string leaves the hidden layer without a
            non-linearity (the two linear layers are applied back to back).
    """
    def __init__(self, activation="relu"):
        super(MLP2, self).__init__()
        self.activation = activation
        self.layer1 = nn.Linear(28*28, 128)
        self.layer2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return the 10 class logits for a batch of flattened 28*28 inputs."""
        x = self.layer1(x)
        if self.activation=="relu":
            x = torch.relu(x)
        elif self.activation=="sigmoid":
            x = torch.sigmoid(x)
        elif self.activation=="tanh":
            # nn.Tanh is a module class: nn.Tanh(x) would pass the tensor to its
            # constructor and raise. The functional form applies the activation.
            x = torch.tanh(x)
        x = self.layer2(x)
        return x
    
def train_model(model, b=64, log_all=False, epochs=300):
    """Train `model` with SGD on the module-level training split.

    The training indices are shuffled once and cut into mini-batches of size
    `b`; the same batches are then reused in every epoch.

    Args:
        model: network to train in place; it must already live on `device`.
        b: mini-batch size (positive integer).
        log_all: if True, return the per-epoch training histories instead of
            the validation metrics.
        epochs: number of passes over the training split (default 300).

    Returns:
        (all_loss, all_f1) when `log_all` is True: lists holding the mean batch
        loss and mean batch macro-F1 of each epoch. Otherwise the
        (val_loss, val_f1) pair returned by `validate_model`.

    Raises:
        ValueError: if `b` is smaller than 1.
    """
    if b < 1:
        raise ValueError("batch size b must be >= 1, got {}".format(b))

    optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.5)
    criterion = nn.CrossEntropyLoss()

    sample_ids = list(range(len(y_train)))
    shuffled_ids = random.sample(sample_ids, len(sample_ids))
    batches = [shuffled_ids[start:start + b] for start in range(0, len(shuffled_ids), b)]

    all_loss = []
    all_f1 = []
    for epoch in range(epochs):
        epoch_loss = []
        epoch_f1 = []
        # Iterate over the batches directly so the batch-size argument `b` is
        # not shadowed by a loop counter.
        for batch_ids in batches:
            optimizer.zero_grad()
            inputs = torch.FloatTensor(x_train[batch_ids]).to(device)
            labels = torch.tensor(y_train[batch_ids], dtype=torch.int64).to(device)
            outputs = model(inputs)

            # Macro-F1 of this batch, computed on CPU copies for scikit-learn.
            _, predicted = torch.max(outputs, 1)
            batch_f1 = f1_score(labels.cpu().numpy(), predicted.cpu().numpy(), average="macro")
            epoch_f1.append(batch_f1)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss.item())

        if epoch % 50 == 0:
            # Report the epoch mean, not just the F1 of the last batch.
            print("Training Loss: {l}, Training F1: {f}".format(l=np.mean(epoch_loss), f=np.mean(epoch_f1)))
        all_loss.append(np.mean(epoch_loss))
        all_f1.append(np.mean(epoch_f1))

    if log_all:
        return all_loss, all_f1
    # Validation is only needed when its metrics are what the caller receives.
    return validate_model(model, log_view=False)
    
def validate_model(model, log_view = False):
    """Evaluate `model` on the module-level validation split in a single pass.

    Args:
        model: network to evaluate (gradients are disabled during the pass).
        log_view: when True, print the validation loss and macro-F1.

    Returns:
        (loss, f1): the cross-entropy loss as a Python float and the macro-F1.
    """
    loss_fn = nn.CrossEntropyLoss()
    with torch.no_grad():
        targets = torch.tensor(y_val, dtype=torch.int64).to(device)
        logits = model(torch.FloatTensor(x_val).to(device))
        loss_value = loss_fn(logits, targets).item()
        # torch.max over dim 1 returns (values, indices); the indices are the predictions.
        guesses = torch.max(logits, 1)[1]
        macro_f1 = f1_score(targets.cpu().numpy(), guesses.cpu().numpy(), average="macro")
    if log_view:
        print("Val Loss: {l}, Val F1 {f}".format(l=loss_value, f=macro_f1))
    return loss_value, macro_f1

def test_model(model, log_view = False):
    """Evaluate `model` on the module-level test arrays (`testX`, `testY`).

    Args:
        model: network to evaluate (gradients are disabled during the pass).
        log_view: when True, print the test loss and macro-F1.

    Returns:
        (loss, f1): the cross-entropy loss as a Python float and the macro-F1.
    """
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        testX_ = testX.reshape(len(testX),28*28)
        inputs = torch.FloatTensor(testX_).to(device)
        outputs = model(inputs)
        labels = torch.tensor(testY,dtype=torch.int64).to(device)
        test_loss = criterion(outputs, labels).item()
        _, predicted = torch.max(outputs, 1)
        # scikit-learn works on host arrays; tensors that live on a CUDA device
        # must be copied to the CPU first (as validate_model already does).
        f1 = f1_score(labels.cpu().numpy(), predicted.cpu().numpy(), average="macro")
    if log_view:
        # These are test-set metrics, so label them as such.
        print("Test Loss: {l}, Test F1 {f}".format(l=test_loss,f=f1))
    return test_loss, f1

## Genetic Algorithm

In [4]:
def gen2param(gen):
    """Decode a 13-bit gene into (batch_size, activation_name).

    Bits 0-9 are the batch size as a big-endian binary number; bits 10-12 flag
    the activation (relu, sigmoid, tanh), with the first set bit winning.

    A gene that decodes to a batch size below 16 is repaired IN PLACE: bit 5
    (value 16) is set and bits 6-9 are cleared, so the gene encodes exactly 16.

    Args:
        gen: NumPy array of 13 zeros/ones (modified in place when repaired).

    Returns:
        (int, str): the batch size (>= 16) and one of "relu", "sigmoid", "tanh".

    Raises:
        ValueError: if none of the three activation bits is set.
    """
    bit_weights = 2 ** np.arange(9, -1, -1)
    b = int(np.dot(gen[0:10], bit_weights))
    if b < 16:
        b = 16
        gen[5] = 1
        gen[6:10] = 0

    activation_bits = gen[10:13]
    if activation_bits[0] == 1:
        activation = "relu"
    elif activation_bits[1] == 1:
        activation = "sigmoid"
    elif activation_bits[2] == 1:
        # Must be spelled exactly "tanh": MLP2 compares against that string, and
        # a misspelling would silently leave the hidden layer linear.
        activation = "tanh"
    else:
        raise ValueError("gene has no activation bit set: {}".format(list(activation_bits)))
    return b, activation

def cal_fitness(gen):
    """Score a gene: decode it, train a fresh MLP2 and return the validation F1.

    Args:
        gen: 13-bit gene understood by `gen2param`.

    Returns:
        The second value returned by `train_model` (the validation macro-F1).
    """
    batch_size, act_name = gen2param(gen)
    candidate = MLP2(activation=act_name).to(device)
    metrics = train_model(candidate, batch_size)
    return metrics[1]

def crossover(parent1, parent2):
    """Single-point crossover of two genes.

    A cut position is drawn uniformly from 1..9, so it always falls inside the
    batch-size bits. child1 takes parent1's bits before the cut and parent2's
    bits from the cut onward; child2 is the mirror image.

    Args:
        parent1, parent2: equal-length gene arrays (13 bits in this notebook).

    Returns:
        (child1, child2): two new float arrays; the parents are not modified.
    """
    pos = np.random.randint(1,10)
    n_bits = len(parent1)
    child1 = np.zeros(n_bits)
    child2 = np.zeros(n_bits)
    child1[:pos] = parent1[:pos]
    # The tail starts AT the cut. Starting at pos+1 would copy index `pos` from
    # neither parent and leave that bit stuck at 0 in both children.
    child1[pos:] = parent2[pos:]
    child2[:pos] = parent2[:pos]
    child2[pos:] = parent1[pos:]
    return child1, child2

def mutation(parent1, parent2):
    """Flip one randomly chosen batch-size bit in a copy of each parent.

    Each child gets its own independently drawn position in 0..9, so only the
    batch-size bits are mutated and the activation bits are left untouched.

    Args:
        parent1, parent2: gene arrays; they are copied, not modified.

    Returns:
        (child1, child2): the mutated copies.
    """
    pos1 = np.random.randint(0,10)
    pos2 = np.random.randint(0,10)
    child1 = parent1.copy()
    child2 = parent2.copy()
    child1[pos1] = 1 if child1[pos1] == 0 else 0
    # Use the second draw for the second child; reusing pos1 here would mutate
    # both children at the same position and leave pos2 unused.
    child2[pos2] = 1 if child2[pos2] == 0 else 0
    return child1, child2

class Population():
    """Pool of evaluated genes for the genetic search.

    Every member is a list `[gene, times_selected, fitness]`. `fit_hist` and
    `fit_best` record the mean and best fitness after each update.
    """
    def __init__(self):
        """Evaluate five hand-picked seed genes and start the histories."""
        seed_genes = [
            np.array([1,1,0,0,0,0,0,0,0,0,1,0,0]),
            np.array([0,0,1,1,0,0,0,0,0,0,1,0,0]),
            np.array([0,0,0,0,1,1,0,0,0,0,1,0,0]),
            np.array([0,0,0,0,1,1,0,0,0,0,0,1,0]),
            np.array([0,0,0,0,1,1,0,0,0,0,0,0,1]),
        ]
        # Fitness is evaluated in the listed order, one gene after another.
        scores = [cal_fitness(gene) for gene in seed_genes]
        s1, s2, s3, s4, s5 = scores
        self.fit_hist = [(s1+s2+s3+s4+s5)/5]
        self.fit_best = [max(s1, s2, s3, s4, s5)]
        self.population = [[gene, 1, score] for gene, score in zip(seed_genes, scores)]

    def Roulette(self):
        """Draw two distinct parents with probability softmax(fitness).

        Increments the selection counter of both chosen members and returns
        their genes.
        """
        member_ids = list(range(len(self.population)))
        weights = softmax([member[2] for member in self.population])
        chosen = np.random.choice(member_ids, 2, replace=False, p=weights)
        first, second = chosen[0], chosen[1]
        self.population[first][1] += 1
        self.population[second][1] += 1
        return self.population[first][0], self.population[second][0]

    def add_child(self, ch1, ch2):
        """Evaluate two children, add them, and log the mean/best fitness.

        Both histories are written to ./fitness.npy and ./best_hist.npy on
        every call. Returns the new mean fitness of the population.
        """
        for child in (ch1, ch2):
            self.population.append([child, 1, cal_fitness(child)])
        scores = [member[2] for member in self.population]
        mean_score = np.mean(scores)
        self.fit_hist.append(mean_score)
        self.fit_best.append(np.max(scores))
        np.save("./fitness.npy", self.fit_hist)
        np.save("./best_hist.npy", self.fit_best)
        return mean_score

    def del_parent(self):
        """Remove, in place, every member that has been selected more than 5 times."""
        self.population[:] = [member for member in self.population if member[1] <= 5]

    def best_gene(self):
        """Return the first member with the highest fitness and save `fit_hist`."""
        scores = [member[2] for member in self.population]
        winner = self.population[scores.index(max(scores))]
        np.save("./fitness.npy", self.fit_hist)
        return winner
        

def geneticAlg():
    """Run 300 generations of the genetic search and return the best member.

    Each generation selects two parents by roulette, produces two children by
    crossover (when the uniform draw exceeds 0.1) or by mutation (otherwise),
    adds the evaluated children, prunes over-used parents, and prints the
    generation index with the mean fitness.

    Returns:
        The `[gene, times_selected, fitness]` entry from `Population.best_gene`.
    """
    pool = Population()
    for generation in range(300):
        mother, father = pool.Roulette()
        # One uniform draw per generation decides which operator is applied.
        breed = crossover if np.random.rand() > 0.1 else mutation
        kid_a, kid_b = breed(mother, father)
        mean_fitness = pool.add_child(kid_a, kid_b)
        pool.del_parent()
        print(generation, mean_fitness)
    return pool.best_gene()

In [5]:
# Run the genetic search. The result is the fittest [gene, selection_count, fitness]
# entry left in the population; progress is printed as "<generation> <mean fitness>".
best_gen = geneticAlg()

0 0.7455535443357276
1 0.7807808639155283
2 0.7830926040089342
3 0.7938524733765722
4 0.7985307798070495
5 0.7589750076158356
6 0.7737703953537597
7 0.7852639044667687
8 0.7929053840438418
9 0.7866772058410478
10 0.7880471370206088
11 0.7931909659638579
12 0.7993889860184556
13 0.8028495956393567
14 0.8056804000816353
15 0.8101874499366113
16 0.8141067602198812
17 0.8181779710151348
18 0.8176405753884567
19 0.8172323494602965
20 0.8207933515927662
21 0.8213349988062223
22 0.8233183596703065
23 0.8230366067616135
24 0.8250719240379517
25 0.8273612717010844
26 0.8257049058364464
27 0.8282058294726782
28 0.8260622141948292
29 0.8264023999803298
30 0.8174703356164874
31 0.8194338166136981
32 0.8212513284346865
33 0.8228075009284775
34 0.8222610121351039
35 0.8244341866904861
36 0.8258335895257858
37 0.8269307120947573
38 0.8289983184422751
39 0.8285651597977416
40 0.8302102468714903
41 0.8313573861659258
42 0.8328605549343998
43 0.8344317616240396
44 0.8360608695103672
45 0.836855488332420

In [6]:
# Show the winning entry: [gene bits, selection count, fitness].
print(best_gen)

[array([0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 1., 0., 0.]), 1, 0.9356107917861086]


The best combination of hyperparameters found by the genetic algorithm is a batch size of b = 16 with the ReLU activation function.

In [None]:
# Retrain a fresh ReLU network with batch size 16 (the genetic-search result).
# With log_all=True, train_model returns the per-epoch training loss (l) and
# training F1 (f) histories instead of the validation metrics.
model_best = MLP2(activation="relu").to(device)
l,f = train_model(model_best, b=16, log_all=True)

In [None]:
import matplotlib.pyplot as plt
# `f` holds one mean training F1 value per training epoch (train_model with
# log_all=True), so the x-axis counts epochs, not GA generations or episodes.
plt.figure(figsize=(8,6))
plt.plot(f, label="F_1 Score")
plt.xlabel('Epochs')
plt.ylabel('Training F_1')
plt.title('Training F_1 Score V.S. Epochs')
plt.legend()


## Bayesian Optimization

In [4]:
# Define the black box function for Bayesian Optimization
def black_box_func(b,act):
    """Objective for Bayesian optimisation: validation F1 of a freshly trained MLP2.

    Args:
        b: mini-batch size passed to `train_model`.
        act: integer activation code: 0 = relu, 1 = sigmoid, 2 = tanh.

    Returns:
        The validation macro-F1 returned by `train_model`.

    Raises:
        ValueError: if `act` is not one of 0, 1, 2.
    """
    activation_by_code = {0: "relu", 1: "sigmoid", 2: "tanh"}
    if act not in activation_by_code:
        raise ValueError("unknown activation code: {}".format(act))
    # train_model moves every batch to `device`, so the model must live there
    # too; otherwise the forward pass fails on a GPU machine.
    model = MLP2(activation=activation_by_code[act]).to(device)
    _, val_f1 = train_model(model, b)
    return val_f1

def f_opt(b,act):
    """Adapter for the optimiser: truncate both continuous inputs to integers
    and return the result of `black_box_func` for them."""
    act_code = int(act)
    batch_size = int(b)
    return black_box_func(batch_size, act_code)

In [None]:
 from bayes_opt import BayesianOptimization
optimizer = BayesianOptimization(f=f_opt,
                                 pbounds={'b':(16,1024),'act':(0,2.99)},
                                 verbose=2,
                                 random_state=1,
                                 allow_duplicate_points=True)

optimizer.set_gp_params(alpha=1e-3)
optimizer.maximize(n_iter=300) 

The best combination of hyperparameters found by Bayesian optimization is a batch size of b = 17 with the tanh activation function.

In [None]:
# Retrain a fresh network with the Bayesian-optimisation result (b=17, tanh).
# The name must be spelled exactly "tanh": MLP2 compares against that string,
# and a misspelling would silently train a network with no hidden non-linearity.
# With log_all=True, train_model returns the per-epoch training loss (l) and
# training F1 (f) histories.
model_best = MLP2(activation="tanh").to(device)
l,f = train_model(model_best, b=17, log_all=True)

In [None]:
import matplotlib.pyplot as plt
# `f` holds one mean training F1 value per training epoch (train_model with
# log_all=True), so the x-axis counts epochs, not generations or episodes.
plt.figure(figsize=(8,6))
plt.plot(f, label="F_1 Score")
plt.xlabel('Epochs')
plt.ylabel('Training F_1')
plt.title('Training F_1 Score V.S. Epochs')
plt.legend()