# **Создание датасета**
P.S. Выполнять только при первом запуске.

In [None]:
# Switch for the dataset-generation cells below: when True they install gnuplot,
# run really_all_region.sh and rename the produced files.
run_gnuplot = False

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# List the project folder on the mounted Drive.
!ls /content/drive/MyDrive/Chemistry-main

'1_AlexNet_optim=Adam_bs=16_params=D_tp_tb_size=64.pth'     gnuplot	        __pycache__
'1_AlexNet_optim=Adam_bs=16_params=V_D_tp_tb_size=64.pth'   LICENSE	        README.md
 Data							    mean_disp.json      samples_celeba_attn
 Data299						    model_64.py         spectral.py
 draw_phases.py						    model.py	        Test
 file_to_phase.py					    models	        test_img
 filter.py						    norm_denorm.py      utils.py
 fit_models.ipynb					    predict_params.py   ZipData


In [None]:
# Install gnuplot in the runtime (only when dataset generation is requested).
if run_gnuplot:
    !sudo apt-get -y install gnuplot

In [None]:
#!ls /usr/bin/*

In [None]:
# Switch the working directory to the Data folder and print it.
if run_gnuplot:
    %cd /content/drive/MyDrive/Chemistry-main/Data/
    !pwd

In [None]:
# Run the dataset-generation shell script stored in the Data folder.
if run_gnuplot:
    !bash /content/drive/MyDrive/Chemistry-main/Data/really_all_region.sh


In [None]:
if run_gnuplot:
    import os

    # Rename generated files in the Data folder: the '.csv' part of the name
    # is replaced with '_'.
    data_dir = '/content/drive/MyDrive/Chemistry-main/Data/'
    for file_name in os.listdir(data_dir):
        # Only names containing both 'result' and 'contour' are renamed.
        if 'result' not in file_name or 'contour' not in file_name:
            continue
        print('rename', file_name)
        new_file_name = file_name.replace('.csv', '_')
        # Full paths make the rename independent of the current working
        # directory (os.listdir returns bare names).
        os.rename(os.path.join(data_dir, file_name),
                  os.path.join(data_dir, new_file_name))

# **Инициализация**

In [None]:
# Work from the project root so the local modules (utils, model, norm_denorm)
# and the relative './models' / './test_img' paths used below resolve.
%cd /content/drive/MyDrive/Chemistry-main/
!pwd

/content/drive/MyDrive/Chemistry-main
/content/drive/MyDrive/Chemistry-main


In [None]:
import math

import matplotlib.pyplot as plt
import torch

from utils import seed, get_train_dataloader, get_test_dataloader, model_to_log, split_x_train, load_data_phase, read_phase_file
from model import get_model, load_model
from norm_denorm import calculate_mean_disp, norm, denorm

# IPython autoreload extension, mode 2: re-import modules before running code.
%load_ext autoreload
%autoreload 2

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Project helper from utils; presumably fixes the RNG seeds — confirm in utils.py.
seed(2)

In [None]:
# --- Optimisation hyperparameters ---
lr = 1e-4
epochs = 200
batch_size = 31
# StepLR settings (passed to torch.optim.lr_scheduler.StepLR further down)
step_size = 20
gamma = 0.99

# Target parameters the network predicts; the full set is 'V', 'D', 'tp', 'tb'
params = ['D', 'tp', 'tb']

# Folders holding the training and the test samples
train_folder = '/content/drive/MyDrive/Chemistry-main/Data'
test_folder = '/content/drive/MyDrive/Chemistry-main/Test'
# File in which the normalisation mean and variance are stored
mean_disp_file = 'mean_disp.json'

# Size of a phase pattern
phase_size = (64, 64)

# Experiment description; keys are kept in this exact order
config = dict(
    lr=lr,
    epochs=epochs,
    bs=batch_size,
    sched='StepLR',
    sched_ss=step_size,
    sched_g=gamma,
    params='_'.join(params),
    optim='Adam',
    size=phase_size[0],
)

# The experiment name (also used as the checkpoint file stem) is built from
# the settings listed here, as 'key=value' pieces joined with '_'
important_keys = ['optim', 'bs', 'params']
config['exp_name'] = 'InceptionResnetV1_' + '_'.join(
    f'{key}={config[key]}' for key in important_keys
)
model_file_name = config['exp_name']

