In [35]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, f1_score
import copy
import requests
import pickle
from io import BytesIO
from sklearn.preprocessing import StandardScaler

In [34]:
def data_preprocessing(data):
    # # check Z score
    # zscore = pd.DataFrame()
    # outlier = {}
    # outlier_idx = []
    # for col in data_scaled.columns:
    #     if col != data_scaled.columns[0]:
    #         zscore[f'{col}_z'] = sp.stats.zscore(data_scaled[col])
    #         outlier[col] = zscore[f'{col}_z'][(zscore[f'{col}_z']>2.58) | (zscore[f'{col}_z']<-2.58)] # 99% = 2.58
    #         outlier_idx.append(list(outlier[col].index))    
    # outlier_idx_sum = sum(outlier_idx,[])
    # data_scaled = data_scaled.drop(outlier_idx_sum)

    # IQR outlier + mean으로 대체 
    # class_label = data['Class Label']
    label_list = np.unique(data['Class Label'].to_numpy())
    processed_df = []
    # data_pre = data.loc[:, data.columns != 'Class Label'].copy()
    for class_num in label_list:
        data_per_class = data.loc[data['Class Label'] == class_num, : ].copy()
        data_pre = data_per_class.loc[:, data_per_class.columns != 'Class Label'].copy()
        data_class = data_per_class.loc[:, data_per_class.columns == 'Class Label'].copy()
        # data_pre = data.loc[:, data.columns != 'Class Label'].copy()
            
        for col in data_pre.columns:
            q1 = np.quantile(data_pre[f'{col}'], 0.25)
            q3 = np.quantile(data_pre[f'{col}'], 0.75)
            IQR = q3 - q1
            condition = (data_pre[f'{col}'] < (q1 - 1.5 * IQR)) | (data_pre[f'{col}'] > (q3 + 1.5 * IQR))
            outlier = data_pre[condition]
            data_pre[f'{col}'] = data_pre[f'{col}'].replace([data_pre[f'{col}'][outlier.index]], data_pre[f'{col}'].mean())

        data_trimmed = pd.concat([data_class, data_pre], axis=1)

        processed_df.append(data_trimmed)            
    

    data_pre = pd.concat(processed_df, axis=0)
    
    class_label = data_pre.loc[:, data_pre.columns == 'Class Label']
    data_pre = data_pre.drop(['Class Label'], axis=1)
    
    # regularization 표준화
    # regularization = StandardScaler()
    # regularization.fit(data_pre)
    # data_scaled = regularization.transform(data_pre)
    # data_scaled = pd.DataFrame(data=data_scaled, columns=data_pre.columns)
    # data_scaled[data_scaled.columns[0]] = label

    sLink = 'https://github.com/2U1/ML_RDACOV/blob/master/scaler.pkl?raw=True'
    sFile = BytesIO(requests.get(sLink).content)
    scaler = pickle.load(sFile)
    data_scaled = scaler.transform(data_pre)
    data_scaled = pd.DataFrame(data=data_scaled, columns=data_pre.columns)

    # # Normalization
    # normalization = MinMaxScaler()
    # normalization.fit(data_pre)
    # data_nor = normalization.transform(data_pre)
    # data_nor = pd.DataFrame(data=data_nor, columns=data_pre.columns)
    # data_nor[data_nor.columns[0]] = data_pre[data_pre.columns[0]]
    # data_nor[data_nor.columns[0]] = label
    

    data_processed = pd.concat([class_label, data_scaled], axis=1)

    return data_processed

In [39]:
def get_clf_eval(y_test, pred):
    accuracy = accuracy_score(y_test, pred)
    recall = recall_score(y_test, pred, average='macro')
    precision = precision_score(y_test, pred, average='macro')
    f1 = f1_score(y_test, pred, average='macro')
    print('Accuracy:{0:.4f}, Recall:{1:.4f}, Precision:{2:.4f}, F1-Score:{3:.4f}'.format(accuracy, recall, precision, f1))

In [40]:
def make_confusion(y_test, y_pred):
    confusion = confusion_matrix(y_test,y_pred)
    confusion_df = pd.DataFrame(confusion,columns=['Predicted_1','Predicted_2', 'Predicted_3'],index=['Predicted_1','Predicted_2', 'Predicted_3'])
    
    return confusion_df

