In [10]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import torchvision.transforms as transforms

transform = transforms.ToTensor()

In [11]:
mbti_df = pd.read_csv('crop128/all.csv')

photo_df = pd.DataFrame(columns=['photo'], dtype= 'object')

# put the photo data into a dataframe
for i in range(len(mbti_df)):
    image = transform(Image.open('crop128/' + mbti_df['filename'][i]))
    photo_df.loc[i] = [image]

mbti_df = mbti_df['MBTI']

mbti_df.head()

0    enfj
1    enfj
2    enfj
3    enfj
4    enfj
Name: MBTI, dtype: object

In [12]:
photo_df.iloc[0][0].shape

torch.Size([1, 64, 64])

In [13]:
alphabet = ['t', 'f']

# if mbti_df['MBTI'] includes alphabet[0], then mbti_df['MBTI'] = 1, else 0
mbti_df = mbti_df.apply(lambda x: 1 if alphabet[0] in x else 0)

mbti_df.head()

0    0
1    0
2    0
3    0
4    0
Name: MBTI, dtype: int64

In [14]:
#print the statistics of the mbti_df
print(mbti_df.value_counts())

1    1551
0    1493
Name: MBTI, dtype: int64


In [15]:
# train_data includes the element of 2-dim tensor
train_data = photo_df['photo'].values
train_label = mbti_df.values

In [16]:
#전체 data 중 train의 비율
train_ratio = 0.8

train_idx = np.random.choice(len(train_data), int(len(train_data) * train_ratio), replace=False)
test_idx = np.array(list(set(range(len(train_data))) - set(train_idx)))

test_data = train_data[test_idx]
test_label = train_label[test_idx]

train_data = train_data[train_idx]
train_label = train_label[train_idx]

In [17]:
#defining model
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader

class MBTI_Dataset(Dataset):
    def __init__(self, train_label, train_data):
        self.train_label = train_label
        self.train_data = train_data

    def __len__(self):
        return len(self.train_label)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        mbti = self.train_label[idx]
        photo = self.train_data[idx]

        return mbti, photo

# parameter 값은 이것을 변경해주세요
in_ch1 = 1
out_ch1 = 32
ker1 = 2
stride1 = 1
pad1 = 0

out_ch2 = 64
ker2 = 2
stride2 = 1
pad2 = 0

out_ch3 = 128
ker3 = 2
stride3 = 1
pad3 = 0

out_ch4 = 128
ker4 = 2
stride4 = 1
pad4 = 0

out_ch5 = 64
ker5 = 2
stride5 = 1
pad5 = 0


pool_size1 = 2
pool_size2 = 2
pool_size3 = 2
pool_size4 = 2
pool_size5 = 2


out_feat1 = 120
out_feat2 = 84
out_feat3 = 1


