In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''

In [2]:
from joblib import Parallel, delayed

from matplotlib import pyplot as plt
from os import listdir
import pandas as pd

import tensorflow as tf
import numpy as np

from fsmodels import SklSingleExpFrequencyScan

from fsplots import plot_model
from fsplots import plot_loss_path
from fsplots import plot_experimental_points

%matplotlib inline

In [3]:
def get_file_names(datasets_folder):
    return [datasets_folder + '/' + _ for _ in listdir(datasets_folder)]

In [4]:
def read_datasets(file_names):
    return [[f_name, pd.read_csv(f_name,
                                 header=0,
                                 parse_dates=[0],
                                 infer_datetime_format=True)]
            for f_name in file_names]

In [5]:
def fit_model(df):
    
    freq_values = df.frequency_hz.to_numpy()
    freq_powers = np.log10(freq_values)
    
    dlts_values = df.dlts_pf.to_numpy()
    f_pulse_value = float(df.f_pulse[0]) * 10 ** (-6)
    
    
    max_abs_index = np.absolute(dlts_values).argmax()
    extrem_val = dlts_values[max_abs_index]
    
    new_max_dlts = freq_powers.max()

    normalize = lambda x: x / extrem_val * new_max_dlts
    denormalize = lambda x: x * extrem_val / new_max_dlts
    
    dlts_values = normalize(dlts_values)

    model = SklSingleExpFrequencyScan(filling_pulse = f_pulse_value,
                                      fit_p_coef = True,
                                      learning_rate = 0.05,
                                      n_iters = 1000, 
                                      stop_val = 10**-10,
                                      verbose = False
                                     )
    
    model.fit(X=freq_powers, y=dlts_values)
    
    fit_results_ = model.fit_results_
    fit_results_.amplitude_0 = denormalize(fit_results_.amplitude_0)
    
    return fit_results_

In [6]:
def get_additional_text(df, fit_results_):

    def text_params(fit_result, actual_dlts, frequency_powers):
        time_constant_power = fit_result.time_constant_pow_0
        f_pulse = fit_result.filling_pulse
        p = fit_result.p_coef
        amp = fit_result.amplitude_0
        mse = fit_result.loss

        text = '\n'.join(['$\\log_{10}(\\tau)$ = ' + f'{time_constant_power:.4f} ' + '$\\log_{10}$(с)',
                          f'$\\tau$ = {10**time_constant_power:.4e} с',
                          f'$A$ = {amp:.4e} пФ',
                          f'$p$ = {p:.4f}',
                          f'MSE = {mse:.4e} $пФ^2$',
                          f'RMSE = {np.sqrt(mse):.4e} пФ'
                         ])

        return text

    
    frequency_powers = np.log10(df.frequency_hz.to_numpy())
    dlts_values = df.dlts_pf.to_numpy()
    f_pulse = df.f_pulse[0] * 10 ** (-6)
    
    text_1 = '\n'.join([f'Образец: {df.specimen_name[0]}',
                        f'$T$ = {df.temperature_k.mean():.1f} К',
                        f'$U_1$={df.u1[0]} В',
                        f'$U_R$={df.ur[0]} В',
                        f'$t_1$ = {f_pulse:.4e} с'
                       ])
    
    text_2 = '\n'.join(['Начальные значения:', text_params(fit_results_.iloc[0, :], dlts_values, frequency_powers)])
    text_3 = '\n'.join(['Конечные значения:', text_params(fit_results_.iloc[-1, :], dlts_values, frequency_powers)])
    
    return text_1, text_2, text_3