In [41]:
def load_dataset(directory, preprocess = False):
    
    global input_cnt, output_cnt, data, x, y

    data = pd.read_csv(directory)
    
    if preprocess:
        data = data_preprocessing(data)

    input_cnt = data.loc[:, data.columns != 'Class Label'].shape[1]
    # output_cnt = data['Class Label'].shape[1]
    output_cnt = 1
    y = data['Class Label'].to_numpy()
    x = data.loc[:, data.columns != 'Class Label'].to_numpy()

In [42]:
class DiscriminentAnalysis():
    def __init__(self, alpha=0.0, beta=0.0, eval_mode = False):
        if not eval_mode:
            self.learned = False
            self.alpha = alpha
            self.beta = beta
            self.class_names = []
            self.class_priors = {}
            self.class_means = {}
            self.regularized_covariances = {}
            self.rda_covariances = {}
            self.feature_dimension = 0
            self.reset()

        else:
            self.load_parameter()


    def load_parameter(self):
        mLink = 'https://github.com/2U1/ML_RDACOV/blob/master/parameter.pkl?raw=true'
        mfile = BytesIO(requests.get(mLink).content)
        parameter = pickle.load(mfile)
        

        self.learned = True
        self.alpha = parameter['alpha']
        self.beta = parameter['beta']
        self.class_names = parameter['class_name']
        self.class_priors = parameter['class_priors']
        self.class_means = parameter['class_means']
        self.regularized_covariances = parameter['reg_cov']
        self.rda_covariances = parameter['rda_cov']


    def reset(self):
        self.learned = False
        self.class_names = []
        self.class_priors = {}
        self.class_means = {}
        self.regularized_covariances = {}
        self.rda_covariances = {}


    def return_parameters(self):
        parameters = {
            'alpha': self.alpha,
            'beta': self.beta,
            'class_name': self.class_names,
            'class_priors': self.class_priors,
            'class_means': self.class_means,
            'reg_cov': self.regularized_covariances,
            'rda_cov': self.rda_covariances
        }

        return parameters

    def fit(self, X, y):
        self.class_names = np.unique(y)
        class_covariances = {}
        pooled_covariances = 0
        self.feature_dimension = X.shape[1]
        for i in self.class_names:
            class_indices = np.where(y == i)[0]
            class_samples = X[class_indices, :]
            self.class_priors[i] = float(len(class_indices)) / len(y)
            self.class_means[i] = np.mean(class_samples, axis=0)
            class_covariances[i] = np.cov(class_samples, rowvar=0)
            pooled_covariances += class_covariances[i] * self.class_priors[i]
        # Calculate RDA regularized covariance matricies for each class
        for i in self.class_names:
            self.regularized_covariances[i] = (self.beta * pooled_covariances) + ((1 - self.beta) *class_covariances[i])
            # self.regularized_covariances[i] = (self.beta * class_covariances[i]) + ((1 - self.beta) * pooled_covariances)

        for i in self.class_names:
            self.rda_covariances[i] = ((1-self.alpha) * self.regularized_covariances[i]) + (self.alpha * (1/self.feature_dimension) * np.trace(self.regularized_covariances[i]) * np.eye(self.regularized_covariances[i].shape[0]))
        
        self.learned = True
        return self

    def predict(self, x):
        if not self.learned:
            raise NameError('Fit model first')
        # Determine probability of each class given input vector
        
        class_prob = {}
        for i in self.class_names:
            # Divid the class delta calculation into 3 parts
            part1 = -0.5 * np.log1p(np.linalg.det(self.rda_covariances[i]))
            # part2 = -0.5 * np.dot(np.dot((x - self.class_means[i]).T, np.linalg.pinv(self.rda_covariances[i])), (x - self.class_means[i]))
            part2 = -0.5 * np.matmul(np.matmul((x - self.class_means[i]).T, np.linalg.pinv(self.rda_covariances[i])), (x - self.class_means[i]))
            part3 = np.log(self.class_priors[i])
            class_prob[i] = part1 + part2 + part3
        return max(class_prob, key=class_prob.get)

