## Comparación de modelos CNN y RL-CNN

In [120]:
# libraries
from tensorflow import keras
from keras.utils import to_categorical
import plotly.graph_objects as go
import plotly.figure_factory as ff
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, confusion_matrix
import scipy.io as sio
from scipy.fft import fft

In [119]:
# Loading functions for all files in a certain configuration
# load a single file as a numpy array
def load_file(filepath, filename):
    """Load one .mat recording and return its windowed magnitude spectra.

    The vibration signal is read from ``data[filename]['Y'][0][0][0][6][2]``,
    flattened to 1-D and cut into consecutive, non-overlapping windows of
    1024 samples. Each window is transformed with an FFT and the magnitudes
    of the first 512 bins are kept.

    Only complete windows are used: trailing samples that do not fill a whole
    window are discarded. (Rounding the window count up would transform a
    shorter slice, whose bins have a different frequency resolution than the
    rest of the rows.)

    Args:
        filepath: Path of the .mat file, passed to ``scipy.io.loadmat``.
        filename: Key of the recording inside the loaded .mat structure.

    Returns:
        ``float64`` array of shape ``(n_windows, 512)`` where
        ``n_windows = len(signal) // 1024``.
    """
    window_size = 1024
    n_bins = window_size // 2

    data = sio.loadmat(filepath)

    # vibration data
    vib_data = data[filename]['Y'][0][0][0][6][2].transpose()
    vib_data = vib_data.reshape(vib_data.shape[0],)

    # frequency domain, one row per complete window of `window_size` samples
    n_windows = vib_data.shape[0] // window_size
    vib_freq = np.zeros([n_windows, n_bins])
    for i in range(n_windows):
        window = vib_data[window_size * i:window_size * (i + 1)]
        vib_freq[i] = np.abs(fft(window))[:n_bins]

    return vib_freq

# load a dataset group, such as train or test
def load_dataset_group(prefix='', group='', setting='', n_files=3):
    """Load the first ``n_files`` recordings of one bearing and label them.

    Files are read from ``<prefix><group>/<setting>_<group>_<i>.mat`` for
    ``i = 1 .. n_files`` via ``load_file`` and stacked row-wise.

    Args:
        prefix: Directory prefix prepended to the group folder.
        group: Bearing code (e.g. ``'K001'``, ``'KA04'``); it selects both the
            folder and the class label.
        setting: Operating-condition code (e.g. ``'N15_M07_F10'``).
        n_files: Number of recordings to load, starting at index 1. Defaults
            to 3, the value that was previously hard-coded.

    Returns:
        ``(X, y)`` where ``X`` has one row per sample and 512 columns, and
        ``y`` is a float array with one label per row: 1 for healthy
        bearings, 2 for outer-ring (OR) damage, 3 for inner-ring (IR) damage
        and 0 for any group code that is not recognised.

    Raises:
        ValueError: If ``n_files`` is smaller than 1.
    """
    if n_files < 1:
        raise ValueError('n_files must be at least 1')

    # class label per bearing code; real and artificial damage share a label
    healthy = ('K001', 'K002', 'K003', 'K004', 'K005', 'K006')
    outer_ring = ('KA04', 'KA16', 'KA22',  # real OR damage
                  'KA01', 'KA03', 'KA07')  # artificial OR damage
    inner_ring = ('KI04', 'KI16', 'KI18',  # real IR damage
                  'KI01', 'KI03', 'KI07')  # artificial IR damage
    label_by_group = {}
    label_by_group.update(dict.fromkeys(healthy, 1.0))
    label_by_group.update(dict.fromkeys(outer_ring, 2.0))
    label_by_group.update(dict.fromkeys(inner_ring, 3.0))

    # collect every file first and concatenate once
    parts = []
    for i in range(1, n_files + 1):
        filename = f'{setting}_{group}_{i}'
        path = f'{prefix}{group}/{filename}.mat'
        parts.append(load_file(path, filename))
    X = np.concatenate(parts, axis=0)

    # unknown group codes fall back to label 0
    y = np.full(X.shape[0], label_by_group.get(group, 0.0))
    return X, y