In [None]:
from norm_denorm import dump_mean_disp

# Compute the normalisation statistics of the selected target parameters
# from the training folder.
mean, dispersion = calculate_mean_disp(train_folder, params)
print(mean, dispersion)

# Uncomment to persist the mean and variance so they can be reused with an
# already trained network.
# dump_mean_disp(mean_disp_file, mean, dispersion)

[0.18252184 0.52264665 0.80569206] [0.00456303 0.00492078 0.00120417]


In [None]:
# Load the training phase patterns and their target parameters, then pass the
# targets through the project's norm() helper with the statistics computed above.
x_train, y_train = load_data_phase(train_folder, params, phase_size)
y_train = norm(y_train, mean, dispersion)

In [None]:
# Split into train/validation parts with the project helper and turn each of
# the four returned arrays into a float tensor on `device`.
# NOTE(review): the meaning of the literal 31 is defined by split_x_train — confirm in utils.py.
x_train, y_train, x_val, y_val = (
    torch.tensor(part, dtype=torch.float, device=device)
    for part in split_x_train(x_train, y_train, 31)
)

In [None]:
# Validation batches are built with the test-style loader helper.
val_dataloader = get_test_dataloader(x_val, y_val, batch_size)

In [None]:
# Training batches are built with the train-style loader helper.
train_dataloader = get_train_dataloader(x_train, y_train, batch_size)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
from torchvision.utils import save_image
import os

In [None]:
# Model imports — keep only ONE of the lines below.
# NOTE(review): as written all three imports run, so the name `Autoencoder`
# ends up bound to the last one (model_resnet).
from model_InceptionResnetV1 import Autoencoder # `device` parameter, defaults to None
from model_SAA import Autoencoder # `attn=True/False` parameter switches the self-attention layers on or off, defaults to True
from model_resnet import Autoencoder # `net` parameter, defaults to net="resnet50". Possible values: 'resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152'

# **Обучение**

Переменная load_prev_model определяет, продолжать ли обучение с ранее сохранёнными весами или начать его заново.

В training_model выбираем, что обучаем: автоэнкодер (AE) или модель, предсказывающую параметры по автоэнкодеру (param_AE).

In [None]:
# True: resume from the saved checkpoint; False: start from scratch.
load_prev_model = False
# What to train; the cells below act on "AE" and "param_AE".
training_model = "AE" # ["AE", "param_AE", "param"]

In [None]:
# Training of the parameter-prediction model on top of an already trained autoencoder
def train_epoch_param(model, encoder, loss, optimizer, dataloader, do_train):
    """Run one pass over ``dataloader`` for the parameter-prediction model.

    Every batch is passed through ``encoder``; the second element of its
    return value is fed to ``model`` and the prediction is compared with the
    batch targets. Tensors are moved to the module-level ``device`` first.

    Args:
        model: callable mapping the encoder's second output to predictions.
        encoder: callable returning a 2-tuple; only the second element is
            used. It is evaluated without gradient tracking, so this function
            never back-propagates into it.
        loss: callable ``loss(preds, targets)`` returning a scalar tensor.
        optimizer: its gradients are reset before every batch; it is stepped
            only when ``do_train`` is true.
        dataloader: sized iterable of ``(inputs, targets)`` batches.
        do_train: back-propagate and update when true, only evaluate otherwise.

    Returns:
        The mean of the per-batch loss values.

    Raises:
        ZeroDivisionError: if ``dataloader`` yields no batches.
    """
    running_loss = 0.

    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()

        # The encoder is a fixed feature extractor here: skipping autograd for
        # it avoids building its graph and accumulating gradients on its
        # weights, which nothing ever resets.
        with torch.no_grad():
            _, z = encoder(inputs)

        with torch.set_grad_enabled(do_train):
            preds = model(z)
            loss_value = loss(preds, targets)

            # backward + optimize only in the training phase
            if do_train:
                loss_value.backward()
                optimizer.step()

        running_loss += loss_value.item()

    # Average over batches (not over samples).
    epoch_loss = running_loss / len(dataloader)

    return epoch_loss


