## After Training the Autoencoder Feature Extractor: Fine-Tuning Design for Weak, OOP, ...

In [None]:
import os
import numpy as np
import random
import matplotlib.pyplot as plt
import importlib
%matplotlib inline

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical

from load.load_data import load_opendata
from load.load_data import load_mydata

import utils.process
importlib.reload(utils.process)
from utils.process import preprocess, noise, augment, tsne_plot, display_pair, shuffler, label_dict_static, display_row, score, matching_plot

import model.model_autoencoder

In [None]:
# Initialize the TensorFlow GPU configuration: turn on memory growth for
# every physical GPU and report physical vs. logical device counts.
import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # The memory-growth setting has to be identical on all GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(f"{len(gpus)} Physical GPUs, {len(logical_gpus)} Logical GPUs")
    except RuntimeError as err:
        # Raised when memory growth is changed after the GPUs were initialized
        print(err)

In [None]:
# Rebuild an encoder sub-model from the saved autoencoder checkpoint by
# re-applying selected layers of the loaded model to a fresh 64x64x3 input.
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input

model_load = load_model("./ckpt/model_oop_autoencoder_fine")

inputs = Input(shape=(64,64,3), name='encoder_input')
# Chain the loaded layers in order; the output of layers[9] is the encoder output.
# NOTE(review): layers[2] is skipped between x2 and x3 -- confirm this is
# intentional (e.g. a train-only layer) and not an indexing slip.
x1 = model_load.layers[0](inputs)
x2 = model_load.layers[1](x1)
x3 = model_load.layers[3](x2)
x4 = model_load.layers[4](x3)
x5 = model_load.layers[5](x4)
x6 = model_load.layers[6](x5)
x7 = model_load.layers[7](x6)
x8 = model_load.layers[8](x7)
x9 = model_load.layers[9](x8)

encoder = Model(inputs = inputs, outputs = x9, name='encoder')

In [None]:
# Mark the whole encoder as non-trainable before it is embedded in the
# classifier, then print its layer summary.
encoder.trainable=False
encoder.summary()

In [None]:
# Reload the model module first so the from-import below picks up any edits
# made to model/model_autoencoder.py since the kernel started.
import model.model_autoencoder
importlib.reload(model.model_autoencoder)
from model.model_autoencoder import model_classifier_with_encoder

# Build the classifier on top of the frozen encoder. Later cells address its
# outputs by the names 'out_oop', 'out_weak' and 'out_mask'.
fin = model_classifier_with_encoder(encoder)
fin.summary()

In [None]:
# Render the classifier's layer graph as an image
tf.keras.utils.plot_model(fin)

## 1. OOP train using encoder

In [None]:
## 1. Load & Prepare Data
# Open dataset, loaded with the "OOP" label set at 64x64
openloader = load_opendata()
X_open, Y_open = openloader.load_data(classifier_label="OOP", dsize=(64,64), comp_ratio=4)

# Own dataset: separate train and test loads.
# NOTE(review): the train load uses comp_ratio=10 while the open and test
# loads use comp_ratio=4 -- confirm the difference is intended.
myloader = load_mydata()
X_my, Y_my = myloader.load_data(classifier_label="OOP", dsize=(64,64), comp_ratio=10, verbose=0)
X_my_test, Y_my_test = myloader.load_test_data(classifier_label="OOP", dsize=(64,64), comp_ratio=4, verbose=0)

# Report the sample count of each source
print("Open data is ", len(X_open),", My Train set is ", len(Y_my),  ",  My Test set is ", len(Y_my_test))

In [None]:
## 2. Data Class Distribution
# Label codes and their bar captions, in plotting order
oop_codes = ('c0', 'c1', 'c5', 'c6', 'c7')
oop_names = ['center','phone', 'close', 'far', 'behind']


def _tally(labels, codes):
    """Return, for each code in *codes*, how many entries of *labels* equal it."""
    return [sum(1 for label in labels if label == code) for code in codes]


# Per-class counts for the own train set, the own test set and the open set
count_center, count_phone, count_close, count_far, count_behind = _tally(Y_my, oop_codes)
count_test_center, count_test_phone, count_test_close, count_test_far, count_test_behind = _tally(Y_my_test, oop_codes)
count_open_center, count_open_phone, count_open_close, count_open_far, count_open_behind = _tally(Y_open, oop_codes)