class Net(nn.Module):
    def __init__(self, input_shape):
        super(Net, self).__init__()
        input_height, input_width = input_shape

        self.conv1 = nn.Conv2d(in_channels = in_ch1, out_channels = out_ch1, kernel_size = ker1, stride = stride1, padding = pad1)
        self.pool1 = nn.MaxPool2d(pool_size1, pool_size1)

        output1_height, output1_width = (input_height - ker1 + 2 * pad1) / stride1 + 1, (input_width - ker1 + 2 * pad1) / stride1 + 1
        output1_height, output1_width = int(output1_height / pool_size1), int(output1_width / pool_size1)

        self.conv2 = nn.Conv2d(in_channels = out_ch1, out_channels = out_ch2, kernel_size = ker2, stride = stride2, padding = pad2)
        self.pool2 = nn.MaxPool2d(pool_size2, pool_size2)

        output2_height, output2_width = (output1_height - ker2 + 2 * pad2) / stride2 + 1, (output1_width - ker2 + 2 * pad2) / stride2 + 1
        output2_height, output2_width = int(output2_height / pool_size2), int(output2_width / pool_size2)

        self.conv3 = nn.Conv2d(in_channels = out_ch2, out_channels = out_ch3, kernel_size = ker3, stride = stride3, padding = pad3)
        self.pool3 = nn.MaxPool2d(pool_size3, pool_size3)

        output3_height, output3_width = (output2_height - ker3 + 2 * pad3) / stride3 + 1, (output2_width - ker3 + 2 * pad3) / stride3 + 1
        output3_height, output3_width = int(output3_height / pool_size3), int(output3_width / pool_size3)

        self.conv4 = nn.Conv2d(in_channels = out_ch3, out_channels = out_ch4, kernel_size = ker4, stride = stride4, padding = pad4)
        self.pool4 = nn.MaxPool2d(pool_size4, pool_size4)

        output4_height, output4_width = (output3_height - ker4 + 2 * pad4) / stride4 + 1, (output3_width - ker4 + 2 * pad4) / stride4 + 1
        output4_height, output4_width = int(output4_height / pool_size4), int(output4_width / pool_size4)

        self.conv5 = nn.Conv2d(in_channels = out_ch4, out_channels = out_ch5, kernel_size = ker5, stride = stride5, padding = pad5)
        self.pool5 = nn.MaxPool2d(pool_size5, pool_size5)

        output5_height, output5_width = (output4_height - ker5 + 2 * pad5) / stride5 + 1, (output4_width - ker5 + 2 * pad5) / stride5 + 1
        output5_height, output5_width = int(output5_height / pool_size5), int(output5_width / pool_size5)

        self.fc1 = nn.Linear(out_ch5 * output5_height * output5_width, out_feat1)
        self.fc2 = nn.Linear(out_feat1, out_feat2)
        self.fc3 = nn.Linear(out_feat2, out_feat3)
    
    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x)))
        #print('1')
        x = self.pool2(F.relu(self.conv2(x)))
        #print('2')
        x = self.pool3(F.relu(self.conv3(x)))
        #print('3')
        x = self.pool4(F.relu(self.conv4(x)))
        #print('4')
        x = self.pool5(F.relu(self.conv5(x)))
        #print('5')
        x = torch.flatten(x, 1)

        # 이 부분은 변경하셔도 괜찮아요. relu로 할지 sigmoid로 할지
        x = torch.sigmoid(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

class cnn_model():
    def __init__(self, model, lr=0.01, epochs=100, momentum = 0.6):
        self.model = model
        self.lr = lr
        self.epochs = epochs
        self.momentum = momentum
        self.criterion = nn.BCEWithLogitsLoss()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=self.lr, momentum = self.momentum)
    
    def fit(self, X_train, y_train):        
        self.trainloader = DataLoader(MBTI_Dataset(X_train, y_train), batch_size=64, shuffle=False)
        
        self.model.train()
        for epoch in range(self.epochs):
            for i, data in enumerate(self.trainloader):
                inputs, labels = data
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                labels.unsqueeze_(1)
                loss = self.criterion(outputs, labels.float())
                loss.backward()
                self.optimizer.step()
    
    def predict(self, x):
        self.model.eval()
        with torch.no_grad():
            y_pred = self.model(x.unsqueeze(0))
        return y_pred

    def get_params(self, deep=True):
        return {'lr': self.lr, 'epochs': self.epochs, 'momentum': self.momentum}

    def save(self, path):
        torch.save(self.model.state_dict(), path)

In [18]:
net = Net((train_data[0].shape[1], train_data[0].shape[2]))

In [19]:
from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score

def cross_val_score(model, train_data, label, cv=5):
    k = cv
    kf = KFold(n_splits=k, random_state=42, shuffle=True)

    acc_score = []
    auc_score = []
    
    for train_index , test_index in kf.split(train_data):
        X_train , X_test = train_data[train_index],train_data[test_index]
        y_train , y_test = label[train_index] , label[test_index]
        
        if(np.unique(y_test).shape[0] == 1):
            print('only one class')
            continue

        model.fit(X_train,y_train)

        pred_values = []

        for i in range(len(X_test)):
            pred = model.predict(X_test[i])
            pred_values.append(pred.item())

        auc = roc_auc_score(y_test, pred_values)
        auc_score.append(auc)
        
    avg_acc_score = sum(acc_score)/k
    avg_auc_score = sum(auc_score)/k
    
    return avg_auc_score

