In [None]:
from tensorflow.keras.layers import Dense, Flatten, Input
from tensorflow.keras.models import Model
from modules import AEC_builder

import os
# GPU selection
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]='5'

nb_layer = 1
_encoder = AEC_builder.Encoder(input_shape=(2000, 16, 1), nb_layer=nb_layer, padding='same', kernel_size=(10,3), strides=(1,1), pool_size=(5,3), pool_strides=(2,2))
_encoder.build()

if not(_encoder.error_shape):
    model = _encoder.model
    model.summary()

    # _bottle_neck = AEC_builder.BottleNeck(_encoder=_encoder.model, use_DENSE_OR_GAP='DENSE')
    # _bottle_neck.build()
    # model2 = _bottle_neck.model
    # model2.summary()
    
    # model3 = _bottle_neck.decoder
    # model3.summary()

    # _decoder = AEC_builder.Decoder(_bottleneck=_bottle_neck.decoder, nb_layer=nb_layer, use_UPSAMPLE_OR_DECONV='UPSAMPLE', padding='same', kernel_size=(10,3), strides=(2,2), pool_size=(2, 2))
    # _decoder.build()
    # model4 = _decoder.model
    # model4.summary()

In [None]:
a = model.output
b = Flatten()(a)
b = Dense(b.shape[1])(b)
c = Model(inputs=model.input, outputs=b)
c.summary()

In [None]:
import os
import pandas as pd
from modules.utils import append_default_path
from modules import AEC_builder

def getTupleFromStr(_str):
    width = _str.split(',')[0]
    height = _str.split(',')[1]
    
    width = int(width.split('(')[1])
    height = int(height.split(')')[0])
    
    return (width, height)

def get_ExperimentInfo():
    path_module, path_csv, path_scalogram = append_default_path()
    df_experiment = pd.read_excel(os.path.join(path_csv, 'AEC_experiments.xlsx'), index_col=0)
    return df_experiment

def get_PARAMS_from_ExperimentInfo(temp_experiment):
    encoder_PARAMS = {
        'nb_layer' : temp_experiment['nb_layer'],
        'kernel_size' : (temp_experiment['kernel_width'], (temp_experiment['kernel_height'])),
        'strides' : (temp_experiment['strides_width'], temp_experiment['strides_height']),
        'padding' : temp_experiment['padding'],
        'act_selection' : temp_experiment['act_selection'],
        'pool_size' : (temp_experiment['max_pool_width'], temp_experiment['max_pool_height']),
        'pool_strides' : (temp_experiment['max_pool_strides_width'], temp_experiment['max_pool_strides_height'])   
    }
    
    bottleneck_PARAMS = {
        'vector_len' : temp_experiment['vector_len'],
        'act_selection' : temp_experiment['act_selection'],
        'use_DENSE_OR_GAP' : temp_experiment['use_DENSE_OR_GAP']
    }
    
    decoder_PARAMS = {
        'nb_layer' : temp_experiment['nb_layer'],
        'use_UPSAMPLE_OR_DECONV' : temp_experiment['use_UPSAMPLE_OR_DECONV'],
        'kernel_size' : (temp_experiment['decoder_kernel_width'], temp_experiment['decoder_kernel_height']),
        'strides' : (temp_experiment['decoder_strides_width'], temp_experiment['decoder_strides_height']),
        'padding' : temp_experiment['padding'],
        'act_selection' : temp_experiment['act_selection'],
        'pool_size' : (temp_experiment['decoder_pool_width'], temp_experiment['decoder_pool_height'])        
    }

    return encoder_PARAMS, bottleneck_PARAMS, decoder_PARAMS


def get_model_with_experiment_info(temp_experiment):
    encoder_PARAMS, bottleneck_PARAMS, decoder_PARAMS = get_PARAMS_from_ExperimentInfo(temp_experiment)
    _encoder = AEC_builder.Encoder(**encoder_PARAMS)
    _encoder.build()
    
    bottleneck_PARAMS['_encoder'] = _encoder.model
    _bottle_neck = AEC_builder.BottleNeck(**bottleneck_PARAMS)
    _bottle_neck.build()
    
    decoder_PARAMS['_bottleneck'] = _bottle_neck.decoder
    _decoder = AEC_builder.Decoder(**decoder_PARAMS)
    _decoder.build()
    
    return _decoder.model
    