# One bar chart per data source, side by side
fig, (ax1, ax2, ax3) = plt.subplots(1,3, figsize=(10,5))
plt.suptitle('labeled data distribution(ea) \n (Trainset + Openset) is total Trainset')
panels = (
    (ax1, 'Trainset', [count_center, count_phone, count_close, count_far, count_behind]),
    (ax2, 'Openset', [count_open_center, count_open_phone, count_open_close, count_open_far, count_open_behind]),
    (ax3, 'Testset', [count_test_center, count_test_phone, count_test_close, count_test_far, count_test_behind]),
)
for axis, caption, heights in panels:
    axis.set_title(caption)
    axis.bar(oop_names, heights)

plt.tight_layout()

In [None]:
## 3. Data preprocessing

# Run both image sources through preprocess() and stack them, own data first
X1 = preprocess(X_my, shape=(64, 64, 3))
X2 = preprocess(X_open, shape=(64, 64, 3))
X = np.concatenate((X1, X2), axis=0)
Y = np.concatenate((Y_my, Y_open), axis=0)

X_test_oop = preprocess(X_my_test, shape=(64, 64, 3))

# Augment the combined training pool (ratio=0.05; original note: 5%*3=15%).
# NOTE(review): augmentation runs before the train/val split done in a later
# cell, so augmented variants of one image may end up in both splits --
# confirm this is acceptable for the validation numbers.
X, Y = augment(X, labels=Y, ratio=0.05)

# Pass images and labels through shuffler()
X, Y = shuffler(X, Y)

# Map string labels to class indices via label_map, then one-hot encode
label_map, label_str = label_dict_static(classifier="OOP")
Y = to_categorical(np.array([label_map[label] for label in Y]))

# y_test keeps the integer class indices; y_test_oop is their one-hot form
y_test = np.array([label_map[label] for label in Y_my_test])
y_test_oop = to_categorical(y_test)

In [None]:
## 4. Split Data in train mode
# Train, Val, Test Split
# 80/20 train/validation split of the augmented pool, stratified on the
# one-hot labels. No random_state is passed, so the split is not pinned to a seed.
from sklearn.model_selection import train_test_split
X_train_oop, X_val_oop, y_train_oop, y_val_oop = train_test_split(X, Y, test_size=0.2, stratify=Y) 
# Print the shape of every split, including the separately loaded test set
print("X_train: {}\ny_train: {}\nX_val: {}\ny_val: {}\nX_test: {}\ny_test: {}".format(X_train_oop.shape, y_train_oop.shape, X_val_oop.shape, y_val_oop.shape, X_test_oop.shape, y_test_oop.shape))

In [None]:
# Visual sanity check of the OOP training images and their labels
display_row(X_train_oop, y_train_oop, label_map, label_str)

In [None]:
## 5. Prepare Training Dummy label

# The fit/evaluate calls need a target for every output. In this stage only
# 'out_oop' gets real labels, so 'out_weak' and 'out_mask' are fed all-zero
# placeholders with 2 columns each. No OOP dummy is needed here.
n_train, n_val, n_test = len(y_train_oop), len(y_val_oop), len(y_test_oop)

weak_dummy_label_train = np.zeros((n_train, 2), dtype=float)
mask_dummy_label_train = np.zeros((n_train, 2), dtype=float)

weak_dummy_label_val = np.zeros((n_val, 2), dtype=float)
mask_dummy_label_val = np.zeros((n_val, 2), dtype=float)

weak_dummy_label_test = np.zeros((n_test, 2), dtype=float)
mask_dummy_label_test = np.zeros((n_test, 2), dtype=float)

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping, ModelCheckpoint


def scheduler(epoch, lr):
    """Return the learning rate to use for *epoch*.

    At epochs 5, 9, 12 and 20 the incoming rate *lr* is multiplied by 0.1;
    at every other epoch it is returned unchanged.
    """
    return 0.1*lr if epoch in (5, 9, 12, 20) else lr

# Optimizer and callbacks for the OOP fine-tuning stage.
adam = Adam(learning_rate=0.001)
ls_callback = LearningRateScheduler(scheduler)

