In [None]:
# dependencies import

#common
import os
import re
import time
import pathlib
import itertools
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pprint import pprint
from matplotlib.patches import Rectangle
from matplotlib.axes import Axes

# ML
from tensorboard.plugins.hparams import api as hp
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import (
    Input, Conv2D, MaxPooling2D, Dropout,
    concatenate, Flatten, Dense, UpSampling2D,
    BatchNormalization
)

# my
from create_logger import *
import custom_modules as dw
import models

logger = logging.getLogger(f'main.ae_test')

# Настраиваемые параметры

In [None]:
PATH_TO_MODEL = pathlib.Path(f'test/AE') # где искать модель
MODEL_VER = 1
MODEL_NUM = 1
ENCODED_SIZE = 8
DECODED_SIZE = 32
XSHIFT = 200 # сдвиг по оси х для визуализации

# Вспомогатеьлные функции

In [None]:
def draw_plot(data: list[dict], title = 'Результат одного замера УЗ-датчика', x_label='Время', y_label='Амплитуда', fontsize = 25, path_to_save = None):
    """
    Нарисовать график

    Параметры
    ----------
    data: list[dict]
        Список словарей. Каждый словарь хранит
        данные и все мараметры для рисования конкретного графика.
        Все графики будут нарисованы на 1 полотне. 'data' параметр обязателен в 
        каждом словаре. Он хранит список из 1 или 2 массивов - это x и y для 

    Пример: 
    time = [1,2,3,4,5]
    amp = [4,-4,5,-5,6]
    draw_plot([{'data':[time, amp], 'marker':'o', 'lw':3, 'label':'Исходные данные', 'ms':10, 'mfc':'black'}])
    Все ключевые слова взяты из функции plt.plot()

    plt.plot()
    Если указать в словаре только 'data' параметр без остальных, то оформление будет сделано автоматом

    """
    fig, ax = plt.subplots()
    fig.set_figwidth(18)
    fig.set_figheight(10)
    
    fig.set_facecolor('#37474f')
    ax.set_facecolor('black')


    for item in data:
        if not 'data' in item:
            raise ValueError('Каждый словарь должен содержать ключ "data"')
        if list(item.keys()) == ['data']:
            ax.plot(*item['data'], marker='o', lw=3, label='Исходные данные', ms=10, mfc='black')
        else:
            ax.plot(*item['data'], **item)
    

    fig.suptitle(title, fontsize=fontsize+5, c='#cacaca')
    ax.legend(fontsize = fontsize, labelcolor='#cacaca', facecolor='black')
    ax.set_xlabel(x_label, fontsize=fontsize, c='#cacaca')
    ax.set_ylabel(y_label, fontsize=fontsize, c='#cacaca')
    
    ax.tick_params(axis='both', labelsize = fontsize)
    ax.grid(True, which='major', axis='both', lw=1.5)
    ax.grid(True, which='minor', axis='both', ls='--')
    
    ax.minorticks_on()
    
    ax.tick_params(axis = 'both', which = 'major', length = 8, width = 4, colors='#cacaca')
    ax.tick_params(axis = 'both', which = 'minor', length = 4, width = 2, labelleft=True, colors='#cacaca', labelsize=fontsize-8)
    
    #ax.xaxis.set_minor_locator(MultipleLocator(0.05))
    #ax.yaxis.set_minor_locator(MultipleLocator(0.05))
    #ax.xaxis.set_minor_formatter(FormatStrFormatter("%.3f"))
    #ax.yaxis.set_minor_formatter(FormatStrFormatter("%.3f"))
    
    ax.set_facecolor
    #plt.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.1, hspace=0.1)
    if not path_to_save is None:
        plt.savefig(path_to_save, bbox_inches='tight')
    plt.tight_layout()
    plt.show()
    plt.close()

# Загрузка модели для тестирования

## Поиск модели по идентификатору

In [None]:
# какую модель взять
PATH_TO_MODEL = list(PATH_TO_MODEL.rglob(f'*id=v{MODEL_VER:04}n{MODEL_NUM:04}_in({DECODED_SIZE})_hid({ENCODED_SIZE})*.keras'))

if len(PATH_TO_MODEL) != 1:
    print(PATH_TO_MODEL)
    raise ValueError('Few or none model have been found instead of one')
else:
    PATH_TO_MODEL = PATH_TO_MODEL[0]

print(f'{PATH_TO_MODEL=}')
PATH_TO_SAVE_IMAGES = PATH_TO_MODEL.parent/'images'/f'id=v{MODEL_VER:04}n{MODEL_NUM:04}_in({DECODED_SIZE})_hid({ENCODED_SIZE})'
print(f'{PATH_TO_SAVE_IMAGES=}')

### Загрузка модели

