[View in Colaboratory](https://colab.research.google.com/github/Joovvhan/Master-Thesis/blob/master/src/Transfer_Learning_Clean_Up.ipynb)

In [0]:
# Import necessary modules

import os
import matplotlib.pyplot as plt
import numpy as np
import scipy.io.wavfile as wf
import time
import glob

from tqdm import trange

# Import Keras modules

from keras.preprocessing import image
from keras.layers import Input, Flatten, Dense, Dropout, GlobalAveragePooling2D
from keras.models import Sequential
from keras import backend as K

In [2]:
# Mount google drive

from google.colab import drive
drive.mount('/content/gdrive')
os.listdir('gdrive/My Drive/Colab')

Mounted at /content/gdrive


['Model', 'Data']

In [0]:
# Set data directories

dataPath = 'gdrive/My Drive/Colab/Data'

# Changed variable names to normal and fault
# Changed variable names from folder to path
# Need to consider multiple folders
# Need to add files to be tested

folderNormal = 'A3F1P3'
folderFault = 'A3F5P3'
pathNormal = dataPath + '/' + folderNormal
pathFault = dataPath + '/' + folderFault
filesNormal = os.listdir(pathNormal)
filesFault = os.listdir(pathFault)

In [0]:
# Specgram settings

nsc = 1470
nov = nsc/2
nff = nsc 
imgSize = 224

# Learning parameters

trainingRatio = 0.8

In [5]:
# Execution confirmed with new variable names

# Check whether npy file exists 

npyNormalPath = glob.glob(pathNormal + '/' + '*.npy')

if (len(npyNormalPath) == 1):
    imgsNormal = np.load(npyNormalPath[0])

else:
    imgsNormal = np.zeros([len(filesNormal), imgSize, imgSize])
    
    for i in trange(len(filesNormal)):
        fs, dataInt16 = wf.read(pathNormal + '/' + filesNormal[i])
        dataFloat = dataInt16 / (2 ** 15)
        Pxx, _, _, _ = plt.specgram(dataFloat, NFFT=nff, Fs=fs, noverlap=nov, \
                                           window=np.hamming(nsc), cmap='viridis')
        plt.close()
        imgsNormal[i, :, :] = 10 * np.log10(Pxx[0:imgSize, :])
        
    np.save(pathNormal + '/' + folderNormal + '.npy', imgsNormal)
        
print('Normal Image Shape: {}'.format(imgsNormal.shape))

Normal Image Shape: (1000, 224, 224)


In [25]:
# Execution confined with new varible names

# Check whether npy file exists

npyFaultPath = glob.glob(pathFault + '/' + '*.npy')

if (len(npyFaultPath) == 1):
    imgsFault = np.load(npyFaultPath[0])

else:
    imgsFault = np.zeros([len(filesFault), imgSize, imgSize])

    for i in trange(len(filesFault)):
        fs, dataInt16 = wf.read(pathFault + '/' + filesFault[i])
        dataFloat = dataInt16 / (2 ** 15)
        Pxx, _, _, _ = plt.specgram(dataFloat, NFFT=nff, Fs=fs, noverlap=nov, \
                                           window=np.hamming(nsc), cmap='viridis')
        plt.close()
        imgsFault[i, :, :] = 10 * np.log10(Pxx[0:imgSize, :])
        
    np.save(pathFault + '/' + folderFault + '.npy', imgsFault)
        
print('Fault Image Shape: {}'.format(imgsFault.shape))

Fault Image Shape: (1000, 224, 224)


In [55]:
# Change name from imgsF1 or imgsF5 to imgsNormal and imgsFault

dataNumNormal = len(imgsNormal)
dataNumFault = len(imgsFault)
dataNumNormalTrain = int(dataNumNormal * trainingRatio)
dataNumFaultTrain = int(dataNumFault * trainingRatio)
dataNumNormalTest = dataNumNormal - dataNumNormalTrain
dataNumFaultTest = dataNumFault - dataNumFaultTrain

print('Normal Train:Test = {:d}:{:d}'.format(dataNumNormalTrain, dataNumNormalTest))
print('Fault  Train:Test = {:d}:{:d}\n'.format(dataNumFaultTrain, dataNumFaultTest))

trainIdxNormal = np.random.choice(dataNumNormal - 1, dataNumNormalTrain, replace=False)
testIdxNormal = list(set(range(0, dataNumNormal)) - set(trainIdxNormal))

trainImgsNormal = imgsNormal[trainIdxNormal, :, :]
testImgsNormal = imgsNormal[testIdxNormal, :, :]

print('Normal Training Image Shape {}'.format(trainImgsNormal.shape))
print('Normal Test Image Shape {}\n'.format(testImgsNormal.shape))

trainIdxFault  = np.random.choice(dataNumFault - 1, dataNumFaultTrain, replace=False)
testIdxFault = list(set(range(0, dataNumFault)) - set(trainIdxFault))

trainImgsFault = imgsFault[trainIdxFault, :, :]
testImgsFault = imgsFault[testIdxFault, :, :]

print('Fault Training Image Shape {}'.format(trainImgsFault.shape))
print('Fault Test Image Shape {}\n'.format(testImgsFault.shape))

trainImgs = np.vstack([trainImgsNormal, trainImgsFault])
testImgs = np.vstack([testImgsNormal, testImgsFault])

print('Training Image Shape {}'.format(trainImgs.shape))
print('Test Image Shape {}'.format(testImgs.shape))

Normal Train:Test = 800:200
Fault  Train:Test = 800:200

Normal Training Image Shape (800, 224, 224)
Normal Test Image Shape (200, 224, 224)

Fault Training Image Shape (800, 224, 224)
Fault Test Image Shape (200, 224, 224)

Training Image Shape (1600, 224, 224)
Test Image Shape (400, 224, 224)


In [56]:
trainMean = np.mean(trainImgs)
trainStd = np.std(trainImgs)

print('Mean of Training Image: {}'.format(trainMean))
print('Standard Deviation of Training Image: {}'.format(trainStd))

Mean of Training Image: -78.0968905281434
Standard Deviation of Training Image: 9.39221295547736


In [0]:
# Should Change Norm to Normalized

trainImgsNorm = (trainImgs - trainMean) / trainStd
testImgsNorm = (testImgs - trainMean) / trainStd

trainImgsNorm = trainImgsNorm.reshape(list(trainImgsNorm.shape) + [1])
testImgsNorm = testImgsNorm.reshape(list(testImgsNorm.shape) + [1])

In [58]:
X_train = np.stack([trainImgsNorm[:, :, :, 0], trainImgsNorm[:, :, :, 0], trainImgsNorm[:, :, :, 0]], axis = -1)
X_test = np.stack([testImgsNorm[:, :, :, 0], testImgsNorm[:, :, :, 0], testImgsNorm[:, :, :, 0]], axis = -1)

print('X_train Shape: {}'.format(X_train.shape))
print('X_test  Shape: {}'.format(X_test.shape))

X_train Shape: (1600, 224, 224, 3)
X_test  Shape: (400, 224, 224, 3)


In [59]:
trainLabelNormal = np.stack((np.ones(dataNumNormalTrain), np.zeros(dataNumNormalTrain)), axis = -1)
testLabelNormal = np.stack((np.ones(dataNumNormalTest), np.zeros(dataNumNormalTest)), axis = -1)

trainLabelFault = np.stack((np.zeros(dataNumFaultTrain), np.ones(dataNumFaultTrain)), axis = -1)
testLabelFault = np.stack((np.zeros(dataNumFaultTest), np.ones(dataNumFaultTest)), axis = -1)

Y_train = np.vstack((trainLabelNormal, trainLabelFault))
Y_test = np.vstack((testLabelNormal, testLabelFault))

print('Y_train Normal:Fault = {:d}:{:d}'.format(len(trainLabelNormal), len(trainLabelFault)))
print('Y_test  Normal:Fault = {:d}:{:d}'.format(len(testLabelNormal), len(testLabelFault)))

Y_train Normal:Fault = 800:800
Y_test  Normal:Fault = 200:200


In [60]:
# Confirmed successfully running through here

from keras.applications.inception_v3 import InceptionV3

for i in range(100):

    input_tensor = Input(shape=(224, 224, 3))
    
    modelInceptionV3WoTop = None
    modelInceptionV3 = None

    modelInceptionV3WoTop = InceptionV3(input_tensor=input_tensor, weights='imagenet', include_top=False)
    # modelInceptionV3WoTop.summary()
    # modelInceptionV3 = InceptionV3(input_tensor=input_tensor, weights='imagenet', include_top=True)
    # modelInceptionV3.summary()

    modelInceptionV3 = Sequential()

    modelInceptionV3.add(modelInceptionV3WoTop)
    modelInceptionV3.add(GlobalAveragePooling2D())
    modelInceptionV3.add(Dense(2, activation='softmax'))

    # modelInceptionV3.summary()

    modelInceptionV3.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    modelInceptionV3.fit(X_train, Y_train,
              batch_size=4, epochs=6, verbose=1,
              validation_data=(X_test, Y_test))
    
    K.clear_session()

Train on 1600 samples, validate on 400 samples
Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6

KeyboardInterrupt: ignored

In [0]:
from keras.applications import VGG19

from keras import backend as K

for i in range(100):
    
    input_tensor = Input(shape=(224, 224, 3))
    
    modelVGG19WoTop = None
    modelVGG19 = None

    modelVGG19WoTop = VGG19(input_tensor=input_tensor, weights='imagenet', include_top=False)

    modelVGG19 = Sequential()

    modelVGG19.add(modelVGG19WoTop)
    modelVGG19.add(Flatten())
    modelVGG19.add(Dense(4096, activation='relu'))
    modelVGG19.add(Dropout(0.5))
    modelVGG19.add(Dense(4096, activation='relu'))
    modelVGG19.add(Dropout(0.5))
    
#     modelVGG19.add(Dense(128, activation='relu'))
#     modelVGG19.add(Dropout(0.5))
#     modelVGG19.add(Dense(128, activation='relu'))
#     modelVGG19.add(Dropout(0.5))
    
    modelVGG19.add(Dense(2, activation='softmax'))

    modelVGG19.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    modelVGG19.fit(X_train, Y_train,
              batch_size=2, epochs=8, verbose=1,
              validation_data=(X_test, Y_test))
    
    K.clear_session()

In [0]:
from keras.applications import VGG16

input_tensor = Input(shape=(224, 224, 3))

modelVGG16WoTop = None
modelVGG16 = None

modelVGG16WoTop = VGG16(input_tensor=input_tensor, weights='imagenet', include_top=False)

modelVGG16 = Sequential()

modelVGG16.add(modelVGG16WoTop)
modelVGG16.add(Flatten())
modelVGG16.add(Dense(100, activation='relu'))
modelVGG16.add(Dropout(0.5))
modelVGG16.add(Dense(100, activation='relu'))
modelVGG16.add(Dropout(0.5))

modelVGG16.add(Dense(2, activation='softmax'))

modelVGG16.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

modelVGG16.fit(X_train, Y_train,
          batch_size=2, epochs=2, verbose=1,
          validation_data=(X_test, Y_test))

In [0]:
from keras.applications.resnet50 import ResNet50
from keras.applications.resnet50 import preprocess_input, decode_predictions

modelResNet50WoTop = None
modelResNet50 = None

input_tensor = Input(shape=(224, 224, 3))
modelResNet50WoTop = ResNet50(input_tensor=input_tensor, weights='imagenet', include_top=False)

modelResNet50 = Sequential()

modelResNet50.add(modelResNet50WoTop)
modelResNet50.add(Flatten())
modelResNet50.add(Dense(2, activation='softmax'))

modelResNet50.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

modelResNet50.fit(X_train, Y_train,
          batch_size=4, epochs=8, verbose=1,
          validation_data=(X_test, Y_test))

In [0]:
from keras.models import load_model
from sklearn.metrics import classification_report, confusion_matrix
import datetime

now = datetime.datetime.now()
modelResNet50.save('gdrive/My Drive/Colab/Model/ResNet50 {}.h5'.format(now.strftime('%m-%d %H:%M:%S')))
# modelResNet50_ = load_model('gdrive/My Drive/Colab/Model/ResNet50.h5')

# Y_pred = modelResNet50_.predict(X_test)
# print(confusion_matrix(np.argmax(Y_pred, axis = 1), np.argmax(Y_test, axis = 1)))

In [0]:
# for i in range(1000, 1010):
#     plt.figure()
#     plt.imshow(X_train[i, :, :, 0]);
#     plt.plot()

