In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
import os
import re
import csv
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import optimizers,regularizers
from tensorflow.keras.models import Sequential, load_model, model_from_json 
from tensorflow.keras.layers import Conv2D,Dense,Activation,Flatten,Dropout,AveragePooling2D,BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.python.framework.ops import disable_eager_execution
from sklearn.metrics import accuracy_score
from tensorflow.python.client import device_lib

#Generate training, test set
def walk(folder):
    '''Yield a (directory path, file name) pair for every file below ``folder``.'''
    for root, _subdirs, names in os.walk(folder):
        # Sub-directory names are ignored; os.walk descends into them itself.
        yield from ((root, name) for name in names)

def iscsv(ext):
    '''Return True when ``ext`` is exactly the '.csv' extension, otherwise False.'''
    return ext in ('.csv',)

def find_word(path):
    '''Return the distinct directory names under ``path`` that contain files.

    Each name is the text after the last '/' of a directory path reported by
    ``walk``. Names are returned as a tuple in first-seen order.
    '''
    leaf_names = (re.split('/', folder)[-1] for folder, _ in walk(path))
    # dict keys keep insertion order, giving an order-preserving de-duplication.
    return tuple(dict.fromkeys(leaf_names))
  
def count_ext(path, ext):
    '''Count the files under ``path`` whose last ``len(ext)`` characters equal ``ext``.'''
    suffix_len = len(ext)
    return sum(1 for _, name in walk(path) if name[-suffix_len:] == ext)

# Input features/labels live under destpath; training artefacts go to save_path.
destpath = '/content/gdrive/MyDrive/수학캡스톤_공유폴더/data/rawdata/'
save_path = '/content/gdrive/MyDrive/수학캡스톤_공유폴더/eunu/result/'
# Create the results directory (and any missing parents). exist_ok=True keeps
# the "already there" case silent, while real failures (permissions, Drive not
# mounted) now surface here instead of being swallowed by a bare except.
os.makedirs(save_path, exist_ok=True)
import keras
# Load the pre-computed feature array and its label vector from Drive.
X_data = np.load(destpath + 'X_10.npy')
Y_data = np.load(destpath + 'Y_10.npy')
print('X_data shape:', X_data.shape)
print('Y_data shape:', Y_data.shape)
# Spoken keywords in the dataset.
# NOTE(review): presumably listed in label-index order -- confirm against the
# script that produced Y_10.npy.
keywords = ('yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go')
def split_dataset(Y, ratio=0.2, seed=None):
    '''Draw a per-class (stratified) test split over sample indices.

    Parameters
    ----------
    Y : array-like of shape (n_samples,)
        Class label of every sample.
    ratio : float
        Fraction of each class moved to the test split; the per-class count is
        ``int(class_size * ratio)``.
    seed : int or None
        Seed for the random generator. ``None`` draws a fresh random split.

    Returns
    -------
    train_ix : list of int
        Sorted indices that are not in the test split.
    test_ix : numpy.ndarray of int
        Sorted indices selected for the test split.
    '''
    Y = np.asarray(Y)
    rng = np.random.default_rng(seed)
    test_parts = []
    for label in np.unique(Y):
        class_idx = np.flatnonzero(Y == label)
        n_test = int(len(class_idx) * ratio)
        test_parts.append(rng.choice(class_idx, n_test, replace=False))
    if test_parts:
        test_ix = np.sort(np.concatenate(test_parts)).astype(int)
    else:
        test_ix = np.array([], dtype=int)
    # The complement is taken over len(Y); the disabled original referenced an
    # undefined name (`temp`) here.
    train_ix = sorted(set(range(len(Y))) - set(test_ix.tolist()))
    return train_ix, test_ix

# One-time generation of the held-out split. Kept disabled on purpose: running
# it would overwrite testidx.csv and change the split the notebook reads back.
# train_ix, test_ix = split_dataset(Y_data,0.1)
# df = pd.DataFrame(test_ix)
# df.to_csv(destpath+'testidx.csv',sep=',',header=False,
#           float_format='%.2f',index=False)
# Re-use the persisted test indices so every run evaluates on the same samples.
csvpath = destpath + 'testidx.csv'
test_ix = pd.read_csv(csvpath, header=None)[0]
# Everything that is not a test index becomes training data.
train_ix = list(set(range(len(Y_data))) - set(test_ix))
train_X, train_Y = X_data[train_ix], Y_data[train_ix]
test_X, test_Y = X_data[test_ix], Y_data[test_ix]

# Shuffle the training samples: draw every position once, without replacement,
# and apply the same order to features and labels.
tot_ix = range(len(train_X))
rand_ix = np.random.choice(tot_ix, size=len(train_X), replace=False)
train_X, train_Y = train_X[rand_ix], train_Y[rand_ix]