def train_model_param(model, encoder, loss, optimizer, scheduler, train_dataloader, val_dataloader, num_epochs, config, start_epoch=-1, prev_loss=None):
    """Train the parameter-prediction model, checkpointing on the best validation loss.

    Each epoch runs a training pass followed by a validation pass through
    ``train_epoch_param``. The scheduler is stepped once per epoch, after the
    training pass. Whenever the validation loss drops below the best value
    seen so far, a checkpoint is written to
    ``./models/param_<config["exp_name"]>.pth``.

    Args:
        model: module being trained (``train()``/``eval()`` are toggled on it).
        encoder: feature extractor forwarded to ``train_epoch_param``.
        loss: loss callable forwarded to ``train_epoch_param``.
        optimizer: optimizer for ``model``; its state is stored in checkpoints.
        scheduler: learning-rate scheduler.
        train_dataloader: batches for the training pass.
        val_dataloader: batches for the validation pass.
        num_epochs: exclusive upper bound of the epoch counter.
        config: dict providing ``'exp_name'``.
        start_epoch: last completed epoch; training continues at ``start_epoch + 1``.
        prev_loss: best validation loss reached so far, or None when there is none.

    Returns:
        ``model`` (the same object, after training).
    """
    # `is not None` so that a stored loss of exactly 0.0 is still honoured.
    best_val_loss = prev_loss if prev_loss is not None else math.inf

    for epoch in range(start_epoch + 1, num_epochs):

        # Each epoch has a training and a validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                dataloader = train_dataloader
                model.train()  # Set model to training mode
            else:
                dataloader = val_dataloader
                model.eval()  # Set model to evaluate mode

            epoch_loss = train_epoch_param(model, encoder, loss, optimizer, dataloader, phase == 'train')

            if phase == 'train':
                scheduler.step()
            print('loss', epoch_loss, 'epoch', epoch)

            if phase == 'val' and best_val_loss > epoch_loss:
                print('New best val loss', epoch_loss, 'epoch', epoch)
                best_val_loss = epoch_loss
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': epoch_loss,
                    'model_name': config['exp_name']
                }, f'./models/param_{config["exp_name"]}.pth')

    return model

In [None]:
# Training of the autoencoder
def train_epoch(model, loss, optimizer, dataloader, do_train):
    """Run one pass over ``dataloader`` for the autoencoder.

    The first element returned by ``model`` is compared with the batch inputs
    themselves (reconstruction loss); the batch targets are moved to the
    module-level ``device`` but do not enter the loss.

    Args:
        model: callable returning a 2-tuple whose first element is the
            reconstruction of its input.
        loss: callable ``loss(preds, inputs)`` returning a scalar tensor.
        optimizer: reset before every batch, stepped only when ``do_train``.
        dataloader: sized iterable of ``(inputs, targets)`` batches.
        do_train: back-propagate and update when true, only evaluate otherwise.

    Returns:
        The mean of the per-batch loss values.
    """
    batch_losses = []

    for batch, labels in dataloader:
        batch = batch.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Gradients are tracked only while training.
        with torch.set_grad_enabled(do_train):
            reconstruction, _ = model(batch)
            batch_loss = loss(reconstruction, batch)

            if do_train:
                batch_loss.backward()
                optimizer.step()

        batch_losses.append(batch_loss.item())

    # Average over batches (not over samples).
    return sum(batch_losses) / len(dataloader)


def train_model(model, loss, optimizer, scheduler, train_dataloader, val_dataloader, num_epochs, config, start_epoch=-1, prev_loss=None):
    """Train the autoencoder, checkpointing on the best validation loss.

    Each epoch runs a training pass followed by a validation pass through
    ``train_epoch``. The scheduler is stepped once per epoch, after the
    training pass. Whenever the validation loss drops below the best value
    seen so far, a checkpoint is written to
    ``./models/<config["exp_name"]>.pth``.

    Args:
        model: module being trained (``train()``/``eval()`` are toggled on it).
        loss: loss callable forwarded to ``train_epoch``.
        optimizer: optimizer for ``model``; its state is stored in checkpoints.
        scheduler: learning-rate scheduler.
        train_dataloader: batches for the training pass.
        val_dataloader: batches for the validation pass.
        num_epochs: exclusive upper bound of the epoch counter.
        config: dict providing ``'exp_name'``.
        start_epoch: last completed epoch; training continues at ``start_epoch + 1``.
        prev_loss: best validation loss reached so far, or None when there is none.

    Returns:
        ``model`` (the same object, after training).
    """
    # `is not None` so that a stored loss of exactly 0.0 is still honoured.
    best_val_loss = prev_loss if prev_loss is not None else math.inf

    for epoch in range(start_epoch + 1, num_epochs):

        # Each epoch has a training and a validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                dataloader = train_dataloader
                model.train()  # Set model to training mode
            else:
                dataloader = val_dataloader
                model.eval()  # Set model to evaluate mode

            epoch_loss = train_epoch(model, loss, optimizer, dataloader, phase == 'train')

            if phase == 'train':
                scheduler.step()
            print('loss', epoch_loss, 'epoch', epoch)

            if phase == 'val' and best_val_loss > epoch_loss:
                print('New best val loss', epoch_loss, 'epoch', epoch)
                best_val_loss = epoch_loss
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': epoch_loss,
                    'model_name': config['exp_name']
                }, f'./models/{config["exp_name"]}.pth')

    return model