# load the dataset, returns train and test X and y elements
def load_dataset_settings(prefix='',setting='N15_M07_F10'):
    """Assemble the evaluation set for one operating condition.

    Loads six bearings from ``<prefix>PPDataset/`` with ``load_dataset_group``:
    two healthy (K001, K002), artificial OR/IR damage (KA01, KI01) and real
    OR/IR damage (KA04, KI04), in that order.

    Args:
        prefix: Path prefix placed in front of ``'PPDataset/'``.
        setting: Operating-condition code used in the file names.

    Returns:
        ``(X, y)``: ``X`` with a trailing channel axis of size 1 and ``y``
        one-hot encoded from the zero-offset class labels. The two shapes
        are also printed.
    """
    data_root = prefix + 'PPDataset/'
    group_codes = (
        'K001', 'K002',  # healthy
        'KA01',          # artificial OR damage
        'KI01',          # artificial IR damage
        'KA04',          # real OR damage
        'KI04',          # real IR damage
    )

    # load every group, then split into per-group feature and label tuples
    loaded = [load_dataset_group(data_root, code, setting) for code in group_codes]
    features, labels = zip(*loaded)

    X = np.concatenate(features)
    y = np.concatenate(labels)

    # zero-offset the class values, then one-hot encode them
    y = to_categorical(y - 1)

    # add a trailing channel axis: (samples, bins) -> (samples, bins, 1)
    X = np.expand_dims(X, axis=-1)

    print(X.shape, y.shape)
    return X, y

In [125]:
# Restore the two trained classifiers (plain CNN and DDQN-trained CNN) from disk
cnn_model, ddqn_model = (
    keras.models.load_model(model_path)
    for model_path in ('models/cnn_model_13.h5', 'models/ddqn_model_1.h5')
)

In [126]:
# Read the saved test split (features and labels) straight from the .npy files
testX = np.load('PPDataset/train_test_data/testX.npy')
testy = np.load('PPDataset/train_test_data/testy.npy')

In [127]:
# Evaluate the CNN model on the test split
pred_test_cnn = cnn_model.predict(testX)

# compare the arg-max class of the labels against the arg-max predicted class
test_acc_cnn = accuracy_score(
    testy.argmax(axis=1),
    pred_test_cnn.argmax(axis=1),
)
print('Test accuracy for CNN model: ', test_acc_cnn)

Test accuracy for CNN model:  0.9785090664875755


In [128]:
# Evaluate the DDQN (CNN-RL) model on the test split
pred_test_ddqn = ddqn_model.predict(testX)

# compare the arg-max class of the labels against the arg-max predicted class
test_acc_ddqn = accuracy_score(
    testy.argmax(axis=1),
    pred_test_ddqn.argmax(axis=1),
)
print('Test accuracy for CNN-RL model: ', test_acc_ddqn)

Test accuracy for CNN-RL model:  0.9563465413028879


In [129]:
# Side-by-side table of predicted classes and true class for every test sample;
# 'diff' is non-zero exactly where the two models disagree.
df_predictions = pd.DataFrame(
    {
        'CNN': np.argmax(pred_test_cnn, axis=1),
        'DDQN': np.argmax(pred_test_ddqn, axis=1),
        'y_test': np.argmax(testy, axis=1),
    }
).assign(diff=lambda table: table['CNN'] - table['DDQN'])
df_predictions

Unnamed: 0,CNN,DDQN,y_test,diff
0,1,1,1,0
1,0,0,0,0
2,0,0,0,0
3,0,0,0,0
4,0,0,0,0
...,...,...,...,...
1484,2,2,2,0
1485,2,2,2,0
1486,2,2,2,0
1487,2,2,2,0


In [130]:
# Only plotting when different CNN and DDQN predictions, they don't match when predicting 0 and 2
# Filter once and use the filtered frame's own index as x. Passing the full
# index with a filtered y makes plotly pair values by position, which places
# the markers at the wrong sample indices.
disagreements = df_predictions[df_predictions['diff'] != 0]

fig = go.Figure()

fig.add_trace(go.Scatter(x=disagreements.index, y=disagreements['CNN'], mode='markers',
                         marker_symbol='circle-open', marker_size=8, name='cnn'))
fig.add_trace(go.Scatter(x=disagreements.index, y=disagreements['DDQN'], mode='markers',
                         marker_symbol='circle-open', marker_size=8, name='ddqn'))

# true class of the same samples, for reference
fig.add_trace(go.Scatter(x=disagreements.index, y=disagreements['y_test'], mode='markers',
                         marker_symbol='x', name='y_test'))