# For a multi-output model Keras logs validation metrics as
# "val_<output_name>_<metric>", so the OOP head's validation accuracy is
# "val_out_oop_acc". The previous key "out_oop_val_acc" and the
# "{val_acc}" filename placeholder do not exist in the logs.
oop_val_metric = 'val_out_oop_acc'
es_callback = EarlyStopping(monitor=oop_val_metric, patience=3, mode='max')
filepath = os.path.join(os.getcwd(), "ckpt/", "Encoder_OOP-{epoch:01d}-{val_out_oop_acc:.2f}.h5")
checkpoint = ModelCheckpoint(filepath=filepath, monitor=oop_val_metric, verbose=1, save_best_only=True, mode='max')
# NOTE: these callbacks only take effect when passed to fit() via
# callbacks=[...], and the val_* metric is only logged when fit() is also
# given validation data.

# Compile for the OOP stage: every head uses categorical cross-entropy, but
# the loss weights (1.0 / 0.0 / 0.0) make only 'out_oop' contribute to the
# total loss. Each head reports accuracy and an AUC metric.
fin.compile(optimizer = adam,
            loss={'out_oop': 'categorical_crossentropy',
                  'out_weak': 'categorical_crossentropy',
                  'out_mask': 'categorical_crossentropy'},
            loss_weights={'out_oop': 1.0, 'out_weak': 0.0, 'out_mask': 0.0},
            metrics={
                'out_oop': ['acc', tf.keras.metrics.AUC(thresholds=[0.5,0.7,0.9,0.95])],
                'out_weak': ['acc', tf.keras.metrics.AUC(thresholds=[0.5,0.7,0.9,0.95])],
                'out_mask': ['acc', tf.keras.metrics.AUC(thresholds=[0.5,0.7,0.9,0.95])]
                }
            )

In [None]:
# Stage 1 (OOP): switch off training for the layers listed below, then print
# the summary so the trainable / non-trainable parameter counts can be checked.
# NOTE(review): these flags are set after compile() in the previous cell
# (Keras guidance is to compile again after changing `trainable`), and they
# stay in force for later stages unless explicitly reset.
for frozen_name in ('fc2', 'fc3', 'out_weak', 'out_mask'):
    fin.get_layer(frozen_name).trainable = False
fin.summary()

In [None]:
# Train for 10 epochs: real one-hot targets for 'out_oop', zero placeholders
# for the other two heads.
# NOTE(review): neither validation data nor the callbacks defined earlier
# (ls_callback, es_callback, checkpoint) are passed here, so they are unused.
history = fin.fit(x=X_train_oop, y={'out_oop': y_train_oop, 'out_weak': weak_dummy_label_train, 'out_mask': mask_dummy_label_train},
                 epochs=10, batch_size=32, shuffle=True)

In [None]:
# Evaluate on the held-out validation split (zero placeholders for the weak/mask heads)
fin.evaluate(X_val_oop, {'out_oop': y_val_oop, 'out_weak': weak_dummy_label_val, 'out_mask': mask_dummy_label_val})

In [None]:
# Display the shapes of the validation inputs and of all three target arrays
X_val_oop.shape, y_val_oop.shape, weak_dummy_label_val.shape, mask_dummy_label_val.shape

In [None]:
# Test using X_test data
# predict() output is indexed per head; index 0 is scored against the OOP
# one-hot test labels here.
X_pred=fin.predict(X_test_oop)
score(X_pred[0], y_test_oop)

In [None]:
# Draw three matching_plot panels side by side for the OOP predictions (thd=0.8).
# The call is identical for each axis; presumably matching_plot picks its
# sample internally -- verify in utils.process.
fig, ax = plt.subplots(1, 3, figsize = (15,5))
for i in range(3):
    matching_plot(X_test_oop, X_pred[0], y_test_oop, ax[i], label_map, label_str, thd=0.8)
plt.show()

In [None]:
import utils.xai_viz
# Reload BEFORE the from-import: a name imported with `from ... import`
# keeps pointing at the pre-reload object, so reloading afterwards would
# leave `explainable_model` bound to the stale definition.
importlib.reload(utils.xai_viz)
from utils.xai_viz import explainable_model

# Wrap the classifier for the heatmap visualisations below
xai = explainable_model(fin)

In [None]:
# Pick one random OOP test sample. This stage's test arrays are X_test_oop /
# y_test_oop: `X_test` is not defined until the Weak section, and `y_test`
# holds integer labels here, so argmax over it would always give 0.
pick = random.randint(0, len(X_test_oop)-1)