In [None]:
if training_model == "param_AE":
    # Load the trained autoencoder weights from the checkpoint written by train_model.
    # map_location lets a checkpoint saved on another device (e.g. GPU) load here.
    autoencoder_state_dict = torch.load(f'./models/{config["exp_name"]}.pth', map_location=device)['model_state_dict']
    # The data tensors are created on `device`, so the module has to live there too.
    AE = Autoencoder().to(device)
    AE.load_state_dict(autoencoder_state_dict)
    AE.eval()

    del autoencoder_state_dict

In [None]:
if training_model == "param_AE":
    # Fully connected head: flattened autoencoder code -> one value per target parameter.
    # NOTE(review): 1792 must equal the flattened size of the encoder output — confirm for the chosen Autoencoder.
    model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1792, 512),
            # nn.Dropout(p=0.01),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 32),
            nn.ReLU(),
            nn.Linear(32, len(params))
        ).to(device)  # keep the head on the same device as the data tensors

    # loss_func = torch.nn.L1Loss()
    loss_func = torch.nn.MSELoss()
    # The optimizer is created after the move so it references the on-device parameters.
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

In [None]:
if training_model == "param_AE":
    if load_prev_model:
        # Resume: read the checkpoint only when it is actually needed, so a
        # fresh run does not fail on a missing file.
        save = torch.load(f'./models/param_{config["exp_name"]}.pth', map_location=device)
        start_epoch = save['epoch']
        start_loss = save['loss']
        model.load_state_dict(save['model_state_dict'])
        optimizer.load_state_dict(save['optimizer_state_dict'])
        model.train()
    else:
        # Fresh start: train_model_param begins at epoch start_epoch + 1 == 0.
        start_epoch = -1
        start_loss = None

    print(start_epoch, start_loss)

In [None]:
# Train the parameter-prediction model on top of the loaded autoencoder.
if training_model == "param_AE":
    model = train_model_param(model, AE, loss_func, optimizer, scheduler, train_dataloader, val_dataloader, epochs, config, start_epoch, start_loss)

In [None]:
if training_model == "AE":
    # Remember to pass the constructor arguments required by the chosen architecture.
    # The data tensors are created on `device`, so the module is moved there too.
    model = Autoencoder().to(device)

    # loss_func = torch.nn.L1Loss()
    loss_func = torch.nn.MSELoss()
    # The optimizer is created after the move so it references the on-device parameters.
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

In [None]:
if training_model == "AE":
    pass  # empty cell; `pass` keeps the bare `if` syntactically valid

In [None]:
if training_model == "AE":
    if load_prev_model:
        # Resume: restore weights, optimizer state and progress from the checkpoint.
        # map_location lets a checkpoint saved on another device load here.
        save = torch.load(f'./models/{config["exp_name"]}.pth', map_location=device)
        model.load_state_dict(save['model_state_dict'])
        optimizer.load_state_dict(save['optimizer_state_dict'])
        model.train()
        start_epoch = save['epoch']
        prev_loss = save['loss']
        print('loss ', prev_loss)
        print('start_epoch ', start_epoch)
        del save