In [20]:
from bayes_opt import BayesianOptimization

In [21]:
# bayesian optimization for hyperparameter tuning
from bayes_opt import BayesianOptimization

def bayes_parameter_opt_lgb(X, y, init_round=15, opt_round=25, n_folds=5, random_seed=6, n_estimators=10000, learning_rate=0.05, output_process=False):
    # prepare data
    train_data = X
    #train_data = lgb.Dataset(data=X, label=y, categorical_feature = cat_features, free_raw_data=False)
    # define your metric
    def lgb_eval(num_leaves, feature_fraction, bagging_fraction, max_depth, lambda_l1, lambda_l2, min_split_gain, min_child_weight):
        params = {'application':'binary', 'num_iterations': n_estimators, 'learning_rate':learning_rate, 'early_stopping_round':100, 'metric':'auc'}
        params["num_leaves"] = int(round(num_leaves))
        params['feature_fraction'] = max(min(feature_fraction, 1), 0)
        params['bagging_fraction'] = max(min(bagging_fraction, 1), 0)
        params['max_depth'] = int(round(max_depth))
        params['lambda_l1'] = max(lambda_l1, 0)
        params['lambda_l2'] = max(lambda_l2, 0)
        params['min_split_gain'] = min_split_gain
        params['min_child_weight'] = min_child_weight
        cv_result = lgb.cv(params, train_data, nfold=n_folds, seed=random_seed, stratified=True, verbose_eval =200, metrics=['auc'])
        return max(cv_result['auc-mean'])
    # range 
    lgbBO = BayesianOptimization(lgb_eval, {'num_leaves': (24, 45),
                                            'feature_fraction': (0.1, 0.9),
                                            'bagging_fraction': (0.8, 1),
                                            'max_depth': (5, 8.99),
                                            'lambda_l1': (0, 5),
                                            'lambda_l2': (0, 3),
                                            'min_split_gain': (0.001, 0.1),
                                            'min_child_weight': (5, 50)}, random_state=0)
    # optimize
    lgbBO.maximize(init_points=init_round, n_iter=opt_round)
    # output optimization process
    if output_process==True: lgbBO.points_to_csv("bayes_opt_result.csv")
    # return best parameters
    return lgbBO.max



In [22]:
# make tupes of (lr, momentum, epochs) randomly
# random cv를 몇번 돌릴 것인지...
random_cv_num = 30


# parameter 값이 이 범위 내에서 나옵니다
lrs = np.linspace(0.01, 0.06, 30)
momentums = np.linspace(0.0, 0.9, 20)
epochss = np.linspace(50, 200, 5, dtype=int)

params = [(lr, momentum, epochs) for lr in lrs for momentum in momentums for epochs in epochss]
np.random.shuffle(params)
params = params[:random_cv_num]

In [24]:
#import roc curve
from sklearn.metrics import roc_curve

models = [cnn_model(net, lr, epochs, momentum) for lr, momentum, epochs in params]

overall_best_threshold = 0
overall_best_accuracy = 0
overall_best_improved_accuracy = 0
overall_best_auc = 0
overall_best_mean = 0
overall_best_var = 0
overall_best_params = None

best_model = None

