# 1. 모듈 불러오기

In [1]:
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse import issparse
from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.preprocessing import MultiLabelBinarizer
import random
import math
from numpy import linalg as la

# 2. 데이터 불러오기

In [16]:
files = ['data/scene_train', 'data/scene_test']

data = datasets.load_svmlight_files(files, multilabel=True)
train_data = data[0]
train_target = np.array(MultiLabelBinarizer().fit_transform(data[1]))
test_data = data[2]
test_target = data[3]

# 3. 클래스 생성하기 - 학습, 테스트

In [17]:
class BPMLL:
    
    def __init__(self, neural=0.2, epoch=20, weight_decay=0.00001, regularization=0.1, print_procedure=False):
        self.features = 0
        self.classes = 0
        self.samples = 0
        self.neural_num = 0
        self.learn_rate = 0.05
        self.neural_percent = neural
        self.epoch = epoch
        self.weightsDecayCost = weight_decay
        self.regularization = regularization
        self.error_small_change = 0.00001
        self.final_error = 0
        self.dataset = []
        self.threshold = None
        self.wsj_matrix = []
        self.vhs_matrix = []
        self.bias_b = []
        self.bias_a = []
        self.print_procedure = print_procedure
        self.trained = False

    def fit(self, x, y):
        self.features = x.shape[1]  
        self.classes = y.shape[1]  
        self.dataset = self.prepare_data(x, y)
        self.samples = len(self.dataset)
        self.neural_num = int(self.features * self.neural_percent)   
        self.wsj_matrix = np.random.random_sample((self.neural_num, self.classes)) - 0.5  
        self.vhs_matrix = np.random.random_sample((self.features, self.neural_num)) - 0.5  
        self.bias_b = np.ones((1, self.classes))  
        self.bias_a = np.ones((1, self.neural_num))  
        self.iterate_training()
        self.trained = True
        return self

    def prepare_data(self, x, y):
        dataset = [] 
        for i in range(x.shape[0]):
            dataset.append(TrainPair(x[i], y[i]))
        return dataset

    def iterate_training(self):
        prev_error = self.global_error()
        for ep in range(self.epoch):
            if self.print_procedure:
                print("학습된 데이터의 epoch는 " + str(ep))
            for i in range(self.samples):
                self.fit_once(i)
            error = self.global_error()
            diff = prev_error - error
            if diff <= self.error_small_change * prev_error:
                self.build_threshhold()
                self.final_error = error
                return
            prev_error = error

        self.build_threshhold()
        self.final_error = prev_error
        return

    def fit_once(self, index):
        x = self.dataset[index].attributes
        y = self.dataset[index].labels
        x_vec = np.array([x]).T
        is_label = self.dataset[index].isLabel
        not_label = self.dataset[index].notLabel
        is_label_length = len(is_label)
        not_label_length = len(not_label)
        b, c = self.forward_propagation(x)
        exp_func = math.exp
        dj_sigma = np.zeros((1, self.classes))
        for j in range(self.classes):
            tmp = 0
            if y[j] == 1:
                for l in not_label:
                    tmp += exp_func(-(c[0, j] - c[0, l]))
            else:
                for k in is_label:
                    tmp -= exp_func(-(c[0, k] - c[0, j]))
            dj_sigma[0, j] = tmp
        
        d = (1 / (is_label_length * not_label_length)) * dj_sigma * (1 - np.square(c))
        b_vec = b.T
        d_vec = d.T
        es_sigma_vec = np.dot(self.wsj_matrix, d_vec)
        e_vec = es_sigma_vec * (1 - np.square(b_vec))
        e = e_vec.T
        self.wsj_matrix = (1 - self.weightsDecayCost) * self.wsj_matrix + self.learn_rate * np.dot(b_vec, d)
        self.vhs_matrix = (1 - self.weightsDecayCost) * self.vhs_matrix + self.learn_rate * np.dot(x_vec, e)
        self.bias_b = (1 - self.weightsDecayCost) * self.bias_b + self.learn_rate * d
        self.bias_a = (1 - self.weightsDecayCost) * self.bias_a + self.learn_rate * e

        return

    def forward_propagation(self, x):
        x = np.array([x])
        netb = np.dot(x, self.vhs_matrix) + self.bias_a
        b = tanh(netb)   
        netc = np.dot(b, self.wsj_matrix) + self.bias_b
        c = tanh(netc)
        return b, c

    def global_error(self):
        global_error = 0
        weights_square_sum = np.sum(np.square(self.wsj_matrix)) + np.sum(
                np.square(self.vhs_matrix)) + np.sum(np.square(self.bias_b)) + np.sum(np.square(self.bias_a))
        for i in range(self.samples):
            c = self.forward_propagation(self.dataset[i].attributes)[1]
            yi = self.dataset[i].isLabel
            nyi = self.dataset[i].notLabel
            yi_length = len(yi)
            nyi_length = len(nyi)
            A = np.array([[c[0, l] - c[0, k] for k in yi] for l in nyi])
            global_error += 1 / (yi_length * nyi_length) * np.sum(np.exp(A))
        global_error += self.regularization * weights_square_sum
        return global_error

    def build_threshhold(self):
        model_outputs = []
        ideal_labels = []
        for i in range(self.samples):
            c = self.forward_propagation(self.dataset[i].attributes)[1][0] 
            print(c)
            model_outputs.append(c)
            ideal_labels.append(self.dataset[i].labels)
        self.threshold = ThresholdFunction(model_outputs, ideal_labels)

    def predict(self, x, rank_results=False):
        
        samples, features = x.shape
        result = RankResults()
        for sample_index in range(samples):
            sample_result = []
            c = self.forward_propagation(x[sample_index])[1][0]
            print("ghihi")
            print(c)
            threshold = self.threshold.compute_threshold(c)
            print(threshold)
            top_label = None
            max_value = 0
            count = 0
            for j in range(self.classes):
                if c[j] >= threshold:
                    count += 1
                    sample_result.append(j)
                if c[j] > max_value:
                    top_label = j
                    max_value = c[j]
            if count == 0:
                sample_result.append(top_label)
            result.add(sample_result, top_label, c)
        if rank_results is False:
            result = result.predictedLabels

        return result