# Title shared by all three heatmaps: predicted OOP class (head 0) with its
# score, and the true class taken from the one-hot test label.
pred_class = np.argmax(X_pred[0][pick])
fig_title = "Predicted: {} as {:.3f}".format(pred_class, X_pred[0][pick][pred_class])  +   ",    Label: {}".format(np.argmax(y_test_oop[pick]))

# One heatmap per inspected layer, all for output node 0 (OOP head)
for layer_name in ("adhesive_conv", "adhesive_relu", "adhesive_maxpool"):
    heatmap = xai.explainable_model(X_test_oop[pick], layer_name, alpha=0.4, output_node=0)
    plt.title(fig_title)

## 2. Weak train using encoder

In [None]:
# Reload the own dataset with the "Weak" label set; this overwrites the OOP
# X_my / Y_my / X_my_test / Y_my_test loaded earlier.
myloader = load_mydata()
X_my, Y_my = myloader.load_data(classifier_label="Weak", dsize=(64,64), comp_ratio=4, verbose=0)
X_my_test, Y_my_test = myloader.load_test_data(classifier_label="Weak", dsize=(64,64), comp_ratio=4, verbose=0)

# Report train / test sample counts
print("My Train set is ", len(Y_my),  ",  My Test set is ", len(Y_my_test))

In [None]:
# Count the 's0' (plotted as "man") and 's1' (plotted as "woman") labels in
# the train and test sets.
count_man = sum(1 for label in Y_my if label == 's0')
count_woman = sum(1 for label in Y_my if label == 's1')

count_test_man = sum(1 for label in Y_my_test if label == 's0')
count_test_woman = sum(1 for label in Y_my_test if label == 's1')

# Train and test distributions as two bar charts
fig, (ax1, ax2) = plt.subplots(1,2, figsize=(10,5))
plt.suptitle('labeled data distribution(ea)')
weak_names = ['man','woman']
for axis, caption, heights in (
    (ax1, 'Trainset', [count_man, count_woman]),
    (ax2, 'Testset', [count_test_man, count_test_woman]),
):
    axis.set_title(caption)
    axis.bar(weak_names, heights)

plt.tight_layout()

In [None]:
# Run the Weak train and test images through preprocess()
X = preprocess(X_my, shape=(64, 64, 3))
X_test = preprocess(X_my_test, shape=(64, 64, 3))

# Augment the training set (ratio=0.05; original note: 5%*3=15%)
X, Y = augment(X, labels=Y_my, ratio=0.05)

# Pass images and labels through shuffler()
X, Y = shuffler(X, Y)

# Map string labels to class indices via label_map, then one-hot encode.
# From here on label_map / label_str describe the Weak classes.
label_map, label_str = label_dict_static(classifier="Weak")
Y = to_categorical(np.array([label_map[label] for label in Y]))

# y_test ends up one-hot encoded
y_test = to_categorical(np.array([label_map[label] for label in Y_my_test]))

In [None]:
# 80/20 train/validation split stratified on the one-hot labels; relies on
# train_test_split imported in the OOP split cell. No random_state is passed.
X_train, X_val, y_train, y_val = train_test_split(X, Y, test_size=0.2, stratify=Y) 
# Print the shape of every split
print("X_train: {}\ny_train: {}\nX_val: {}\ny_val: {}\nX_test: {}\ny_test: {}".format(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape))

In [None]:
# Visual sanity check of the Weak training images and their labels
display_row(X_train, y_train, label_map, label_str)

In [None]:
# Recompile for the Weak stage with a fresh Adam optimizer: same losses, but
# the loss weights (0.0 / 1.0 / 0.0) make only 'out_weak' contribute to the
# total loss. Accuracy is the only metric in this stage.
adam = Adam(learning_rate=0.001)
fin.compile(optimizer = adam,
            loss={'out_oop': 'categorical_crossentropy',
                  'out_weak': 'categorical_crossentropy',
                  'out_mask': 'categorical_crossentropy'},
            loss_weights={'out_oop': 0.0, 'out_weak': 1.0, 'out_mask': 0.0},
            metrics=['acc'])

In [None]:
# Dummy target labels for the Weak stage: 'out_weak' gets the real labels,
# so 'out_oop' (5 columns) and 'out_mask' (2 columns) are fed all-zero
# placeholders sized to each split.
n_train, n_val, n_test = len(y_train), len(y_val), len(y_test)