fig.update_layout(xaxis_title='Test sample index', yaxis_title='Class')
fig.show()


In [137]:
# Joint prediction of the two models

def join_predictions(pred_1, pred_2, weights=(0.7, 0.3)):
    """Blend two prediction arrays with a weighted average.

    The arrays are stacked along a new last axis and averaged over that axis,
    so inputs of any (matching) shape are supported, not only 2-D batches.

    Args:
        pred_1: Predictions of the first model.
        pred_2: Predictions of the second model; same shape as ``pred_1``.
        weights: Pair ``(w_1, w_2)`` of averaging weights. ``np.average``
            normalises by their sum. Defaults to ``(0.7, 0.3)``, the values
            that were previously hard-coded.

    Returns:
        Array with the same shape as the inputs holding the weighted average.

    Raises:
        ZeroDivisionError: If the weights sum to zero (raised by ``np.average``).
    """
    stacked = np.stack((pred_1, pred_2), axis=-1)
    return np.average(stacked, axis=-1, weights=weights)

# Blend the class scores of both models and score the ensemble on the test split
joint_preds = join_predictions(pred_test_cnn, pred_test_ddqn)
test_acc_joint = accuracy_score(
    testy.argmax(axis=1),
    joint_preds.argmax(axis=1),
)
print('Test accuracy for union of both models: ', test_acc_joint)

# Accuracy of each model alone on the same split, for comparison:
#   cnn:  0.9785090664875755
#   ddqn: 0.9563465413028879

Test accuracy for union of both models:  0.9805238415043653


In [138]:
# Build a visualisation of the confusion matrix of the joint predictions
z_test = confusion_matrix(testy.argmax(axis=1), joint_preds.argmax(axis=1))
# Swap the first and last rows so they line up with the reversed y labels below
z_test[[0, 2]] = z_test[[2, 0]]
x = ['Healthy', 'OR fault', 'IR fault']
y = list(reversed(x))
# annotation text: every cell count rendered as a string
z_text = [[str(count) for count in row] for row in z_test]
heatmap = ff.create_annotated_heatmap(
    z_test,
    x=x,
    y=y,
    annotation_text=z_text,
    colorscale='tealrose',
)
heatmap.update_layout(
    title_text='Testing',
    height=300,
    width=600,
    xaxis_title="Predicted Label",
    yaxis_title="True Label",
)
heatmap.show()

In [None]:
# Weight tuning using validation set
# KI03 is loaded once and kept under its own name, so that the cell which
# later assigns the KI01 recordings to X_ki01 / y_ki01 does not overwrite it.
X_ki03, y_ki03 = load_dataset_group('PPDataset/', 'KI03', 'N15_M07_F10') # artificial IR damage
X_ki04, y_ki04 = load_dataset_group('PPDataset/', 'KI04', 'N15_M07_F10') # real IR damage

In [139]:
# Test samples whose true class is 2 but which the joint model assigned to class 0
# (IR fault taken for healthy, going by the heatmap label order — confirm against testy)
indicesIRHealthy = np.flatnonzero(
    (testy.argmax(axis=1) == 2) & (joint_preds.argmax(axis=1) == 0)
)
testX_IRHealthy = testX[indicesIRHealthy]
# keep only the first two axes: (samples, bins, 1) -> (samples, bins)
testX_IRHealthy = testX_IRHealthy.reshape(testX_IRHealthy.shape[:2])
testX_IRHealthy.shape

(18, 512)

In [140]:
# Load the raw inner-ring recordings: KI01 (artificial damage) and KI04 (real damage)
(X_ki01, y_ki01), (X_ki04, y_ki04) = (
    load_dataset_group('PPDataset/', ir_group, 'N15_M07_F10')
    for ir_group in ('KI01', 'KI04')
)

In [141]:
# For every misclassified sample, look for an identical row in the artificial-damage
# recordings (KI01); one True is recorded per sample that has a match.
matches_artificial = [
    True
    for sample in testX_IRHealthy
    if any(np.array_equal(sample, candidate) for candidate in X_ki01)
]
print("Artificial IR damage mistaken for Healthy: ", len(matches_artificial))