In [18]:
class TrainPair:
    def __init__(self, attributes, labels):
        self.attributes = attributes 
        self.labels = labels 
        self.isLabel = [] 
        self.notLabel = [] 
        for j in range(labels.shape[0]):
            if labels[j] == 1:
                self.isLabel.append(j)
            else:
                self.notLabel.append(j)

In [19]:
class ThresholdFunction:

    def __init__(self, model_output, ideal_labels):
        self.parameters = []
        self.build(model_output, ideal_labels)

    def build(self, model_output, ideal_labels):
        samples = len(ideal_labels)
        labels = len(ideal_labels[0]) 
        threshholds = np.zeros(samples) 

        for sample_index in range(samples): 
            label_value = [float('inf') for i in range(labels)]
            notlabel_value = [float('-inf') for i in range(labels)] 
            for j in range(labels):
                if ideal_labels[sample_index][j] == 1:  
                    label_value[j] = model_output[sample_index][j]   
                else:
                    notlabel_value[j] = model_output[sample_index][j]
       
            label_min = min(label_value)
            notlabel_max = max(notlabel_value)

            if label_min != notlabel_max:
                if label_min == float('inf'):
                    threshholds[sample_index] = notlabel_max + 0.1
                elif notlabel_max == float('-inf'):
                    threshholds[sample_index] = label_min - 0.1
                else:
                    threshholds[sample_index] = (label_min + notlabel_max) / 2
            else:
                threshholds[sample_index] = label_min
        
        model_output = np.concatenate((model_output, np.array([np.ones(samples)]).T), axis=1)
        self.parameters = np.linalg.lstsq(model_output, threshholds)[0]

    def compute_threshold(self, outputs):
        parameter_length = len(self.parameters)
        b_index = parameter_length - 1

        threshold = 0
        for i in range(b_index):
            threshold += outputs[i] * self.parameters[i]
        threshold += self.parameters[b_index]
        #print(threshold)
        return threshold

In [20]:
def tanh(x):
    return 2 / (1 + np.exp(-2 * x)) - 1

In [21]:
class RankResults:
    def __init__(self):
        self.predictedLabels = []
        self.topRankedLabels = []
        self.outputs = []

    def add(self, predict_set, top_label, output):
        self.predictedLabels.append(predict_set)
        self.topRankedLabels.append(top_label)
        self.outputs.append(output)

# 5. 벡터화 및 실시

In [22]:
result = BPMLL(print_procedure=True, neural=0.4, regularization=0, epoch=40000).fit(train_data, train_target).predict(train_data, rank_results=True)

ValueError: shapes (1,) and (294,117) not aligned: 1 (dim 0) != 294 (dim 0)

In [9]:
class Aggregate:
    @staticmethod
    def intersection(a, b):
        inter = 0
        for i in a:
            if i in b:
                inter += 1
        return inter

    @staticmethod
    def sum(a, b):
        return len(a) + len(b) - Aggregate.intersection(a, b)

    @staticmethod
    def sym_difference(a, b):
        return len(a) + len(b) - 2 * Aggregate.intersection(a, b)

In [10]:
class UniversalMetrics:
    def __init__(self, expected, predicted):
        self.sampleNum = len(expected)
        self.expectedLabels = [[int(i) for i in expected[j]] for j in range(len(expected))]
        # fix for divide by zero problems, this will not affect the final result
        for predict_index in range(len(predicted)):
            if len(predicted[predict_index]) == 0:
                predicted[predict_index].append(None)
        self.predictedLabels = predicted

    def accuracy(self):
        result = 0
        for index in range(self.sampleNum):
            expected = self.expectedLabels[index]
            predicted = self.predictedLabels[index]
            result += Aggregate.intersection(expected, predicted) / Aggregate.sum(expected, predicted)
        return result / self.sampleNum

    def precision(self):
        result = 0
        for index in range(self.sampleNum):
            expected = self.expectedLabels[index]
            predicted = self.predictedLabels[index]
            result += Aggregate.intersection(expected, predicted) / len(predicted)
        return result / self.sampleNum

