In [4]:
!pip install tensorflow_decision_forests
# !pip install numpy
# !pip install pandas
# !pip install keras
# !pip install scikit-learn
# !pip install opencv-python



In [5]:
from keras.callbacks import ModelCheckpoint
from keras.models import Sequential, Model
from keras.callbacks import ModelCheckpoint
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from keras.layers import BatchNormalization
from keras.layers import Concatenate
from keras.layers import GlobalAveragePooling2D
from keras.layers import Conv2DTranspose
from keras.layers import Resizing

import tensorflow as tf
import keras
import numpy as np

import tensorflow_decision_forests as tfdf

In [6]:
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.impute import KNNImputer

def pca_features(data: np.ndarray, n_components: int = 10) -> np.ndarray:
    flattened_data = np.array([img.flatten() for img in data])
    data_processed = PCA(n_components=n_components).fit_transform(flattened_data)
    return data_processed


def t_sne_features(data: np.ndarray, n_components: int = 10):
    data_embeded = TSNE(n_components=n_components,
                        learning_rate='auto',
                        init='random',
                        method='exact',
                        perplexity=3).fit_transform(data)
    print(data_embeded.shape)
    return data_embeded

In [7]:
import os
import cv2 as cv
import pandas as pd

def load_images_from_folder(folder: str) -> np.ndarray:
    """
    Функция подгрузки необходимого набора искусственных изображений из передаваемого каталога.

    :param folder: папка с изображениями, сохраненными в формате .png

    :return: список формата Numpy, содержащие AIO в объектах класса Image из Pillow
    """

    images = []
    for filename in os.listdir(folder):
        img = cv.imread(os.path.join(folder, filename), cv.IMREAD_GRAYSCALE)
        if img is not None:
            images.append(np.asarray(img).astype(np.float32))
    return np.asarray(images)

folder_images = "/content"
images = load_images_from_folder(folder_images)
pca_features_ = pca_features(images, n_components=30)
df_wheat = pd.read_csv("/content/wheat_pheno_num_sync.csv")
labels = df_wheat[["Урожайность.зерна..г.", "Высота.растений..см"]].to_numpy()

# импутирование данных
# (Пока просто средними значениями) импутируем данные, поскольку присутствуют пропуски
# imp = SimpleImputer(missing_values=np.nan, strategy='mean')
imp = KNNImputer(n_neighbors=2, weights='uniform')
labels = imp.fit_transform(labels.reshape(-1, 2))

In [None]:
from dataclasses import dataclass