def print_results(df, fit_results_):
    
    frequency = df.frequency_hz.to_numpy()
    frequency_powers = np.log10(frequency)
    actual_dlts = df.dlts_pf.to_numpy()
    
    fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(15, 5))

    ax0 = plot_model(X = frequency_powers, 
                     y = actual_dlts, 
                     model_class = SklSingleExpFrequencyScan,
                     fit_results_ = fit_results_, 
                     plot_exps = False,
                     ax=ax0
                    )

    ax1 = plot_loss_path(fit_results_, ax=ax1)
    
    title = f'{df.specimen_name[0]} T={df.temperature_k.mean():.1f} K, $U_1$={df.u1[0]} V, $U_R$={df.ur[0]} V'
    plt.suptitle(title, y=1.01)
    
    text_1, text_2, text_3 = get_additional_text(df, fit_results_)
    text = '\n\n'.join([text_1, text_3])
    
    
    x = 0.4 * (max(ax1.get_xlim()) - min(ax1.get_xlim())) + min(ax1.get_xlim())
    y = 0.95 * max(ax1.get_ylim())
    fontsize=10
    bbox_dict = {'facecolor':'white', 'alpha':0.8, 'edgecolor':'gray'}
        
    ax1.text(x, y, text, fontsize=fontsize, verticalalignment='top', bbox=bbox_dict)
    
    return fig, (ax0, ax1)

In [7]:
DATASET_PATH = '../dataset'
PLOTS_PATH = '../plots'
MODELS_PATH = '../models'

In [8]:
fnames = get_file_names(DATASET_PATH)
df_list = read_datasets(fnames)

In [9]:
def batch_processing(f_name, df):
    
    fit_results_ = fit_model(df)
    
    fig, ax = print_results(df=df,
                            fit_results_=fit_results_)

    frequency_powers = np.log10(df.frequency_hz.to_numpy())
    
    final_model = SklSingleExpFrequencyScan(filling_pulse=fit_results_.filling_pulse.iloc[-1])
    
    last_row = fit_results_.iloc[-1]
    
    final_model.exps_params_ = [ [ last_row.time_constant_pow_0, last_row.amplitude_0 ] ]
    
    final_model.p_coef_ = last_row.p_coef
    
    model_df = df.copy()
    
    model_df['dlts_pf_model'] = final_model.predict(frequency_powers)
    model_df['p_coef_model'] = final_model.p_coef_
    
    if model_df['p_coef_model'].isna().any():
        message = MODELS_PATH + '/' + f_name.split('/')[-1].rstrip('.csv') + '_model - ERROR'
    else:
        message = MODELS_PATH + '/' + f_name.split('/')[-1].rstrip('.csv') + '_model - OK'
    
    model_df['time_constant_power_model'] = final_model.exps_params_[0, 0]
    model_df['time_constant_model'] = 10 ** final_model.exps_params_[0, 0]
    model_df['amplitude_model'] = final_model.exps_params_[0, 1]
    
    mse = np.square(df.dlts_pf.to_numpy() - final_model.predict(frequency_powers)).mean()
    model_df['rmse_model'] = np.sqrt(mse)
    
    file_name = MODELS_PATH + '/' + f_name.split('/')[-1].rstrip('.csv') + '_model' + '.csv'
    model_df.to_csv(file_name, index=False)
    
    file_name = PLOTS_PATH + '/' + f_name.split('/')[-1].rstrip('.csv') + '_model' + '.pdf'
    plt.savefig(file_name, bbox_inches='tight')
    
    file_name = PLOTS_PATH + '/' + f_name.split('/')[-1].rstrip('.csv') + '_model' + '.jpg'
    plt.savefig(file_name, bbox_inches='tight')
    
    plt.close('all')
    
    return message
    
    
messages = Parallel(n_jobs=-1)(delayed(batch_processing)(f_name, df) for f_name, df in df_list)
messages

['../models/1564ЛЕ1№1_п1_2500Гц-1Гц_1пФ_+10С_-0В-1В_5мВ_10мкс_шаг_0,01_model - OK',
 '../models/1564ЛЕ1№1_п1_2500Гц-1Гц_1пФ_+10С_-1В-2В_200мВ_10мкс_шаг_0,01_model - OK',
 '../models/1564ЛЕ1№1_п1_2500Гц-1Гц_1пФ_+10С_-4В-5В_50мВ_10мкс_шаг_0,01_model - OK']