In [43]:
class GridSearchRDA():
    def __init__(self, model, param_grid):
        self.model = model
        self.param_grid = param_grid
        self.alpha = 0
        self.beta = 0
        self.best_covariance = {}
        self.best_score = 0


    def fit(self, X, y, cv=2):
        
        # metric_score = []

        data_length = len(X)
        
        alpha_list = self.param_grid['alpha']
        beta_list = self.param_grid['beta']
        
        if data_length % cv == 0:
            cv_x = np.split(X, cv)
            cv_y = np.split(y, cv)
            
        else:
            remain = data_length % cv
            cv_x = np.split(X[:-remain], cv)
            cv_y = np.split(y[:-remain], cv)

        for alpha in alpha_list:
            for beta in beta_list:
                accuracy_score_list = []
                recall_score_list = []
                precision_score_list = []
                f1_score_list = []
                for i in range(cv):
                    self.model.reset()
                    self.model.alpha = alpha
                    self.model.beta = beta

                    test_x_cv = cv_x[i]
                    train_x_cv = np.vstack(cv_x[:i] + cv_x[i + 1:])

                    test_y_cv = cv_y[i]
                    train_y_cv = np.vstack(cv_y[:i] + cv_y[i + 1:]).flatten()

                    
                    self.model.fit(train_x_cv, train_y_cv)

                    pred = []

                    for data in test_x_cv:
                        pred.append(self.model.predict(data))
                    
                    accuracy_score_list.append(accuracy_score(test_y_cv, pred))
                    recall_score_list.append(recall_score(test_y_cv, pred, average='macro'))
                    precision_score_list.append(precision_score(test_y_cv, pred, average='macro'))
                    f1_score_list.append(f1_score(test_y_cv, pred, average='macro'))
                
                accuracy_mean_score = np.mean(np.array(accuracy_score_list))
                recall_mean_score = np.mean(np.array(recall_score_list))
                precision_mean_score = np.mean(np.array(precision_score_list))
                f1_mean_score = np.mean(np.array(f1_score_list))

                print("alpha:{0:.1f}, beta:{1:.1f}, accuracy:{2:.4f}, recall:{3:.4f}, precision:{4:.4f} ,f1-score:{5:.4f}"\
                    .format(alpha, beta, accuracy_mean_score ,recall_mean_score ,precision_mean_score ,f1_mean_score))

                if f1_mean_score > self.best_score:
                    self.best_score = f1_mean_score
                    self.alpha = alpha
                    self.beta = beta
                    self.best_covariance = copy.deepcopy(self.model.rda_covariances)
                    self.best_estimator = copy.deepcopy(self.model)

In [44]:
def test_evaluation(test_x, test_y):

    model = DiscriminentAnalysis(eval_mode=True)


    prediction = []
    
    for testing in test_x:
        prediction.append(model.predict(testing))


    confusion = make_confusion(test_y, prediction)
    
    get_clf_eval(test_y, prediction)
    print('\n\n')
    print(confusion)

In [36]:
load_dataset('./facial_expression_train_dataset.csv', preprocess=True)

In [74]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.2, stratify=y, random_state=104)

In [75]:
dis = DiscriminentAnalysis()

In [76]:
params = {
    'alpha': np.linspace(0.0, 1.0, num=11, endpoint=True),
    'beta': np.linspace(0.0, 1.0, num=11, endpoint=True)
}

In [77]:
grid = GridSearchRDA(dis, params)

In [78]:
grid.fit(x_train, y_train, cv=5)

alpha:0.0, beta:0.0, accuracy:0.8769, recall:0.8511, precision:0.8930 ,f1-score:0.8527
alpha:0.0, beta:0.1, accuracy:0.9385, recall:0.9422, precision:0.9422 ,f1-score:0.9302
alpha:0.0, beta:0.2, accuracy:0.9692, recall:0.9644, precision:0.9756 ,f1-score:0.9658
alpha:0.0, beta:0.3, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.4, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.5, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.6, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.7, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.8, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:0.9, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.0, beta:1.0, accuracy:0.9846, recall:0.9778, precision:0.9889 ,f1-score:0.9806
alpha:0.1, beta:0.0, accuracy:0.9385, recal

In [80]:
grid.beta

0.9

In [85]:
parameters = grid.best_estimator.return_parameters()
with open('../Covariance/parameter.pkl','wb') as f:
    pickle.dump(parameters, f)

In [82]:
pred = []
for testing in x_test:
    pred.append(grid.best_estimator.predict(testing))

In [83]:
f1_score(y_test, pred, average='macro')

1.0

In [84]:
test_evaluation(x_test, y_test)

Accuracy:1.0000, Recall:1.0000, Precision:1.0000, F1-Score:1.0000



             Predicted_1  Predicted_2  Predicted_3
Predicted_1            6            0            0
Predicted_2            0            6            0
Predicted_3            0            0            6