In [None]:
if training_model == "AE":
    # When resuming, pass the stored epoch and loss; otherwise rely on the
    # defaults of train_model.
    resume_args = (start_epoch, prev_loss) if load_prev_model else ()
    model = train_model(
        model, loss_func, optimizer, scheduler,
        train_dataloader, val_dataloader, epochs, config,
        *resume_args,
    )

# **Тестирование и метрики**

In [None]:
# Load the test samples, normalise the targets with the training statistics
# and turn both arrays into float tensors on `device`.
x_test, y_test = load_data_phase(test_folder, params, phase_size)
y_test = norm(y_test, mean, dispersion)
x_test, y_test = map(lambda x: torch.tensor(x, dtype=torch.float, device=device), (x_test, y_test))
# Evaluation data goes through the test loader, exactly like the validation split.
# NOTE(review): assumes get_test_dataloader keeps every sample in a fixed order — confirm in utils.py.
test_dataloader = get_test_dataloader(x_test, y_test, batch_size)

In [None]:
# Restore the trained autoencoder for evaluation.
# map_location lets a checkpoint saved on another device (e.g. GPU) load here.
autoencoder_state_dict = torch.load(f'./models/{config["exp_name"]}.pth', map_location=device)['model_state_dict']
# The test tensors live on `device`, so the module has to be there as well.
AE = Autoencoder().to(device)
AE.load_state_dict(autoencoder_state_dict)
AE.eval()

del autoencoder_state_dict

In [None]:
# Rebuild the parameter-prediction head with the same layout used for training.
best_model = nn.Sequential(
        nn.Flatten(),
        nn.Linear(1792, 512),
        # nn.Dropout(p=0.01),
        nn.ReLU(),
        nn.Linear(512, 128),
        nn.ReLU(),
        nn.Linear(128, 32),
        nn.ReLU(),
        nn.Linear(32, len(params))
    ).to(device)
# The head's weights are stored by train_model_param under the 'param_' prefix;
# the un-prefixed file holds the autoencoder and does not match this layout.
best_model.load_state_dict(torch.load(f'./models/param_{config["exp_name"]}.pth', map_location=device)['model_state_dict'])
best_model.eval()

In [None]:
# Save the original phase diagrams and the ones reconstructed by the autoencoder.
# The batch index is part of the file name so that later batches do not
# overwrite earlier ones.
with torch.no_grad():
    for batch_idx, (inputs, targets) in enumerate(test_dataloader):
        save_image(inputs, os.path.join('./test_img', f'original_AE_IRN1_{batch_idx}.png'))
        save_image(AE(inputs)[0], os.path.join('./test_img', f'fake_AE_IRN1_{batch_idx}.png'))

In [None]:
# Pixel-wise confusion counts between the rounded reconstruction and the input.
# Only values equal to exactly 0 or 1 are counted, as before.
# Assumes the reconstruction has the same shape as the input batch.
FP, FN, TP, TN = 0, 0, 0, 0
with torch.no_grad():
    for inputs, targets in test_dataloader:
        fake, z = AE(inputs)
        # torch.round rounds halves to even, like Python's round().
        predicted = torch.round(fake)
        pred_pos, pred_neg = predicted == 1, predicted == 0
        actual_pos, actual_neg = inputs == 1, inputs == 0
        TP += int((pred_pos & actual_pos).sum())
        TN += int((pred_neg & actual_neg).sum())
        FN += int((pred_neg & actual_pos).sum())
        FP += int((pred_pos & actual_neg).sum())
    # print(FP, FN, TP, TN)

# Both scores are undefined when there is no positive pixel at all.
if TP + FP + FN:
    print("f1 =", 2*TP / (2*TP + FP + FN))
    print("IoU =", TP / (TP + FP + FN))
else:
    print("f1 =", float("nan"))
    print("IoU =", float("nan"))

In [None]:
# Predict the parameters for every split: autoencoder code -> prediction head.
# Inference only, so no autograd graph is built for these full-dataset passes.
with torch.no_grad():
    y_train_result = best_model(AE(x_train)[1])
    y_val_result = best_model(AE(x_val)[1])
    y_test_result = best_model(AE(x_test)[1])

In [None]:
# NumPy copies of predictions and targets for plotting.
# Every tensor goes through .detach().cpu() first: converting directly fails
# for tensors that live on the GPU.
y_train_result_np = y_train_result.detach().cpu().numpy()
y_val_result_np = y_val_result.detach().cpu().numpy()
y_test_result_np = y_test_result.detach().cpu().numpy()