for model in models:
    model.fit(train_data, train_label)

    train_pred_values = []

    for i in range(len(train_data)):
        pred = model.predict(train_data[i])
        train_pred_values.append(pred.item())

    #calculate mean and variance of train_pred
    train_pred_values = np.array(train_pred_values)
    train_pred_mean = np.mean(train_pred_values)
    train_pred_var = np.var(train_pred_values)

    #normalize train_pred_values
    train_pred_values = (train_pred_values - train_pred_mean) / np.sqrt(train_pred_var)

    best_threshold = 0
    best_score = 0

    for threshold in np.arange(-1, 1, 0.0001):
        y_pred = np.array(train_pred_values) > threshold
        score = accuracy_score(train_label, y_pred)
        if score > best_score:
            best_threshold = threshold
            best_score = score

    pred_values = []

    for i in range(len(test_data)):
        pred = model.predict(test_data[i])
        pred_values.append(pred.item())

    #quantize predictions
    pred_values = np.array(pred_values)

    #normalize pred_values
    pred_values = (pred_values - train_pred_mean) / np.sqrt(train_pred_var)
    guess3 = roc_auc_score(test_label, pred_values)

    pred_values[pred_values >= best_threshold] = 1
    pred_values[pred_values < best_threshold] = 0

    #calculate accuracy score

    guess1 = accuracy_score(test_label, pred_values)

    # random guess

    pred_values = []

    for i in range(len(test_data)):
        pred = 0
        pred_values.append(pred)

    #quantize predictions
    pred_values = np.array(pred_values)

    #calculate accuracy score
    guess2_1 = accuracy_score(test_label, pred_values)
    
    pred_values = []

    for i in range(len(test_data)):
        pred = 1
        pred_values.append(pred)
    
    #quantize predictions
    pred_values = np.array(pred_values)

    #calculate accuracy score
    guess2_2 = accuracy_score(test_label, pred_values)

    guess2 = max(guess2_1, guess2_2)

    improved_accuracy = (guess1 - guess2) * 100

    print('improved_accuracy:', improved_accuracy)
    print('learning rate:', model.get_params()['lr'], 'momentum:', model.get_params()['momentum'], 'epochs:', model.get_params()['epochs'])

    if(overall_best_improved_accuracy < improved_accuracy):
        overall_best_threshold = best_threshold
        overall_best_accuracy = guess1
        overall_best_improved_accuracy = improved_accuracy
        overall_best_auc = guess3
        overall_best_mean = train_pred_mean
        overall_best_var = train_pred_var
        overall_best_params = model.get_params()
        best_model = model

        print('-------improved!--------')
        print('best threshold: ', best_threshold)
        print('best accuracy: ', guess1)
        print('best improved accuracy: ', improved_accuracy)
        print('best auc: ', guess3)
        print('best mean: ', train_pred_mean)
        print('best var: ', train_pred_var)
        print('best learning rate: ', overall_best_params['lr'], 'best momentum: ', overall_best_params['momentum'], 'best epochs: ', overall_best_params['epochs'])
        print('------------------------')

if(overall_best_params == None):
    print('test failed!')
else:
    print('-------------final result-------------')
    print('best threshold: ', overall_best_accuracy)
    print('best accuracy: ', overall_best_accuracy)
    print('best improved accuracy: ', overall_best_improved_accuracy)
    print('best auc: ', overall_best_auc)   
    print('best_mean: ', overall_best_mean)
    print('best_var: ', overall_best_var)
    print('best learning rate: ', overall_best_params['lr'], 'best momentum: ', overall_best_params['momentum'], 'best epochs: ', overall_best_params['epochs'])
    best_model.save('cnn_sn.pth')

improved_accuracy: -2.9556650246305436
learning rate: 0.041034482758620684 momentum: 0.8526315789473684 epochs: 50
improved_accuracy: -0.3284072249589487
learning rate: 0.0496551724137931 momentum: 0.8526315789473684 epochs: 87
improved_accuracy: -0.3284072249589487
learning rate: 0.041034482758620684 momentum: 0.09473684210526316 epochs: 162
improved_accuracy: -1.1494252873563204
learning rate: 0.027241379310344822 momentum: 0.09473684210526316 epochs: 200
improved_accuracy: -0.6568144499178974
learning rate: 0.027241379310344822 momentum: 0.14210526315789473 epochs: 87
improved_accuracy: -0.9852216748768461
learning rate: 0.030689655172413788 momentum: 0.2368421052631579 epochs: 50
improved_accuracy: -0.8210180623973717
learning rate: 0.011724137931034483 momentum: 0.3315789473684211 epochs: 50
improved_accuracy: -0.6568144499178974
learning rate: 0.03586206896551724 momentum: 0.9 epochs: 50
improved_accuracy: -0.3284072249589487
learning rate: 0.032413793103448274 momentum: 0.473684

In [None]:
# save best parameters in pickle file
import pickle

with open('best_params.pickle', 'wb') as f:
    pickle.dump(overall_best_params, f, pickle.HIGHEST_PROTOCOL)