# Модель, скомбинированная со случайным лесом
@dataclass
class ComboModel:

    n_epochs: int = 100
    n_row: int = 200
    n_col: int = 200
    input_channels: int = 1
    n_data: int = 100  # len(aio_labels)
    random_seed: int = 1234567890
    n_dict_features: int = 30
    n_trait: int = 1
    data_train: np.ndarray = np.ndarray([])
    label_train: np.ndarray = np.ndarray([])

    optimizer: keras.optimizers.Optimizer = None
    model: keras.models.Model = None

    def combo_model_functional(self, hp):
        """
        Функция построения модели нейросети с функциональным интерфейсом keras

        :param hp: набор гиперпараметров, отвечающих за конфигурация нейросети
        :return: граф-представление нейросети
        """

        inp_node = Input((self.n_row, self.n_col, self.input_channels), name="img_input")
        inp_dict_model = Input(self.n_dict_features, name="pop_struct_input")

        conv_node_1 = Conv2D(hp['first_conv2d_out_channels'],
                             kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                             padding='same',
                             strides=(1, 1),
                             activation=hp['first_conv2d_activation'], name="conv_map_1")(inp_node)
        if hp['need_extra_conv2d']:
            conv_node_1 = Conv2D(hp['extra_conv2d_out_channels'],
                                 kernel_size=(hp['extra_conv2d_kernel_size'], hp['extra_conv2d_kernel_size']),
                                 padding='same',
                                 strides=(1, 1),
                                 activation=hp['extra_conv2d_activation'], name="conv_map_extra")(conv_node_1)

        if hp['need_batch_norm_after_first_conv2d']:
            batch_node_1 = BatchNormalization()(conv_node_1)
            mp_node_1 = MaxPooling2D(pool_size=(2, 2))(batch_node_1)
        else:
            mp_node_1 = MaxPooling2D(pool_size=(2, 2))(conv_node_1)

        conv_node_2 = Conv2D(hp['second_conv2d_out_channels'],
                             kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                             padding='same',
                             strides=(1, 1),
                             activation=hp['second_conv2d_activation'], name="conv_map_2")(mp_node_1)

        if hp['need_batch_norm_after_second_conv2d']:
            batch_node_2 = BatchNormalization()(conv_node_2)
            mp_node_2 = MaxPooling2D(pool_size=(2, 2), name="max_pool_map")(batch_node_2)
        else:
            mp_node_2 = MaxPooling2D(pool_size=(2, 2), name="max_pool_map")(conv_node_2)

        if hp['need_deconv_block']:
            deconv_node_2 = Conv2DTranspose(
                hp['second_conv2d_out_channels'],
                kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                padding='same',
                strides=(2, 2),
                activation=hp['second_conv2d_activation'],
                name="deconv_2"
            )(mp_node_2)
            concat_node_2 = Concatenate(name="concat_2", axis=3)([deconv_node_2, conv_node_2])
            conv_node_deconv_2 = Conv2D(
                hp['second_conv2d_out_channels'],
                kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                padding='same',
                strides=(1, 1),
                activation=hp['second_conv2d_activation'],
                name="conv_deconv_2"
            )(concat_node_2)
            deconv_node_1 = Conv2DTranspose(
                hp['first_conv2d_out_channels'],
                kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                padding='same',
                strides=(2, 2),
                activation=hp['first_conv2d_activation'],
                name="deconv_1"
            )(conv_node_deconv_2)
            concat_node_1 = Concatenate(name="concat_1", axis=3)([deconv_node_1, conv_node_1])
            mp_node_2 = Conv2D(
                hp['first_conv2d_out_channels'],
                kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                padding='same',
                strides=(1, 1),
                activation=hp['first_conv2d_activation'],
                name="conv_deconv_1"
            )(concat_node_1)

        if hp['use_gap_1_or_flatten_0'] == 0:
            flatten_node = Flatten(name='flatten')(mp_node_2)
            dense_node = Dense(hp['num_feature_output'], activation=hp['dense_output_activation'],
                               name="img_feature_output")(flatten_node)
        elif hp['use_gap_1_or_flatten_0'] == 1:
            dense_node = GlobalAveragePooling2D(name="img_feature_output")(mp_node_2)

        concatenate_features = Concatenate(name="concat_features")(inp_dict_model, dense_node)

        reg_forest_1 = tfdf.keras.RandomForestModel(task=tfdf.keras.Task.REGRESSION,
                                                    num_trees=hp['num_estimators'],
                                                    max_depth=hp['max_depth'],
                                                    bootstrap_training_dataset=hp['bootstrap'])
        forest_1_pred = reg_forest_1(concatenate_features)

        reg_forest_2 = tfdf.keras.RandomForestModel(task=tfdf.keras.Task.REGRESSION,
                                                    num_trees=hp['num_estimators'],
                                                    max_depth=hp['max_depth'],
                                                    bootstrap_training_dataset=hp['bootstrap'])
        forest_2_pred = reg_forest_2(concatenate_features)

        combo_model = Model(inputs=[inp_node, inp_dict_model], outputs=[forest_1_pred, forest_2_pred],
                            name="feature_model")

        self.model = combo_model

    def build(self, hp):
        """
        Builds a convolutional model.
        """
        # Гиперпараметры сверточной части модели
        model_hp = {
            # сначала идут параметры сверточной части модели
            'first_conv2d_out_channels': [32, 64],
            'first_conv2d_kernel_size': [3, 5, 7],
            'first_conv2d_activation': ['tanh', 'relu'],
            'need_extra_conv2d': [False, True],
            'extra_conv2d_out_channels': [32, 64],
            'extra_conv2d_kernel_size': [3, 5, 7],
            'extra_conv2d_activation': ['tanh', 'relu'],
            'need_batch_norm_after_first_conv2d': [True, False],
            'second_conv2d_kernel_size': [3, 5],
            'second_conv2d_out_channels': [128, 64],
            'second_conv2d_activation': ['tanh', 'relu'],
            'need_batch_norm_after_second_conv2d': [True, False],
            'dense_output_activation': ['sigmoid', 'linear'],
            'use_gap_1_or_flatten_0': [1, 0],
            'need_deconv_block': [False, True],
            'num_feature_output': [128, 64, 256],
            # а дальше идут параметры регрессионного случайного леса
            'n_estimators': [5, 20, 50, 100],
            'max_features': ['auto', 'sqrt'],
            'max_depth': [(i + 1) * 5 for i in range(7)],
            'min_samples_split': [2, 6, 10],
            'bootstrap': [True, False]
        }

        # возвращаем собранную модель
        return self.combo_model_functional(model_hp)

    @staticmethod
    @tf.function
    def custom_loss_mae(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        error = y_true - y_pred
        abs_error_1, abs_error_2 = tf.abs(error[:, 0]), tf.abs(error[:, 1])
        result_1, result_2 = tf.reduce_mean(abs_error_1), tf.reduce_mean(abs_error_2)
        # return np.array([result_1, result_2])

        return np.sqrt(result_1 * result_2)

    @staticmethod
    @tf.function
    def custom_loss_mse(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        error = y_true - y_pred
        squared_error_1, squared_error_2 = tf.square(error), tf.square(error[:, 1])
        result_1, result_2 = tf.reduce_mean(squared_error_1), tf.reduce_mean(squared_error_2)
        # return np.array([result_1, result_2])
        return np.sqrt(result_1 * result_2)

    # Function to run the train step.
    # здесь надо подумать как исправить эту функцию
    @tf.function
    def run_train_step(self, images_, pop_comps_, labels_1_, labels_2_):
        with tf.GradientTape() as tape:
            logits_1, logits_2 = self.model(images_, pop_comps_)
            loss_1 = loss_fn(labels, logits_1)
            loss_2 = loss_fn(labels, logits_2)
            # Add any regularization losses.
            if self.model.losses:
                loss_1 += tf.math.add_n(self.model.losses)
                loss_2 += tf.math.add_n(self.model.losses)
        gradients = tape.gradient(loss_1, self.model.trainable_variables)
        gradients = tape.gradient(loss_2, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

    # Function to run the validation step.
    @tf.function
    def run_val_step(self, images_, pop_comps_, labels_1_, labels_2_):
        logits = self.model(images)
        loss = loss_fn(labels, logits)
        # Update the metric.
        epoch_loss_metric.update_state(loss)

    @staticmethod
    def fit_cv(comp_hp: dict, model_hp: dict, model: 'ComboModel', splits_num: int = 10) -> list:
        model_keras = model.build(model_hp)

        model_keras.compile(optimizer=keras.optimizers.SGD(learning_rate=0.0001),
                            loss=ComboModelTuner.custom_loss_mae,
                            loss_weights=1.0,
                            metrics=[ComboModelTuner.custom_loss_mse])

        mae_per_fold_tr, mse_per_fold_tr = [], []
        mae_per_fold_vd, mse_per_fold_vd = [], []

        kfold = KFold(n_splits=splits_num, shuffle=True)
        for j, (tr_idx, val_idx) in enumerate(kfold.split(model.features_train, model.data_train, model.labels_train)):
            model_keras.fit(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                            y=model.labels_train[tr_idx],
                            batch_size=total_hp["batch_size_ll"],
                            epochs=total_hp["num_epochs_ll"])

            scores = model_keras.evaluate(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                                          y=model.labels_train[tr_idx])

            mse_per_fold_tr.append(scores[0])
            mae_per_fold_tr.append(scores[1])

            scores = model_keras.evaluate(x=[model.data_train[val_idx], model.features_train[val_idx]],
                                          y=model.labels_train[val_idx])

            mse_per_fold_vd.append(scores[0])
            mae_per_fold_vd.append(scores[1])
            print(f"Fold #{j + 1} finished succesfully")

        return [mae_per_fold_tr, mse_per_fold_tr, mae_per_fold_vd, mse_per_fold_vd]

In [None]:
# обучение модели со случайным лесом

def hyper_tuning(model: 'ComboModel', iters_num: int, hps_cnn: dict, hps_reg: dict):
    valid_mae_label_1, valid_mae_label_2 = [], []
    valid_mse_label_1, valid_mse_label_2 = [], []
    valid_accuracy_label_1, valid_accuracy_label_2 = [], []

    for iter_ in range(iters_num):
        # Сборка комбинации случайных гиперпараметров в заданын границах
        cnn_hp_comb, reg_hp_comb = {}, {}

        for param in hps_cnn:
            if len(hps_cnn[param]) > 1:
                if any(isinstance(x, bool) for x in hps_cnn[param]) or any(isinstance(x, str) for x in hps_cnn[param]):
                    cnn_hp_comb[param] = hps_cnn[param][np.random.randint(len(hps_cnn[param]))]
                else:
                    cnn_hp_comb[param] = np.random.randint(low=min(hps_cnn[param]), high=max(hps_cnn[param]))
            else:
                cnn_hp_comb[param] = hps_cnn[param][0]

        for param in hps_reg:
            if len(hps_reg[param]) > 1:
                if any(isinstance(x, bool) for x in hps_reg[param]) or any(isinstance(x, str) for x in hps_reg[param]):
                    reg_hp_comb[param] = hps_reg[param][np.random.randint(len(hps_reg[param]))]
                else:
                    reg_hp_comb[param] = np.random.randint(low=min(hps_reg[param]), high=max(hps_reg[param]))
            else:
                reg_hp_comb[param] = hps_reg[param][0]

        print(cnn_hp_comb)

        model = SimpleCNNModel(n_epochs=20,
                               n_row=200,
                               n_col=200,
                               input_channels=1,
                               random_seed=1234567890,
                               n_dict_features=30,
                               n_trait=2,
                               data_train=train_images_,
                               labels_train=train_labels_,
                               features_train=train_features_,
                               data_test=test_images_,
                               features_test=test_features_,
                               labels_test=test_labels_)

            # model = SimpleCNNModel()
          metrics = ComboModelTuner.custom_cv(total_hp={"batch_size_ll": 64, "num_epochs_ll": 30},
                                              cnn_hp=cnn_hp_comb,
                                              model=model)

        # считаем ошибку модели на тестовой выборке
        print(f"Random Tuning iter #{iter_} finished successfully")

IndentationError: unexpected indent (<ipython-input-6-fe16b81e7eef>, line 47)

In [8]:
from dataclasses import dataclass

# Модель сугубо нейронной сети
@dataclass
class SimpleCNNModel:
    n_epochs: int = 20
    n_row: int = 200
    n_col: int = 200
    input_channels: int = 1
    random_seed: int = 1234567890
    n_dict_features: int = 30
    n_trait: int = 2

    data_train: np.ndarray = np.ndarray([])
    features_train: np.ndarray = np.asarray([])
    labels_train: np.ndarray = np.ndarray([])

    data_test: np.ndarray = np.asarray([])
    features_test: np.ndarray = np.asarray([])
    labels_test: np.ndarray = np.asarray([])

    def build(self, hp: dict):
        """
        Функция построения модели нейросети с функциональным интерфейсом keras

        :param hp: набор гиперпараметров, отвечающих за конфигурация нейросети
        :return: граф-представление нейросети
        """

        inp_node = Input((self.n_row, self.n_col, self.input_channels), name="img_input")

        inp_node_dict = Input({self.n_dict_features}, name="dict_input")

        conv_node_1 = Conv2D(hp['first_conv2d_out_channels'],
                             kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                             padding='same',
                             strides=(1, 1),
                             activation=hp['first_conv2d_activation'], name="conv_map_1")(inp_node)
        if hp['need_extra_conv2d']:
            conv_node_1 = Conv2D(hp['extra_conv2d_out_channels'],
                                 kernel_size=(hp['extra_conv2d_kernel_size'], hp['extra_conv2d_kernel_size']),
                                 padding='same',
                                 strides=(1, 1),
                                 activation=hp['extra_conv2d_activation'], name="conv_map_extra")(conv_node_1)

        if hp['need_batch_norm_after_first_conv2d']:
            batch_node_1 = BatchNormalization()(conv_node_1)
            mp_node_1 = MaxPooling2D(pool_size=(2, 2))(batch_node_1)
        else:
            mp_node_1 = MaxPooling2D(pool_size=(2, 2))(conv_node_1)

        conv_node_2 = Conv2D(hp['second_conv2d_out_channels'],
                             kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                             padding='same',
                             strides=(1, 1),
                             activation=hp['second_conv2d_activation'], name="conv_map_2")(mp_node_1)

        if hp['need_batch_norm_after_second_conv2d']:
            batch_node_2 = BatchNormalization()(conv_node_2)
            mp_node_2 = MaxPooling2D(pool_size=(2, 2), name="max_pool_map")(batch_node_2)
        else:
            mp_node_2 = MaxPooling2D(pool_size=(2, 2), name="max_pool_map")(conv_node_2)

        if hp['need_deconv_block']:
            deconv_node_2 = Conv2DTranspose(
                hp['second_conv2d_out_channels'],
                kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                padding='same',
                strides=(2, 2),
                activation=hp['second_conv2d_activation'],
                name="deconv_2"
            )(mp_node_2)
            concat_node_2 = Concatenate(name="concat_2", axis=3)([deconv_node_2, conv_node_2])
            conv_node_deconv_2 = Conv2D(
                hp['second_conv2d_out_channels'],
                kernel_size=(hp['second_conv2d_kernel_size'], hp['second_conv2d_kernel_size']),
                padding='same',
                strides=(1, 1),
                activation=hp['second_conv2d_activation'],
                name="conv_deconv_2"
            )(concat_node_2)
            deconv_node_1 = Conv2DTranspose(
                hp['first_conv2d_out_channels'],
                kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                padding='same',
                strides=(2, 2),
                activation=hp['first_conv2d_activation'],
                name="deconv_1"
            )(conv_node_deconv_2)
            concat_node_1 = Concatenate(name="concat_1", axis=3)([deconv_node_1, conv_node_1])
            mp_node_2 = Conv2D(
                hp['first_conv2d_out_channels'],
                kernel_size=(hp['first_conv2d_kernel_size'], hp['first_conv2d_kernel_size']),
                padding='same',
                strides=(1, 1),
                activation=hp['first_conv2d_activation'],
                name="conv_deconv_1"
            )(concat_node_1)

        if hp['use_gap_1_or_flatten_0'] == 0:
            flatten_node = Flatten(name='flatten')(mp_node_2)
            dense_node = Dense(hp['num_feature_output'], activation=hp['dense_output_activation'],
                               name="img_feature_output")(flatten_node)
        elif hp['use_gap_1_or_flatten_0'] == 1:
            dense_node = GlobalAveragePooling2D(name="img_feature_output")(mp_node_2)

        concatenate_features = Concatenate(name="concat_features")([inp_node_dict, dense_node])

        out = Dense(self.n_trait, activation='linear', name="cnn_multioutput")(concatenate_features)

        model = Model(inputs=[inp_node, inp_node_dict], outputs=out, name="regression_model")

        return model

In [9]:
class ComboDataPool(keras.utils.Sequence):

    def __init__(self, images, features, labels, batch_size: int, max_len: int = -1):
        self.batch_size = batch_size
        self.images = images[:max_len]
        self.features = features[:max_len]
        self.labels = labels[:max_len]

    def __len__(self):
        return int(np.ceil(self.images.shape[0] / self.batch_size))

    def __getitem__(self, idx):
        batch_data = [self.images[idx * self.batch_size:(idx + 1) * self.batch_size],
                   self.features[idx * self.batch_size:(idx + 1) * self.batch_size]]
        batch_labels = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]

        return batch_data, batch_labels

In [21]:
import itertools
from sklearn.model_selection import KFold
from keras.callbacks import EarlyStopping, ModelCheckpoint


class ComboModelTuner:

    @staticmethod
    @tf.function
    def fit_loss_mae(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        """
        MAE-функция потерь для обучения при помощи стандартного метода '.fit()'
        """
        error = y_true - y_pred
        abs_error_1, abs_error_2 = tf.abs(error[:, 0]), tf.abs(error[:, 1])
        result_1, result_2 = tf.reduce_mean(abs_error_1), tf.reduce_mean(abs_error_2)
        return (result_1 + result_2) / 2

    @staticmethod
    @tf.function
    def fit_loss_mse(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        """
        MSE-функция потерь для обучения при помощи стандартного метода '.fit()'
        """
        error = y_true - y_pred
        squared_error_1, squared_error_2 = tf.square(error), tf.square(error[:, 1])
        result_1, result_2 = tf.reduce_mean(squared_error_1), tf.reduce_mean(squared_error_2)
        return (result_1 + result_2) / 2

    @staticmethod
    @tf.function
    def custom_loss_mae(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        """
        MAE-функция потерь для обучения при помощи пользовательской реализации цикла обучения
        """
        error = y_true - y_pred
        abs_error_1, abs_error_2 = tf.abs(error[:, 0]), tf.abs(error[:, 1])
        result_1, result_2 = tf.reduce_mean(abs_error_1), tf.reduce_mean(abs_error_2)
        return (result_1 + result_2) / 2

    @staticmethod
    @tf.function
    def custom_loss_mse(y_true: np.ndarray, y_pred: np.ndarray) -> np.array:
        """
        MSE-функция потерь для обучения при помощи пользовательской реализации цикла обучения
        """
        error = y_true - y_pred
        squared_error_1, squared_error_2 = tf.square(error), tf.square(error[:, 1])
        result_1, result_2 = tf.reduce_mean(squared_error_1), tf.reduce_mean(squared_error_2)
        return (result_1 + result_2) / 2

    @staticmethod
    def custom_cv(total_hp: dict, cnn_hp: dict, model: 'SimpleCNNModel',
                  splits_num: int = 10,
                  early_stop: bool = True, model_checkpoint: bool = True,
                  data_generator: bool = False) -> list:
        model_keras = model.build(cnn_hp)

        learning_data_pool = ComboDataPool(images=train_images,
                                           features=train_dict,
                                           labels=train_labels,
                                           batch_size=64)

        callbacks = []
        if early_stop:
            callback_early_stop = EarlyStopping(monitor="loss", min_delta=0.001, patience=2, verbose=1)
            callbacks.append(callback_early_stop)
        if model_checkpoint:
            callback_checkpoint = ModelCheckpoint(filepath="checkpoints/model_no_df_{epoch}.keras",
                                                  save_best_only=True, monitor="loss", verbose=1)
            callbacks.append(callback_checkpoint)

        model_keras.compile(optimizer=keras.optimizers.SGD(learning_rate=0.001),
                            loss=ComboModelTuner.fit_loss_mae,
                            metrics=[ComboModelTuner.fit_loss_mse])

        learning_data_pool = ComboDataPool(images=train_images,
                                           features=train_dict,
                                           labels=train_labels,
                                           batch_size=64)

        mae_per_fold_tr, mse_per_fold_tr = [], []
        mae_per_fold_vd, mse_per_fold_vd = [], []

        kfold = KFold(n_splits=splits_num, shuffle=True)
        for j, (tr_idx, val_idx) in enumerate(kfold.split(model.features_train, model.data_train, model.labels_train)):
            if data_generator:
                history = model_keras.fit(learning_data_pool, epochs=total_hp["num_epochs_ll"],
                                      validation_data=([model.data_train[val_idx], model.features_train[val_idx]],
                                                      model.labels_train[val_idx]))
            else:
                history = model_keras.fit(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                                          y=model.labels_train[tr_idx],
                                          batch_size=total_hp["batch_size_ll"],
                                          epochs=total_hp["num_epochs_ll"],
                                          validation_data=([model.data_train[val_idx], model.features_train[val_idx]],
                                                          model.labels_train[val_idx]),
                                          callbacks=callbacks)

            scores = model_keras.evaluate(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                                          y=model.labels_train[tr_idx])

            mse_per_fold_tr.append(scores[0])
            mae_per_fold_tr.append(scores[1])

            scores = model_keras.evaluate(x=[model.data_train[val_idx], model.features_train[val_idx]],
                                          y=model.labels_train[val_idx])

            mse_per_fold_vd.append(scores[0])
            mae_per_fold_vd.append(scores[1])
            print(f"Fold #{j + 1} finished succesfully")

        return [mae_per_fold_tr, mse_per_fold_tr, mae_per_fold_vd, mse_per_fold_vd]

    @staticmethod
    def fit_cv(total_hp: dict, cnn_hp: dict, model: 'SimpleCNNModel',
               splits_num: int = 10,
               early_stop: bool = True, model_checkpoint: bool = True,
               data_generator: bool = False) -> list:
        model_keras = model.build(cnn_hp)

        learning_data_pool = ComboDataPool(images=train_images,
                                           features=train_dict,
                                           labels=train_labels,
                                           batch_size=64)

        callbacks = []
        if early_stop:
            callback_early_stop = EarlyStopping(monitor="loss", min_delta=0.001, patience=2, verbose=1)
            callbacks.append(callback_early_stop)
        if model_checkpoint:
            callback_checkpoint = ModelCheckpoint(filepath="checkpoints/model_no_df_{epoch}.keras",
                                                  save_best_only=True, monitor="loss", verbose=1)
            callbacks.append(callback_checkpoint)

        model_keras.compile(optimizer=keras.optimizers.SGD(learning_rate=0.001),
                            loss=ComboModelTuner.custom_loss_mae,
                            metrics=[ComboModelTuner.custom_loss_mse])

        mae_per_fold_tr, mse_per_fold_tr = [], []
        mae_per_fold_vd, mse_per_fold_vd = [], []

        kfold = KFold(n_splits=splits_num, shuffle=True)
        for j, (tr_idx, val_idx) in enumerate(kfold.split(model.features_train, model.data_train, model.labels_train)):
            if data_generator:
                history = model_keras.fit(learning_data_pool, epochs=total_hp["num_epochs_ll"],
                                          validation_data=([model.data_train[val_idx], model.features_train[val_idx]],
                                                          model.labels_train[val_idx]))
            else:
                history = model_keras.fit(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                                          y=model.labels_train[tr_idx],
                                          batch_size=total_hp["batch_size_ll"],
                                          epochs=total_hp["num_epochs_ll"],
                                          validation_data=([model.data_train[val_idx], model.features_train[val_idx]],
                                                          model.labels_train[val_idx]),
                                          callbacks=callbacks)

            scores = model_keras.evaluate(x=[model.data_train[tr_idx], model.features_train[tr_idx]],
                                          y=model.labels_train[tr_idx])

            mse_per_fold_tr.append(scores[0])
            mae_per_fold_tr.append(scores[1])

            scores = model_keras.evaluate(x=[model.data_train[val_idx], model.features_train[val_idx]],
                                          y=model.labels_train[val_idx])

            mse_per_fold_vd.append(scores[0])
            mae_per_fold_vd.append(scores[1])
            print(f"Fold #{j + 1} finished succesfully")

        return [mae_per_fold_tr, mse_per_fold_tr, mae_per_fold_vd, mse_per_fold_vd]

    @staticmethod
    def random_hyper_tuning(iters_num: int, hps_cnn: dict,
                            train_images_: np.ndarray, train_features_: np.ndarray, train_labels_: np.ndarray,
                            test_images_: np.ndarray, test_features_: np.ndarray, test_labels_: np.ndarray):

        valid_mae_label_1, valid_mae_label_2 = [], []
        valid_mse_label_1, valid_mse_label_2 = [], []
        valid_accuracy_label_1, valid_accuracy_label_2 = [], []

        for iter_ in range(iters_num):
            # Сборка комбинации случайных гиперпараметров в заданын границах
            print(f"Random Tuning iter #{iter_} started")

            cnn_hp_comb = {}

            for param in hps_cnn:
                if len(hps_cnn[param]) > 1:
                    if any(isinstance(x, bool) for x in hps_cnn[param]) or \
                            any(isinstance(x, str) for x in hps_cnn[param]):
                        cnn_hp_comb[param] = hps_cnn[param][np.random.randint(len(hps_cnn[param]))]
                    else:
                        cnn_hp_comb[param] = np.random.randint(low=min(hps_cnn[param]), high=max(hps_cnn[param]))
                else:
                    cnn_hp_comb[param] = hps_cnn[param][0]

            print(cnn_hp_comb)

            model = SimpleCNNModel(n_epochs=20,
                                   n_row=200,
                                   n_col=200,
                                   input_channels=1,
                                   random_seed=1234567890,
                                   n_dict_features=30,
                                   n_trait=2,
                                   data_train=train_images_,
                                   labels_train=train_labels_,
                                   features_train=train_features_,
                                   data_test=test_images_,
                                   features_test=test_features_,
                                   labels_test=test_labels_)

            # model = SimpleCNNModel()
            metrics = ComboModelTuner.fit_cv(total_hp={"batch_size_ll": 64, "num_epochs_ll": 20},
                                             cnn_hp=cnn_hp_comb,
                                             splits_num=5,
                                             model=model,
                                             early_stop=True)

            # считаем ошибку модели на тестовой выборке
            print(f"Random Tuning iter #{iter_} finished successfully")

    @staticmethod
    def grid_hyper_tuning(model: 'SimpleCNNModel', hps_cnn: dict, hps_reg: dict):
        cnn_hp_combos = itertools.product(*hps_cnn)
        reg_hp_combos = itertools.product(*hps_reg)

        valid_mae_label_1, valid_mae_label_2 = [], []
        valid_mse_label_1, valid_mse_label_2 = [], []
        valid_accuracy_label_1, valid_accuracy_label_2 = [], []

        for i, tmp_hps_cnn in enumerate(cnn_hp_combos):
            for j, tmp_hps_reg in enumerate(reg_hp_combos):
                model.fit(dict(tmp_hps_cnn), dict(tmp_hps_reg))

                # считаем ошибку модели на тестовой выборке
                valid_predict = model.predict
                print(f"Grid Tuning iter #{i * len(reg_hp_combos) + j} finished successfully")

In [12]:
# делим данные на обучение/валидацию/тест
test_percentage = 0.1
test_indices = np.random.choice(images.shape[0], int(images.shape[0] * test_percentage))
train_indices = np.setdiff1d(np.array(list(range(images.shape[0]))), test_indices)

train_images, train_labels, train_dict = images[train_indices], labels[train_indices], pca_features_[train_indices]
test_images, test_labels, test_dict = images[test_indices], labels[test_indices], pca_features_[test_indices]

In [22]:
# задаем сетку гиперпараметров для

model_hp = {# сначала идут параметры сверточной части модели
            'first_conv2d_out_channels': [32, 64],
            'first_conv2d_kernel_size': [3, 5, 7],
            'first_conv2d_activation': ['tanh', 'relu'],
            'need_extra_conv2d': [False, True],
            'extra_conv2d_out_channels': [32, 64],
            'extra_conv2d_kernel_size': [3, 5, 7],
            'extra_conv2d_activation': ['tanh', 'relu'],
            'need_batch_norm_after_first_conv2d': [True, False],
            'second_conv2d_kernel_size': [3, 5],
            'second_conv2d_out_channels': [64, 128],
            'second_conv2d_activation': ['tanh', 'relu'],
            'need_batch_norm_after_second_conv2d': [True, False],
            'dense_output_activation': ['sigmoid', 'linear'],
            'use_gap_1_or_flatten_0': [1, 0],
            'need_deconv_block': [False, True],
            'num_feature_output': [64, 128, 256],
        }

ComboModelTuner.random_hyper_tuning(10, model_hp,
                                    train_images_=train_images,
                                    train_labels_=train_labels,
                                    train_features_=train_dict,
                                    test_images_=test_images,
                                    test_labels_=test_labels,
                                    test_features_=test_dict)

Random Tuning iter #0 started
{'first_conv2d_out_channels': 53, 'first_conv2d_kernel_size': 3, 'first_conv2d_activation': 'tanh', 'need_extra_conv2d': False, 'extra_conv2d_out_channels': 35, 'extra_conv2d_kernel_size': 5, 'extra_conv2d_activation': 'tanh', 'need_batch_norm_after_first_conv2d': True, 'second_conv2d_kernel_size': 3, 'second_conv2d_out_channels': 107, 'second_conv2d_activation': 'relu', 'need_batch_norm_after_second_conv2d': True, 'dense_output_activation': 'linear', 'use_gap_1_or_flatten_0': 0, 'need_deconv_block': False, 'num_feature_output': 94}
Epoch 1/20
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14s/step - custom_loss_mse: 171291.7812 - loss: 320.6811 
Epoch 1: loss improved from inf to 369.57660, saving model to checkpoints/model_no_df_1.keras
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m170s[0m 16s/step - custom_loss_mse: 177290.2031 - loss: 324.7557 - val_custom_loss_mse: 144514.7500 - val_loss: 334.7500
Epoch 2/20
[1m11/11[0m 

KeyboardInterrupt: 