y_train_np = y_train.detach().cpu().numpy()
y_val_np = y_val.detach().cpu().numpy()
y_test_np = y_test.detach().cpu().numpy()

In [None]:
def add_subplot(fig, train, result, param_index, plot_index, label):
    """Add one row to a 3-row figure comparing targets with predictions.

    Samples are ordered by the target value of the selected parameter; both
    arrays are passed through ``denorm`` with the module-level ``mean`` and
    ``dispersion`` before plotting.

    Args:
        fig: matplotlib figure to draw into.
        train: 2-D array of targets, one column per parameter.
        result: 2-D array of predictions with the same layout.
        param_index: column to plot; also selects the axis label from ``params``.
        plot_index: row of the 3x1 grid (1-based).
        label: title of the subplot.
    """
    sort = train[:, param_index].argsort(axis=0).reshape((-1,))
    ax_train = fig.add_subplot(3, 1, plot_index)
    ax_train.set_title(label=label, fontsize=12)
    # Take the name from `params` rather than from a loop variable that merely
    # happens to be defined at module level when this function is called.
    ax_train.set_ylabel(params[param_index])
    ax_train.set_xlabel('num File')
    ax_train.plot(denorm(train[sort], mean, dispersion)[:, param_index], 'go', label='train')
    ax_train.plot(denorm(result[sort], mean, dispersion)[:, param_index], 'ro', label='result')
    ax_train.legend()

def show(param_index, param):
    """Plot targets against predictions of one parameter for all three splits.

    Args:
        param_index: column of the parameter in the target/prediction arrays.
        param: parameter name, shown in the figure title.
    """
    fig = plt.figure(figsize=(12, 12))
    fig.suptitle(f'Параметр {param}', fontsize=16)

    fig.subplots_adjust(hspace=0.4)

    # One row per split: the validation and test rows previously used indices
    # 1 and 2, which put validation onto the training row and left row 3 empty.
    add_subplot(fig, y_train_np, y_train_result_np, param_index, 1, 'Обучающий набор')
    add_subplot(fig, y_val_np, y_val_result_np, param_index, 2, 'Валидационный набор')
    add_subplot(fig, y_test_np, y_test_result_np, param_index, 3, 'Тестовый набор')

    plt.show()

In [None]:
# One figure per predicted parameter.
for param_index, param in enumerate(params):
    show(param_index, param)

In [None]:
from sklearn.metrics import r2_score

def get_loss(result, real, i):
    """Return (MSE, L1, R^2) for column ``i``, each rounded to 6 decimals.

    Args:
        result: 2-D tensor of predictions.
        real: 2-D tensor of targets with the same layout.
        i: index of the column (parameter) to evaluate.
    """
    real_i, result_i = real[:, i], result[:, i]
    error_m = torch.nn.MSELoss()(real_i, result_i).item()
    error_a = torch.nn.L1Loss()(real_i, result_i).item()
    # r2_score works on host arrays; tensors may live on the GPU, so copy them over first.
    r2 = r2_score(real_i.detach().cpu().numpy(), result_i.detach().cpu().numpy())
    return round(error_m, 6), round(error_a, 6), round(r2, 6)

def print_errors(i):
    """Print the L1, MSE and R^2 scores of parameter ``i`` for the train, validation and test splits."""
    splits = (
        ('train error L1', y_train_result, y_train),
        ('val error L1', y_val_result, y_val),
        ('test error L1', y_test_result, y_test),
    )
    for caption, predicted, expected in splits:
        mse, mae, r2 = get_loss(predicted, expected, i)
        print(caption, mae, 'MSE', mse, 'r2', r2)

In [None]:
# Report the per-parameter errors on all three splits.
for param_index, param in enumerate(params):
    print(param, 'errors')
    print_errors(param_index)

In [None]:
# R^2 over all parameters on the test split; tensors are copied to host arrays
# first because they may live on the GPU.
r2_score(y_test.detach().cpu().numpy(), y_test_result.detach().cpu().numpy())

In [None]:
# R^2 over all parameters on the validation split; tensors are copied to host
# arrays first because they may live on the GPU.
r2_score(y_val.detach().cpu().numpy(), y_val_result.detach().cpu().numpy())