In [11]:
class RankResults:
    def __init__(self):
        self.predictedLabels = []
        self.topRankedLabels = []
        self.outputs = []

    def add(self, predict_set, top_label, output):
        self.predictedLabels.append(predict_set)
        self.topRankedLabels.append(top_label)
        self.outputs.append(output)

In [12]:
class RankMetrics(UniversalMetrics):
    """ Metrics design for ranking systems"""

    def __init__(self, expected, result):
        self.sampleNum = len(expected)
        expectedLabels = [[int(i) for i in expected[j]] for j in range(len(expected))]

        super().__init__(expectedLabels, result.predictedLabels)

        self.topRankedLabels = result.topRankedLabels
        self.outputs = result.outputs
        self.possibleLabelNum = len(self.outputs[0])

        self.ap_prepared = False
        self.ap = None
        self.rl_prepared = False
        self.rl = None

    def hamming_loss(self):
        diff_sum = 0
        for i in range(self.sampleNum):
            labels_sum = len(self.expectedLabels[i])
            intersection = 0
            for label in self.predictedLabels[i]:
                if label in self.expectedLabels[i]:
                    intersection += 1
            diff_sum += labels_sum - intersection

        return diff_sum / (self.possibleLabelNum * self.sampleNum)

    def one_error(self):
        error_sum = 0
        for i in range(self.sampleNum):
            if self.topRankedLabels[i] not in self.expectedLabels[i]:
                error_sum += 1

        return error_sum / self.sampleNum

    def coverage(self):
        cover_sum = 0
        for i in range(self.sampleNum):
            label_outputs = []
            for label in self.expectedLabels[i]:
                label_outputs.append(self.outputs[i][label])
            min_output = min(label_outputs)
            for j in range(self.possibleLabelNum):
                if self.outputs[i][j] >= min_output:
                    cover_sum += 1

        return (cover_sum / self.sampleNum) - 1

    def ranking_loss(self):
        if self.rl_prepared is True:
            return self.rl

        rloss_sum = 0
        ap_sum = 0
        for sample_index in range(self.sampleNum):
            unodered_part = []
            expected_num = len(self.expectedLabels[sample_index])

            sample_output = self.outputs[sample_index]
            output_dic = {}
            for output_index in range(self.possibleLabelNum):
                output_dic[output_index] = sample_output[output_index]

            sorted_output = sorted(output_dic.items(), key=operator.itemgetter(1), reverse=True)

            temp_count = 0
            times = 0
            for sorted_tuples in sorted_output:
                if times == expected_num:
                    break

                if sorted_tuples[0] not in self.expectedLabels[sample_index]:
                    temp_count += 1
                else:
                    unodered_part.append(temp_count)
                    temp_count = 0
                    times += 1
            if len(unodered_part) != expected_num:
                raise Exception("function error for RankingLoss")

            pairs_num = 0
            fraction_sum = 0
            fraction_divide = 0
            for cal_index in range(expected_num):
                pairs_num += unodered_part[cal_index] * (expected_num - cal_index)
                # prepare for calculating average precision
                fraction_divide += unodered_part[cal_index] + 1
                fraction_sum += (cal_index + 1) / fraction_divide

            rloss_sum += pairs_num / (expected_num * (self.possibleLabelNum - expected_num))
            ap_sum += fraction_sum / expected_num

        self.ap = ap_sum / self.sampleNum
        self.rl = rloss_sum / self.sampleNum
        self.ap_prepared = True
        self.rl_prepared = True

        return self.rl

    def average_precision(self):
        # contained in the ranking_loss function to save running time
        if self.ap_prepared is True:
            return self.ap
        else:
            self.ranking_loss()
            return self.ap


In [13]:
"""
    special metric for rank systems like BPMLL and RankingSVM, you can use RankMetrics
    when you specify rank_results=True in predict functions
"""
metric = RankMetrics(test_target, result)
print('hamming loss:' + str(metric.hamming_loss()))
print('one error:' + str(metric.one_error()))
print('coverage:' + str(metric.coverage()))
print('ranking_loss:' + str(metric.ranking_loss()))
print('average_precision:' + str(metric.average_precision()))
print('precision:' + str(metric.precision()))
print('accuracy:' + str(metric.accuracy()))

NameError: name 'test_target' is not defined

In [14]:
# feature extraction using PCA
feature_size = train_data.shape[1]
pca = PCA(n_components=(feature_size * 10) // 100)
train_data_trans = csr_matrix(pca.fit_transform(train_data.todense()))
test_data_trans = csr_matrix(pca.transform(test_data.todense()))

NameError: name 'train_data' is not defined