oop_dummy_label_train = np.zeros((n_train, 5))
mask_dummy_label_train = np.zeros((n_train, 2))

oop_dummy_label_val = np.zeros((n_val, 5))
mask_dummy_label_val = np.zeros((n_val, 2))

oop_dummy_label_test = np.zeros((n_test, 5))
mask_dummy_label_test = np.zeros((n_test, 2))

In [None]:
# Stage 2 (Weak): freeze the OOP and mask branches.
for frozen_name in ('fc1', 'fc3', 'out_oop', 'out_mask'):
    fin.get_layer(frozen_name).trainable = False

# The OOP stage set 'fc2' and 'out_weak' to non-trainable and nothing has
# re-enabled them since. Without this the weak branch (presumably fc2 ->
# out_weak; confirm in model_classifier_with_encoder) could not learn at all.
for trained_name in ('fc2', 'out_weak'):
    fin.get_layer(trained_name).trainable = True

# Keras guidance: after changing `trainable`, compile again so the new flags
# are taken into account. Same configuration as the Weak-stage compile above.
fin.compile(optimizer = Adam(learning_rate=0.001),
            loss={'out_oop': 'categorical_crossentropy',
                  'out_weak': 'categorical_crossentropy',
                  'out_mask': 'categorical_crossentropy'},
            loss_weights={'out_oop': 0.0, 'out_weak': 1.0, 'out_mask': 0.0},
            metrics=['acc'])

# Print the summary to check the trainable / non-trainable parameter counts
fin.summary()


In [None]:
# Train for 4 epochs: real one-hot targets for 'out_weak', zero placeholders
# for the other two heads. No validation data or callbacks are passed.
history = fin.fit(X_train, {'out_oop': oop_dummy_label_train, 'out_weak': y_train , 'out_mask': mask_dummy_label_train}
                    , epochs=4, batch_size=32, shuffle=True)

In [None]:
# Test using X_test data
# Index 1 of the predict() output is scored against the Weak one-hot test labels
X_pred=fin.predict(X_test)
score(X_pred[1], y_test)

In [None]:
# Draw three matching_plot panels side by side for the Weak predictions
# (head index 1, thd=0.8). The call is identical for each axis.
fig, ax = plt.subplots(1, 3, figsize = (15,5))
for i in range(3):
    matching_plot(X_test, X_pred[1], y_test, ax[i], label_map, label_str, thd=0.8)
plt.show()

In [None]:
import utils.xai_viz
# Reload BEFORE the from-import: a name imported with `from ... import`
# keeps pointing at the pre-reload object, so reloading afterwards would
# leave `explainable_model` bound to the stale definition.
importlib.reload(utils.xai_viz)
from utils.xai_viz import explainable_model

# Wrap the (now Weak-trained) classifier for the heatmap visualisations below
xai = explainable_model(fin)

In [None]:
# Pick one random Weak test sample
pick = random.randint(0, len(X_test)-1)

# Title shared by all three heatmaps. The Weak head is index 1 of the
# predict() output (the same index used for scoring and for output_node
# below); the title previously read head 0, i.e. the OOP prediction.
pred_class = np.argmax(X_pred[1][pick])
fig_title = "Predicted: {} as {:.3f}".format(pred_class, X_pred[1][pick][pred_class])  +   ",    Label: {}".format(np.argmax(y_test[pick]))

# One heatmap per inspected layer, all for output node 1 (Weak head)
for layer_name in ("adhesive_conv", "adhesive_relu", "adhesive_maxpool"):
    heatmap = xai.explainable_model(X_test[pick], layer_name, alpha=0.4, output_node=1)
    plt.title(fig_title)

In [None]:
# Reload the OOP test split after the Weak stage; this replaces the Weak
# X_test / y_test / label_map / label_str with their OOP counterparts.
X_my_test, Y_my_test = myloader.load_test_data(classifier_label="OOP", dsize=(64, 64), comp_ratio=4, verbose=0)
X_test = preprocess(X_my_test, shape=(64, 64, 3))

# String labels -> class indices -> one-hot
label_map, label_str = label_dict_static(classifier="OOP")
y_test = to_categorical(np.array([label_map[label] for label in Y_my_test]))

In [None]:
# Test using X_test data
# Score head 0 (OOP) again on the OOP test set, now that the Weak stage has run
X_pred=fin.predict(X_test)
score(X_pred[0], y_test)


## 3. Mask train using encoder