# Same lookup against the real-damage recordings (KI04)
matches_real = [
    True
    for sample in testX_IRHealthy
    if any(np.array_equal(sample, candidate) for candidate in X_ki04)
]
print("Real IR damage mistaken for Healthy: ", len(matches_real))

Artificial IR damage mistaken for Healthy:  14
Real IR damage mistaken for Healthy:  4


#### Test with other settings:

In [136]:
# Robustness check: rotational speed changed from 1500 rpm to 900 rpm
X_N09_M07_F10, y_N09_M07_F10 = load_dataset_settings(prefix='', setting='N09_M07_F10')

# score the data with both models, then blend the two outputs
pred_N09_M07_F10_cnn, pred_N09_M07_F10_ddqn = (
    model.predict(X_N09_M07_F10) for model in (cnn_model, ddqn_model)
)
joint_preds_N09_M07_F10 = join_predictions(pred_N09_M07_F10_cnn, pred_N09_M07_F10_ddqn)

acc_N09_M07_F10 = accuracy_score(
    y_N09_M07_F10.argmax(axis=1),
    joint_preds_N09_M07_F10.argmax(axis=1),
)
print('Test accuracy: ', acc_N09_M07_F10)

# Accuracy of each model alone under this setting:
#   cnn:  0.6225283270384359
#   ddqn: 0.6869584536769606

(4501, 512, 1) (4501, 3)
Test accuracy:  0.6396356365252166


In [134]:
# Robustness check: load torque changed from 0.7 Nm to 0.1 Nm
X_N15_M01_F10, y_N15_M01_F10 = load_dataset_settings(prefix='', setting='N15_M01_F10')

# score the data with both models, then blend the two outputs
pred_N15_M01_F10_cnn, pred_N15_M01_F10_ddqn = (
    model.predict(X_N15_M01_F10) for model in (cnn_model, ddqn_model)
)
joint_preds_N15_M01_F10 = join_predictions(pred_N15_M01_F10_cnn, pred_N15_M01_F10_ddqn)

acc_N15_M01_F10 = accuracy_score(
    y_N15_M01_F10.argmax(axis=1),
    joint_preds_N15_M01_F10.argmax(axis=1),
)
print('Test accuracy: ', acc_N15_M01_F10)

# Accuracy of each model alone under this setting:
#   cnn:  0.9747060128688706
#   ddqn: 0.9531839360994009

(4507, 512, 1) (4507, 3)
Test accuracy:  0.9758153982693588


In [135]:
# Changing the radial force from 1000 N to 400 N
X_N15_M07_F04, y_N15_M07_F04 = load_dataset_settings(prefix='',setting='N15_M07_F04')

pred_N15_M07_F04_cnn = cnn_model.predict(X_N15_M07_F04)
pred_N15_M07_F04_ddqn = ddqn_model.predict(X_N15_M07_F04)
# Same DDQN output under its older name; kept as an alias rather than running
# a second, identical predict() over the whole set.
pred_N15_M07_F04 = pred_N15_M07_F04_ddqn

joint_preds_N15_M07_F04 = join_predictions(pred_N15_M07_F04_cnn, pred_N15_M07_F04_ddqn)

acc_N15_M07_F04 = accuracy_score(np.argmax(y_N15_M07_F04,axis=1), np.argmax(joint_preds_N15_M07_F04,axis=1))
print('Test accuracy: ',acc_N15_M07_F04)

# Accuracy of each model alone under this setting:
# cnn: 0.962749445676275
# ddqn: 0.9567627494456763

(4510, 512, 1) (4510, 3)
Test accuracy:  0.9682926829268292


In [150]:
# def join_predictions(pred_1,pred_2,w):
#     concat_preds = np.stack((pred_1, pred_2), axis=-1)
#     joint_preds = np.average(concat_preds, axis=2, weights=[w,1-w])
#     return joint_preds

# best_w = None
# best_acc = 0
# for w in np.arange(0,1.1,0.1):
#     pred_cnn = cnn_model.predict(valX,verbose=0)
#     pred_ddqn = ddqn_model.predict(valX,verbose=0)
#     joint_preds = join_predictions(pred_cnn,pred_ddqn,w)
#     acc_joint = accuracy_score(np.argmax(valy,axis=1), np.argmax(joint_preds,axis=1))
#     if acc_joint > best_acc:
#         best_w = w
#         best_acc = acc_joint
# print(best_w, ', ',best_acc)