# `keras.utils.np_utils` is no longer importable in recent Keras releases; take
# to_categorical from tensorflow.keras, the API the rest of this notebook uses.
from tensorflow.keras.utils import to_categorical
# Append a trailing channel axis to the feature arrays.
train_X = np.expand_dims(train_X,-1)
test_X = np.expand_dims(test_X,-1)
# One-hot encode the labels; the class count is taken from the full label array.
num_class = len(np.unique(Y_data))
train_Y = to_categorical(train_Y,num_class)
test_Y = to_categorical(test_Y,num_class)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
X_data shape: (23682, 40, 44)
Y_data shape: (23682,)


In [None]:
from tensorflow.keras.layers import Softmax, DepthwiseConv2D, LeakyReLU, GlobalAveragePooling2D
inp_shape = np.shape(train_X)[1:]

def build_model(input_shape=None, num_classes=10):
    '''Build and compile the convolutional keyword classifier.

    Parameters
    ----------
    input_shape : tuple or None
        Shape of one input sample. ``None`` falls back to the module-level
        ``inp_shape``.
    num_classes : int
        Width of the final Dense layer. Defaults to 10.

    Returns
    -------
    A compiled ``Sequential`` model (categorical cross-entropy loss, Adam
    optimizer, accuracy metric).
    '''
    if input_shape is None:
        input_shape = inp_shape
    model = Sequential([
        Conv2D(filters=70, kernel_size=(3,3), strides=1, padding='same',
               input_shape=input_shape, activation='relu'),
        AveragePooling2D(pool_size=(3,3), strides=2),
        DepthwiseConv2D(kernel_size=(3,3), strides=(1,1), padding='same'),
        Conv2D(filters=50, kernel_size=(3,3), strides=1, padding='same', activation='relu'),
        BatchNormalization(),
        DepthwiseConv2D(kernel_size=(3,3), strides=(1,1), padding='same'),
        Conv2D(filters=30, kernel_size=(3,3), strides=1, padding='same', activation='relu'),
        BatchNormalization(),
        AveragePooling2D(pool_size=(3,3), strides=None),
        Flatten(),
        Dense(50, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        Dropout(0.1),
        Dense(num_classes, activation='linear'),
        LeakyReLU(),
        # Softmax stays a separate last layer: openmax_param drops
        # model.layers[-1] to read the pre-softmax activations.
        Softmax(),
    ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# NOTE(review): EarlyStopping watches the *training* loss while the checkpoint
# and the learning-rate schedule watch val_loss -- confirm this is intended.
callbacks_list = [
    EarlyStopping(
        monitor='loss',
        patience=30,
        min_delta=0.0001,
        mode='auto',
        verbose=1,
    ),
    # Keeps only checkpoints that improve val_loss; epoch and validation
    # metrics are embedded in the file name.
    ModelCheckpoint(
        filepath=save_path+"/weights.{epoch:02d}-{val_loss:.4f}-{val_accuracy:.4f}.hdf5",
        monitor='val_loss',
        save_best_only=True,
    ),
    # Multiplies the learning rate by 0.9 after 10 epochs without a val_loss gain.
    ReduceLROnPlateau(
        monitor='val_loss',
        mode='min',
        factor=0.9,
        patience=10,
        min_delta=1e-4,
        verbose=1,
    ),
]

model = build_model()
model.summary()
# NOTE(review): the held-out test split doubles as validation data here.
history = model.fit(
    train_X,
    train_Y,
    validation_data=(test_X, test_Y),
    epochs=200,
    batch_size=64,
    callbacks=callbacks_list,
)
# Persist the per-epoch metrics next to the checkpoints.
pd.DataFrame(history.history).to_csv(save_path+"/history.csv")

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_3 (Conv2D)            (None, 40, 44, 70)        700       
_________________________________________________________________
average_pooling2d_2 (Average (None, 19, 21, 70)        0         
_________________________________________________________________
depthwise_conv2d_2 (Depthwis (None, 19, 21, 70)        700       
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 19, 21, 50)        31550     
_________________________________________________________________
batch_normalization_2 (Batch (None, 19, 21, 50)        200       
_________________________________________________________________
depthwise_conv2d_3 (Depthwis (None, 19, 21, 50)        500       
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 19, 21, 30)       

In [None]:
# Score the in-memory model on the held-out split.
output = model.predict(test_X)
predicted_classes = output.argmax(axis=1)
answer_classes = test_Y.argmax(axis=1)
acc = accuracy_score(answer_classes, predicted_classes)

# Serialise the architecture once and write it under an accuracy-tagged name.
model_json = model.to_json()
with open(save_path+"/model_acc_{:.4f}.json".format(acc), 'w') as json_file:
    json_file.write(model_json)

#Weight save
model.save_weights(save_path +"/final_weight.h5")
# Same architecture JSON again, under the fixed name later cells load from.
with open(save_path+"/model.json", 'w') as json_file:
    json_file.write(model_json)

In [None]:
from numpy.linalg import norm
def cos_sim(A,B):
    '''Cosine similarity between ``A`` and ``B``.

    Computes ``dot(A, B) / (norm(A) * norm(B))``. When the product of the
    norms is zero (at least one input has zero norm) the similarity is
    undefined; 0 is returned instead of dividing by zero and yielding NaN.
    '''
    dot = np.dot(A, B)
    denom = norm(A) * norm(B)
    if denom == 0:
        # Multiplying keeps the shape of `dot` while forcing a float zero.
        return 0.0 * dot
    return dot / denom

In [None]:
# def distance(vector1,vector2):
#     vector1 = np.array(vector1)
#     vector2 = np.array(vector2)
#     vector1 = vector1 - vector2
#     vector1 = vector1**2
#     return np.sqrt(vector1.sum())

def distance(vector1, vector2):
    '''Return the cosine similarity of the two vectors multiplied by -10.

    Larger similarity therefore maps to a smaller (more negative) value.
    '''
    similarity = cos_sim(vector1, vector2)
    return -10 * similarity

def openmax_param(model,trainx,trainy,tail_size=30):
    '''Fit the per-class parameters used by ``openmax``.

    Steps:
      1. Keep only the training samples that ``model`` classifies correctly.
      2. Build ``new_model`` from every layer of ``model`` except the last one
         and use its outputs as activation vectors.
      3. For each class, compute the mean activation vector and the
         ``distance`` of every member to that mean.
      4. Fit ``scipy.stats.weibull_min`` to the ``tail_size`` largest
         distances of each class.

    Parameters
    ----------
    model : trained Keras model whose last weight tensor has one entry per class.
    trainx : training inputs; a trailing axis is appended when not 4-D.
    trainy : labels, either one-hot ``(N, C)`` or integer ``(N,)`` / ``(N, 1)``.
    tail_size : number of largest distances per class used for the fit.

    Returns
    -------
    (hyparam, new_model, class_num, mean_vector), where ``hyparam[i]`` is the
    list returned by ``weibull_min.fit`` for class ``i`` and
    ``mean_vector[i]`` is that class's mean activation vector.

    Raises
    ------
    ValueError
        If some class has no correctly classified training sample.
    '''
    from scipy.stats import weibull_min
    # Use tensorflow.keras throughout, matching how `model` itself was built.
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.utils import to_categorical

    class_num = len(np.array(model.weights[-1])) # Number of Class
    trainx = np.asarray(trainx)
    if trainx.ndim != 4:
        trainx = np.expand_dims(trainx,axis=-1)
    trainy = np.asarray(trainy)
    # Integer labels may arrive as (N,) or (N, 1); both are one-hot encoded.
    if trainy.ndim == 1 or trainy.shape[-1] == 1:
        trainy = to_categorical(trainy,class_num)

    # Step 1: samples the classifier already gets right.
    true_labels = np.argmax(trainy,axis=-1)
    correct = true_labels == np.argmax(model.predict(trainx),axis=-1)
    ver_X_train = trainx[correct]
    ver_labels = true_labels[correct]

    # Step 2: the same network without its final layer.
    new_model = Sequential(model.layers[:-1])
    new_model.compile(optimizer='adam',loss='categorical_crossentropy',
                        metrics=['accuracy'])
    logit_vector = np.array(new_model.predict(ver_X_train))

    hyparam = []
    mean_vector = []
    for cls in range(class_num):
        class_logits = logit_vector[ver_labels == cls]
        if len(class_logits) == 0:
            raise ValueError(
                'class {} has no correctly classified training sample'.format(cls))
        # Step 3: class centre and sorted member-to-centre distances.
        centre = class_logits.mean(axis=0)
        dists = np.sort(np.array([distance(logit, centre) for logit in class_logits]))
        # Step 4: fit the Weibull distribution on the largest distances.
        hyparam.append(list(weibull_min.fit(dists[-tail_size:])))
        mean_vector.append(centre)
    return hyparam, new_model, class_num, mean_vector

def openmax(xdata,returnvalue):
    '''Classify ``xdata`` with the OpenMax-style re-weighting.

    Parameters
    ----------
    xdata : inputs for the truncated model; a trailing axis is appended when
        the array is not 4-D (same convention as ``openmax_param``).
    returnvalue : the tuple produced by ``openmax_param``:
        ``(hyparam, new_model, class_num, mean_vector)``.

    Returns
    -------
    list with one predicted index per sample. Indices ``0..class_num-1`` are
    the known classes; index ``class_num`` is the extra "unknown" slot.
    '''
    from scipy.stats import weibull_min
    hyparam, new_model, class_num, mean_vector = returnvalue[:4]
    if len(np.shape(xdata)) != 4:
        xdata = np.expand_dims(xdata,axis=-1)
    pred = new_model.predict(xdata)
    output = []
    for logit in pred:
        revised = []
        unknown = 0
        for ind in range(class_num):
            distance_ = distance(logit,mean_vector[ind])
            shape_c, loc, scale = hyparam[ind][:3]
            # Weibull CDF of the distance to the class centre: the share of
            # this class's activation that is moved to the unknown slot.
            weight = weibull_min.cdf(distance_,shape_c,loc,scale)
            revised.append(logit[ind]*(1-weight))
            unknown += logit[ind]*weight
        revised.append(unknown)
        output.append(np.argmax(revised))
    return output

In [None]:
def load_model(jsonpath,weightpath):
    '''Rebuild a model from a JSON architecture file and load its weights.

    ``jsonpath`` is read as text and passed to
    ``tf.keras.models.model_from_json``; the weights at ``weightpath`` are then
    loaded into the resulting model, which is returned.

    Note: this definition shadows the ``load_model`` name imported from
    ``tensorflow.keras.models`` at the top of the notebook.
    '''
    with open(jsonpath) as config_file:
        architecture = config_file.read()
    restored = tf.keras.models.model_from_json(architecture)
    restored.load_weights(weightpath)
    return restored

# Restore the saved architecture together with one specific checkpoint file.
json='/content/gdrive/MyDrive/수학캡스톤_공유폴더/eunu/result/model.json'
weight='/content/gdrive/MyDrive/수학캡스톤_공유폴더/eunu/result/weights.179-0.1769-0.9577.hdf5'
model = load_model(json,weight)

# NOTE(review): totalX / totalY are used below, but the only code that builds
# them is commented out here, and uknY is not assigned anywhere in this cell.
# On a fresh kernel this cell raises NameError -- confirm how these arrays
# were produced before relying on the numbers printed below.
# X_data = np.expand_dims(X_data,axis=-1)
# uknX = np.load('/content/gdrive/MyDrive/수학캡스톤_공유폴더/data/rawdata/X_30.npy')
# uknX = np.expand_dims(uknX,axis=-1)
# totalX = np.concatenate((X_data,uknX),axis=0)
# totalY = np.concatenate((Y_data,uknY),axis=0)

from sklearn.metrics import confusion_matrix, accuracy_score
# Fit the OpenMax parameters on the training split, then score the combined set.
temp = openmax_param(model,train_X,train_Y)
res = openmax(totalX,temp)
print(confusion_matrix(totalY, res))
print('Accuracy:',accuracy_score(totalY,res))

[[ 2341     0     0     0     0     0     0     0     0     0    36]
 [    0  2339     0     0     0     0     0     0     0     0    36]
 [    0     0  2335     0     0     1     1     0     0     0    38]
 [    0     0     0  2327     0     0     0     1     0     2    29]
 [    0     0     0     0  2326     0     0     0     0     0    27]
 [    0     0     0     0     0  2347     1     0     0     1    18]
 [    0     0     0     0     0     1  2337     1     0     0    28]
 [    0     0     2     0     0     0     1  2314     0     0    40]
 [    1     0     1     0     0     0     0     1  2348     0    29]
 [    0     4     0     1     0     0     0     0     0  2327    40]
 [  487   882  1110  1286   649  3543  2337   581  1285  2071 26808]]
Accuracy: 0.7748489671049582


In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score
# NOTE(review): uknX / uknY are only assigned in the commented-out lines below,
# so this cell depends on variables left over in the kernel -- confirm the
# intended source and label encoding of uknY before re-enabling them.
# uknX = np.load('/content/gdrive/MyDrive/수학캡스톤_공유폴더/data/rawdata/X_30.npy')
# uknX = np.expand_dims(uknX,axis=-1)
# uknY = np.load('/content/gdrive/MyDrive/수학캡스톤_공유폴더/data/rawdata/Y_30.npy')
# Re-fits the OpenMax parameters, then scores only the uknX samples.
temp = openmax_param(model,train_X,train_Y)
res = openmax(uknX,temp)
print(confusion_matrix(uknY, res))
print('Accuracy:',accuracy_score(uknY,res))

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

# Score the original dataset only, re-using the OpenMax parameters (`temp`)
# fitted in an earlier cell instead of fitting them again.
# temp = openmax_param(model,train_X,train_Y)
res = openmax(X_data, temp)
print(confusion_matrix(Y_data, res))
print('Accuracy:', accuracy_score(Y_data, res))
# Variant for one-hot encoded labels:
# print(confusion_matrix(np.argmax(Y_data,axis=1), res))
# print('Accuracy:',accuracy_score(np.argmax(Y_data,axis=1),res))