df_experiment = get_ExperimentInfo()
display(df_experiment)

model = get_model_with_experiment_info(df_experiment.loc[2, :])

In [None]:
model.summary()

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Reshape, UpSampling2D, Dropout
from tensorflow.keras.layers import Conv3D, Conv3DTranspose, PReLU, BatchNormalization, MaxPool3D, Input, Conv2D, Conv2DTranspose, MaxPool2D
from tensorflow.keras import backend as K
from tensorflow.keras import regularizers

def ConvolutionalAutoencoder_v05(vector_len=30):
    weight_decay=0.0005

    model = Sequential()
    model.add(Conv2D(filters=24,
                        input_shape=(2000, 16, 1),
                        kernel_size=(10, 2),
                        strides=(1, 1),
                        kernel_regularizer=regularizers.l2(l=weight_decay),
                        padding='valid', name="Conv1"))
    model.add(BatchNormalization(name="BN1"))
    model.add(PReLU(name="PReLU1"))

    model.add(Conv2D(filters=48,
                        kernel_size=(10, 2),
                        strides=(2, 1),
                        kernel_regularizer=regularizers.l2(l=weight_decay),
                        padding='valid', name="Conv2"))
    model.add(BatchNormalization(name="BN2"))
    model.add(PReLU(name="PReLU2"))

    model.add(MaxPool2D(pool_size=(5, 2),
                        strides=(2, 1), name="Pool1"))

    model.add(Flatten())
    model.add(Dense(units=vector_len, name='embedding'))

    model.add(Dense(units=494*14*48, activation='relu'))

    model.add(Reshape((494,14,48)))

    model.add(Conv2DTranspose(filters=24,
                                kernel_size=(10, 2),
                                kernel_regularizer=regularizers.l2(
                                    l=weight_decay),
                                strides=(2, 1), name="Deconv1", padding='valid'))
    model.add(BatchNormalization(name="BN3"))
    model.add(PReLU(name="PReLU3"))
    model.add(Conv2DTranspose(filters=1,
                                kernel_size=(10, 2),
                                kernel_regularizer=regularizers.l2(
                                    l=weight_decay),
                                strides=(2, 1), name="Deconv2", padding='valid'))
    model.add(BatchNormalization(name="BN4"))
    
    return model

model_v05 = ConvolutionalAutoencoder_v05()
model_v05.summary()

In [None]:
a = model.output



In [None]:
import numpy as np
import os
# GPU selection
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]='5'
from tensorflow.keras.models import load_model
model = load_model('/home/yschoi/INS_Clustering/Deep_clustering_v02/results/1/weights.h5')
train_data = np.load('./data'+'/np_scalograms_1ch_train.npy')
test_data = np.load('./data'+'/np_scalograms_1ch_test.npy')
train_data_pred = model.predict(train_data)


In [None]:
import tensorflow.keras.backend as K
import tensorflow as tf
# from modules.model_utils import custom_corr

def custom_corr(X_true, X_pred):
    corr_train = []
    for i in range(len(X_true)):
        temp_corr = np.corrcoef(K.flatten(tf.squeeze(X_true[i])),\
                                K.flatten(tf.squeeze(X_pred[i])))[0,1]
        corr_train.append(temp_corr)
        
    return np.mean(corr_train)

def corr_keras(X_true, X_pred):
    corr = tf.py_function(func=custom_corr, inp=[X_true, X_pred], Tout=tf.float32, name='corr')
    return corr

corr_keras(train_data, train_data_pred)

In [None]:
from modules.model_utils import custom_corr

print(custom_corr(train_data, train_data_pred))

In [None]:
train_data[0].flatten()

In [None]:
corr_train = []
for i in range(len(train_data)):
    temp_corr = np.corrcoef(train_data[i].squeeze().flatten(),\
                            train_data_pred[i].squeeze().flatten())[0,1]
    corr_train.append(temp_corr)

In [None]:
import seaborn as sns

print("mean corr: {}".format(np.mean(corr_train)))
sns.histplot(data=corr_train)

In [None]:
def get_mean_corr(X_test, X_test_pred):
    for i in range