In [None]:
# Загрузка модели
model = keras.models.load_model(PATH_TO_MODEL)

if not os.path.exists(PATH_TO_SAVE_IMAGES):
    os.makedirs(PATH_TO_SAVE_IMAGES)

print(model.summary())
tf.keras.utils.plot_model(
    model,
    to_file=PATH_TO_SAVE_IMAGES/'model.jpg',
    show_shapes=True,
    show_dtype=False,
    show_layer_names=True,
    rankdir="TB",
    expand_nested=False,
    dpi=200,
    show_layer_activations=True,
    show_trainable=True,
)

# Загрузка данных для тестирования

In [None]:
df = dw.get_data_df('data/original_data')

# добавления данных перемножения времен и амплитуд
df = dw.cast_df_to_2d(df)
for i, ((_, time), (_, amp)) in enumerate(zip(df['Time'].items(), df['Amplitude'].items())):
    df['Time_x_Amplitude', i] = time * amp

df['BinDefect', 0] = df['DefectDepth', 0].map(lambda x: x>0) # бинарное значение - есть дефект или нет. True - есть дефект

# скалирование
dfs_list = []
for name, part in df.groupby(level=0, axis=1):
    temp_arr = dw.standardize_data(part.to_numpy())
    dfs_list.append(pd.DataFrame(data=temp_arr, index=part.index, columns=part.columns))
df = pd.concat(dfs_list, axis=1)
display(df)

# Анализ качетсва кодирования и декодирования

## Кодирование и декодирование данных

In [None]:
# запись результатов декодирования
arr =  model.predict(df['Time_x_Amplitude'].to_numpy(), verbose=0)
temp_df = pd.DataFrame(data=arr, index=df.index, columns=pd.MultiIndex.from_product([['decoded_Time_x_Amplitude'], np.arange(DECODED_SIZE)], names=df.columns.names))
df = pd.concat([df, temp_df], axis=1)

# запись результатов кодирования
model = keras.Model(inputs=model.input, outputs=min([layer.output for layer in model.layers], key=lambda x: x.shape[1]))
arr =  model.predict(df['Time_x_Amplitude'].to_numpy(), verbose=0)
temp_df = pd.DataFrame(data=arr, index=df.index, columns=pd.MultiIndex.from_product([['encoded_Time_x_Amplitude'], np.arange(ENCODED_SIZE)], names=df.columns.names))
df = pd.concat([df, temp_df], axis=1)

# запись значения mean squarred error для каждого наблюдения
or_arr = df['Time_x_Amplitude'].to_numpy()
pred_arr = df['decoded_Time_x_Amplitude'].to_numpy()
mse_list = []
for orig, pred in zip(or_arr, pred_arr):
    mse_list.append(float(keras.losses.MeanSquaredError()(orig, pred)))
df['mse',0] = mse_list
display(df)

## Вывести примеры кодирования и декодирования графиков

In [None]:
# example of autoencode nondef graphs

COUNT = 5 # графиков одного типа (дефектного или недефектного)

for i in range(COUNT):
    for binn in [False, True]:
        path_to_save = f'{PATH_TO_SAVE_IMAGES}/{"defect" if binn else "non_defect"}'
        orig = df[df['BinDefect',0]==binn]['Time_x_Amplitude'].iloc[i]
        decoded = df[df['BinDefect',0]==binn]['decoded_Time_x_Amplitude'].iloc[i]
        
        draw_plot(data=[{'data': [orig], 'label':'Оригинальные данные', 'marker':'o', 'lw':3, 'ms':10, 'mfc':'black'}, 
                        {'data': [decoded], 'label':'Декодированные данные', 'marker':'o', 'lw':3, 'ms':10, 'mfc':'black'}], 
                        title=f"Качество декодирования из {ENCODED_SIZE} в {DECODED_SIZE} для {'дефектной' if binn else 'не дефектной'} области. Строка: {orig.name}",
                        y_label='Время * Амплитуда', x_label='Номер точки', path_to_save=PATH_TO_SAVE_IMAGES/f'plot_(run={orig.name[0]},scan={orig.name[1]},detector={orig.name[2]},defect={binn}).jpg')

## Вывести распределение размеров функции ошибки для наблюдений

In [None]:
sns.histplot(df['mse'].reset_index().rename(columns={0:'Mse'}), x='Mse', hue='File', bins=20, alpha=0.5)
plt.savefig(PATH_TO_SAVE_IMAGES/'loss_hist.jpg', bbox_inches='tight')
plt.close()

In [None]:
df['encoded_Time_x_Amplitude'].hist()
plt.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.96, wspace=0.2, hspace=0.25)
plt.tight_layout()
plt.show()
plt.savefig(PATH_TO_SAVE_IMAGES/'hidden_state_distributions.jpg', bbox_inches='tight')
plt.close()