In [1]:
import numpy as np
from keras.preprocessing import timeseries_dataset_from_array as _timeseries_dataset_from_array

# download FI2010 dataset from
# https://etsin.fairdata.fi/dataset/73eb48d7-4dbc-4a10-a52a-da745b47a649
_FI2010_DIR_ = r'D:\WORKS\translob\dataset\BenchmarkDatasets'
_add_path_ = r'/NoAuction/1.NoAuction_Zscore/NoAuction_Zscore'

PATH = _FI2010_DIR_ + _add_path_


# Save

    
def save_data(x, y, name):
    """
    kinds = 'test', 'train', 'val'
    """
    with open(f'saved_data/x_{name}.npy', 'wb') as file:
        np.save(file, x)
    with open(f'saved_data/y_{name}.npy', 'wb') as file:
        np.save(file, y)


# Load data
def _gen_data(data, horizon):
    x = data[:40, :].T  # 40 == 10 price + volume asks + 10 price + volume bids
    y = data[-5 + horizon, :].T  # 5
    return [x[:-1], (y[1:] - 1).astype(np.int32)]  # shift y by 1


def load_datas(horizon):
    dec_data = np.loadtxt(
        f'{PATH}_Training/Train_Dst_NoAuction_ZScore_CF_7.txt')

    dec_train = dec_data[:, :int(np.floor(dec_data.shape[1] * 0.8))]
    dec_val = dec_data[:, int(np.floor(dec_data.shape[1] * 0.8)):]

    dec_test1 = np.loadtxt(
        f'{PATH}_Testing/Test_Dst_NoAuction_ZScore_CF_7.txt')

    dec_test2 = np.loadtxt(
        f'{PATH}_Testing/Test_Dst_NoAuction_ZScore_CF_8.txt')

    dec_test3 = np.loadtxt(
        f'{PATH}_Testing/Test_Dst_NoAuction_ZScore_CF_9.txt')

    dec_test = np.hstack((dec_test1, dec_test2, dec_test3))

    datas = {
        'train': _gen_data(dec_train, horizon),
        'val': _gen_data(dec_val, horizon),
        'test': _gen_data(dec_test, horizon),
    }
    return datas


def load_saved_datas(max_number=None):
    """
    kinds = 'test', 'train', 'val'
    """
    datas = {}
    for kind in ['train', 'val', 'test']:
        try:
            with open(f'saved_data/x_{kind}.npy', 'rb') as file:
                x = np.load(file)
            with open(f'saved_data/y_{kind}.npy', 'rb') as file:
                y = np.load(file)
            if max_number is not None:
                x = x[:max_number]
                y = y[:max_number]
        except FileNotFoundError:
            x, y = None, None

        datas.update({kind: [x, y]})
    return datas


def inspect_data(data, name='data'):
    if data is not None:
        x = data[0]
        y = data[1]
        print(f'{name: <10}: x= {str(x.shape): <15} | y= {str(y.shape): <15}')
    else:
        print(f'{name <10}: None')


def inspect_datas(datas: dict):
    print('    Datas:')
    for name in datas:
        data = datas[name]
        inspect_data(data, name)


# build datasets
def build_dataset(
    x: np.ndarray,
    y: np.ndarray,
    seq_len,
    batch_size=128,
    **timeseries_kwargs,
):

    def set_shape(value_x, value_y):
        value_x.set_shape((None, seq_len, x.shape[-1]))
        return value_x, value_y

    ds = _timeseries_dataset_from_array(
        data=x,
        targets=y,
        batch_size=batch_size,
        sequence_length=seq_len,
        **timeseries_kwargs,
    )

    return ds.map(set_shape)


def build_datasets(datas: dict, batch_size, seq_len):
    datasets = {}
    for kind in datas:
        data = datas.get(kind, None)
        ds = None
        if data is not None:
            ds = build_dataset(
                x=data[0],
                y=data[1],
                batch_size=batch_size,
                seq_len=seq_len,
            )
        datasets.update({kind: ds})

    return datasets


def inspect_dataset(ds, name='dataset'):
    if ds is not None:
        print(f'{name: <10}: {[len(ds)]+ list(ds.element_spec[0].shape)[1:]}')
    else:
        print(f'{name <10}: None')


def inspect_datasets(datasets: dict):
    print('    Datasets:')
    for name in datasets:
        ds = datasets[name]
        inspect_dataset(ds, name)


In [2]:
class DataClass:
    """
    make only lover case parametrs and not start with _
    All this methods (exept __call__) only for beauty representation :)
    """

    @staticmethod
    def __not_data(field=None, get=False, not_data_fields: set = set()):
        if not get:
            not_data_fields.add(field.__name__)
            return field
        else:
            return not_data_fields

    def __new__(
        cls,
        target_dict: dict = None,
        name: str = '',
    ):
        """
        build from nested dict
        """
        if target_dict is not None:
            result = DataClass()
            return result.__rec_build(name, target_dict)
        return super().__new__(cls)

    def __rec_build(self, field_name: str, field):
        if not isinstance(field, dict):
            self.__setattr__(field_name, field)
            return None

        result = DataClass()
        self.__setattr__(field_name, result)

        for inner_field_name, inner_field in field.items():
            inner_result = result.__rec_build(
                inner_field_name,
                inner_field,
            )
            if inner_result is not None:
                self.__setattr__(field_name, inner_result)
        return result

    def __call__(self, **kwargs: dict):
        """
        Set up parametrs
        """
        for key, value in kwargs.items():
            self.__setattr__(key, value)

    def __get_all_fields(self):
        # Add except fields

        options = list(
            filter(
                lambda x:
                (x[0] != '_') and (x not in self.__not_data(get=True)),
                self.__dir__(),
            ))
        return options

    def __repr__(self) -> str:
        """
        Representation of options
        """
        return self.__rec_print()[4:]

    def _rec_print_depr(self, self_margin: str = ''):
        if not isinstance(self, DataClass):
            return f'{self}'

        result = self_margin
        for field_name in self.__get_all_fields():
            inner_result = DataClass._rec_print_depr(
                self.__getattribute__(field_name),
                self_margin + ' ' * 4,
            )
            result += f'\n{self_margin}{field_name}: {inner_result}'

        if self_margin == '':
            return result[1:]
        else:
            return result

    def __rec_print(
        self,
        self_name: str = '',
        self_header: str = '',
        last=True,
    ):
        end = "└─ "
        pipe = "│  "
        tee = "├─ "
        blank = "   "
        result = f'{self_header}{end if last else tee}{self_name}\n'

        if not isinstance(self, DataClass):
            if '<' in repr(self):
                self = repr(self).split('at')[0].replace('<', '').strip()

            return f'{self_header}{end if last else tee}{self_name}: {self}\n'

        fields = self.__get_all_fields()
        for field_name in fields:
            inner_result = DataClass.__rec_print(
                self.__getattribute__(field_name),
                self_name=field_name,
                self_header=f'{self_header}{blank if last else pipe}',
                last=field_name == fields[-1])

            result += inner_result[6:]

        return result

    @property
    @__not_data
    def Info_nested(self):
        """
        Containing options dict
        """
        return self.__rec_nested()

    def __rec_nested(self, self_name=None):
        if not isinstance(self, DataClass):
            return {self_name: self}

        result = {}
        for field_name in self.__get_all_fields():
            inner_result = DataClass.__rec_nested(
                self.__getattribute__(field_name),
                field_name,
            )
            result.update(inner_result)

        if self_name is None:
            return result
        else:
            return {self_name: result}

    @property
    @__not_data
    def Info_expanded(self):
        return {
            compound_key.strip()[2:]: value
            for value, compound_key in self.__rec_expanded()
        }

    def __rec_expanded(self, composite_key=''):
        if not isinstance(self, DataClass):
            yield (self, composite_key)
        else:
            for field_name in self.__get_all_fields():
                for inner_result in DataClass.__rec_expanded(
                        self.__getattribute__(field_name),
                        str(composite_key) + '__' + str(field_name),
                ):
                    yield inner_result

    def __getitem__(self, value):
        if isinstance(value, list | tuple):
            result = {}
            for i in value:
                result.update({i: getattr(self, i, None)})
            return DataClass(result)

        result = getattr(self, value, None)
        if isinstance(result, DataClass):
            return DataClass(result.Info_nested)
        else:
            return result


In [3]:
import numpy as np
from tqdm import tqdm
import keras
import tensorflow as tf

seq_len = 100

In [4]:
# Load data
row_data = (
    # data.load_dataset(horizon=4)
    load_saved_datas())

inspect_datas(row_data)

    Datas:
train     : x= (203799, 40)    | y= (203799,)      
val       : x= (50949, 40)     | y= (50949,)       
test      : x= (139586, 40)    | y= (139586,)      


In [5]:
# # Save data
# data.save_data(name= 'train',x= x_train,y=y_train)
# data.save_data(name= 'val',x= x_val,y=y_val)
# data.save_data(name= 'test',x= x_test,y=y_test)

In [6]:
"""
Realisation transformer from article
"""
import numpy as np
import tensorflow as tf
import keras

from typing import Union as _Union
from typing import Callable as _Callable
from keras.utils import get_custom_objects as _get_custom_objects
from keras import backend as _K


# Input
def input_block(seq_len):
    inputs = keras.Input(shape=(seq_len, 40))
    return inputs


# CN
def cnn_block(
    input_layer,
    filters,
    dilation_steps,
):
    dilation_steps = [
        2**dilation
        for dilation in range(dilation_steps + 1)
    ] # yapf: disable
    x = input_layer
    for dilation in dilation_steps:
        layer = keras.layers.Conv1D(
            filters=filters,
            kernel_size=2,
            dilation_rate=dilation,
            activation='relu',
            padding='causal',
        )
        x = layer(x)
    return x


# Normalisation
def norm_block(input_layer):

    norm = keras.layers.LayerNormalization()(input_layer)
    return norm


# Positional encoding
class PositionalEncoding(keras.layers.Layer):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, x, *args, **kwargs):
        steps, d_model = x.get_shape()[-2:]
        ps = np.zeros([steps, 1], dtype=_K.floatx())
        for step in range(steps):
            ps[step, :] = [(2 / (steps - 1)) * step - 1]

        ps_expand = _K.expand_dims(_K.constant(ps), axis=0)
        ps_tiled = _K.tile(ps_expand, [_K.shape(x)[0], 1, 1])

        x = _K.concatenate([x, ps_tiled], axis=-1)
        return x


def positional_encoder_block(input_layer):
    pos = PositionalEncoding()(input_layer)
    return pos


# Transformer
class MultiHeadSelfAttention(keras.layers.Layer):
    """
    Base class for Multi-head Self-Attention layers.
    """

    def __init__(self, num_heads: int, use_masking: bool, **kwargs):
        """
        :param num_heads: number of attention heads
        :param use_masking: when True, forbids the attention to see the further
          elements in the sequence.
        :param kwargs: any extra arguments typical for a Keras layer,
          such as name, etc.
        """
        self.num_heads = num_heads
        self.use_masking = use_masking
        self.qkv_weights = None
        super().__init__(**kwargs)

    def get_config(self):
        config = super().get_config()
        config['num_heads'] = self.num_heads
        config['use_masking'] = self.use_masking
        return config

    def build(self, input_shape):
        # if not isinstance(input_shape, TensorShape):
        #     raise ValueError('Invalid input')
        d_model = input_shape[-1]

        self.validate_model_dimensionality(d_model)
        self.qkv_weights = self.add_weight(
            name='qkv_weights',
            shape=(d_model, d_model * 3),  # * 3 for q, k and v
            initializer='glorot_uniform',
            trainable=True)

        return super().build(input_shape)

    def call(self, inputs, **kwargs):
        # if not K.is_keras_tensor(inputs):
        #     raise ValueError(
        #         'The layer can be called only with one tensor as an argument')
        _, seq_len, d_model = _K.int_shape(inputs)

        # Perform affine transformations to get the Queries, the Keys and the Values.
        qkv = _K.dot(inputs, self.qkv_weights)  # (-1,seq_len,d_model*3)
        qkv = _K.reshape(qkv, [-1, d_model * 3])

        # splitting the keys, the values and the queries.
        pre_q, pre_k, pre_v = [
            _K.reshape(
                qkv[:, i * d_model:(i + 1) * d_model],
                (-1, seq_len, self.num_heads, d_model // self.num_heads))
            for i in range(3)
        ]

        attention_out = self.attention(
            pre_q,
            pre_v,
            pre_k,
            seq_len,
            d_model,
            training=kwargs.get('training'),
        )
        # of shape (-1, seq_len, d_model)
        return attention_out

    def compute_output_shape(self, input_shape):
        shape_a, seq_len, d_model = input_shape
        return shape_a, seq_len, d_model

    def validate_model_dimensionality(self, d_model: int):
        if d_model % self.num_heads != 0:
            raise ValueError(
                f'The size of the last dimension of the input '
                f'({d_model}) must be evenly divisible by the number'
                f'of the attention heads {self.num_heads}')

    def attention(
        self,
        pre_q,
        pre_v,
        pre_k,
        seq_len: int,
        d_model: int,
        training=None,
    ):
        """
        Calculates the output of the attention once the affine transformations
        of the inputs are done. Here's the shapes of the arguments:
        :param pre_q: (batch_size, q_seq_len, num_heads, d_model // num_heads)
        :param pre_v: (batch_size, v_seq_len, num_heads, d_model // num_heads)
        :param pre_k: (batch_size, k_seq_len, num_heads, d_model // num_heads)
        :param seq_len: the length of the output sequence
        :param d_model: dimensionality of the model (by the paper)
        :param training: Passed by Keras. Should not be defined manually.
          Optional scalar tensor indicating if we're in training
          or inference phase.
        """
        d_submodel = d_model // self.num_heads

        # shaping Q and V into (batch_size, num_heads, seq_len, d_model//heads)
        q = _K.permute_dimensions(pre_q, [0, 2, 1, 3])
        v = _K.permute_dimensions(pre_v, [0, 2, 1, 3])
        k = _K.permute_dimensions(pre_k, [0, 2, 3, 1])

        q = _K.reshape(q, (-1, seq_len, d_submodel))
        k = _K.reshape(k, (-1, seq_len, d_submodel))
        v = _K.reshape(v, (-1, seq_len, d_submodel))
        qk = tf.einsum('aib,ajb->aij', q, k)
        sqrt_d = _K.constant(np.sqrt(d_model // self.num_heads),
                             dtype=_K.floatx())
        a = qk / sqrt_d
        a = self.mask_attention(a)
        a = _K.softmax(a)
        attention_heads = tf.einsum('aij,ajb->aib', a, v)
        attention_heads = _K.reshape(attention_heads,
                                     (-1, self.num_heads, seq_len, d_submodel))
        attention_heads = _K.permute_dimensions(attention_heads, [0, 2, 1, 3])
        attention_heads = _K.reshape(attention_heads, (-1, seq_len, d_model))

        return attention_heads

    def mask_attention(self, dot_product):
        """
        Makes sure that (when enabled) each position
        (of a decoder's self-attention) cannot attend to subsequent positions.
        :param dot_product: scaled dot-product of Q and K after reshaping them
        to 3D tensors (batch * num_heads, rows, cols)
        """
        if not self.use_masking:
            return dot_product
        last_dims = _K.int_shape(dot_product)[-2:]
        low_triangle_ones = (
            np.tril(np.ones(last_dims))
            # to ensure proper broadcasting
            .reshape((1, ) + last_dims))
        inverse_low_triangle = 1 - low_triangle_ones
        close_to_negative_inf = -1e9
        result = (
            _K.constant(low_triangle_ones, dtype=_K.floatx()) * dot_product +
            _K.constant(close_to_negative_inf * inverse_low_triangle))
        return result


_get_custom_objects().update({
    'MultiHeadSelfAttention': MultiHeadSelfAttention,
})


class CustomNormalization(keras.layers.Layer):
    """
    Implementation of Layer Normalization (https://arxiv.org/abs/1607.06450).
    """

    def __init__(self, axis=-1, **kwargs):
        self.axis = axis
        super().__init__(**kwargs)

    def get_config(self):
        config = super().get_config()
        config['axis'] = self.axis
        return config

    def build(self, input_shape):
        dim = input_shape[-1]
        self.gain = self.add_weight(
            name='gain',
            shape=(dim, ),
            initializer='ones',
            trainable=True,
        )
        self.bias = self.add_weight(
            name='bias',
            shape=(dim, ),
            initializer='zeros',
            trainable=True,
        )
        return super().build(input_shape)

    def call(self, inputs, **kwargs):
        mean = _K.mean(
            inputs,
            axis=self.axis,
            keepdims=True,
        )
        variance = _K.mean(
            _K.square(inputs - mean),
            axis=self.axis,
            keepdims=True,
        )
        epsilon = _K.constant(
            1e-5,
            dtype=_K.floatx(),
        )
        normalized_inputs = (inputs - mean) / _K.sqrt(variance + epsilon)
        result = self.gain * normalized_inputs + self.bias
        return result


class TransformerTransition(keras.layers.Layer):
    """
    Transformer transition function. The same function is used both
    in classical in Universal Transformers.
    """

    def __init__(
        self,
        activation: _Union[str, _Callable],
        size_multiplier: int = 4,
        **kwargs,
    ):
        """
        :param activation: activation function. Must be a string or a callable.
        :param size_multiplier: How big the hidden dimension should be.
          Most of the implementation use transition functions having 4 times
          more hidden units than the model itself.
        :param kwargs: Keras-specific layer arguments.
        """
        self.activation = keras.activations.get(activation)
        self.size_multiplier = size_multiplier
        super().__init__(**kwargs)

    def get_config(self):
        config = super().get_config()
        config['activation'] = keras.activations.serialize(self.activation)
        config['size_multiplier'] = self.size_multiplier
        return config

    def build(self, input_shape):
        d_model = input_shape[-1]
        self.weights1 = self.add_weight(
            name='weights1',
            shape=(d_model, self.size_multiplier * d_model),
            initializer='glorot_uniform',
            trainable=True,
        )
        self.biases1 = self.add_weight(
            name='biases1',
            shape=(self.size_multiplier * d_model),
            initializer='zeros',
            trainable=True,
        )
        self.weights2 = self.add_weight(
            name='weights2',
            shape=(self.size_multiplier * d_model, d_model),
            initializer='glorot_uniform',
            trainable=True,
        )
        self.biases2 = self.add_weight(
            name='biases2',
            shape=(d_model, ),
            initializer='zeros',
            trainable=True,
        )
        return super().build(input_shape)

    def call(self, inputs, **kwargs):
        input_shape = _K.int_shape(inputs)
        d_model = input_shape[-1]

        K_dot = _K.dot(_K.reshape(inputs, (-1, d_model)), self.weights1)
        step1 = self.activation(
            _K.bias_add(K_dot, self.biases1, data_format='channels_last'))

        K_dot = _K.dot(step1, self.weights2)
        step2 = _K.bias_add(K_dot, self.biases2, data_format='channels_last')
        result = _K.reshape(step2, (-1, ) + input_shape[-2:])
        return result


class TransformerLayer(keras.layers.Layer):
    """
    A pseudo-layer combining together all nuts and bolts to assemble
    a complete section of both the Transformer and the Universal Transformer
    models, following description from the "Universal Transformers" paper.
    Each such block is, essentially:
    - Multi-head self-attention (masked or unmasked)
    - Residual connection,
    - Layer normalization
    - Transition function
    - Residual connection
    - Layer normalization
    """

    def __init__(
        self,
        # name: str,
        num_heads: int,
        use_masking: bool = True,
        **kwargs,
    ):
        self.attention_layer = MultiHeadSelfAttention(
            num_heads,
            use_masking=use_masking,
            # name=f'{name}_self_attention',
        )
        self.norm1_layer = CustomNormalization()
        self.norm2_layer = CustomNormalization()
        self.transition_layer = TransformerTransition(activation='relu', )
        self.addition_layer = keras.layers.Add()
        super().__init__(**kwargs)

    def call(self, x, **kwargs):
        #PostLN: X -> attention -> +X -> norm1 -> transition -> +norm1 -> norm2
        attention = self.attention_layer(x)
        residual_1 = (self.addition_layer([x, attention]))
        norm_1 = self.norm1_layer(residual_1)

        transition = self.transition_layer(norm_1)
        residual_2 = (self.addition_layer([norm_1, transition]))
        norm_2 = self.norm2_layer(residual_2)

        return norm_2


def transformer_block(
    input_layer,
    share_weights,
    n_blocks,
    n_heads,
):
    x = input_layer
    tb = TransformerLayer(
        num_heads=n_heads,
        use_masking=True,
    )
    for block in range(n_blocks):
        if share_weights:
            x = tb(x)
        else:
            x = TransformerLayer(
                num_heads=n_heads,
                use_masking=True,
            )(x)

    return x


# FFN
def ffn_block(
    input_layer,
    dropout_rate,
    activation,
    units,
    kernel_regularizer,
    kernel_initializer,
):
    input_layer = keras.layers.Flatten()(input_layer)

    input_layer = keras.layers.Dense(
        units=units,
        activation=activation,
        kernel_regularizer=kernel_regularizer,
        kernel_initializer=kernel_initializer,
    )(input_layer)

    input_layer = keras.layers.Dropout(dropout_rate)(input_layer)
    out = keras.layers.Dense(
        units=3,
        activation='softmax',
    )(input_layer)
    return out


# Collection
class blocks:
    input_block = input_block
    cnn_block = cnn_block
    norm_block = norm_block
    positional_encoder_block = positional_encoder_block
    transformer_block = transformer_block
    ffn_block = ffn_block


# parametrs
PARAMETRS = {
    'seq_len': 100,
    'cn': dict(
        n_filters=14,
        dilation_steps=4,
    ),
    'an': dict(
        attention_heads=3,
        blocks=2,
        share_weights=False,
    ),
    'ff': dict(
        units = 64,
        dropout_rate=0.1,
        activation=keras.activations.relu,
        kernel_regularizer=keras.regularizers.L2(),
        kernel_initializer='glorot_uniform',
    ),
    'optimizer':
    keras.optimizers.legacy.Adam(
        learning_rate=0.0001,
        beta_1=0.9,
        beta_2=0.999,
    ),
} #yapf:disable


# build
def build_model(
    seq_len,
    cn__n_filters,
    cn__dilation_steps,
    an__blocks,
    an__attention_heads,
    an__share_weights,
    ff__units,
    ff__dropout_rate,
    ff__activation,
    ff__kernel_regularizer,
    ff__kernel_initializer,
    optimizer,
):
    # Model
    inputs = blocks.input_block(seq_len)
    x = inputs
    x = blocks.cnn_block(
        input_layer=x,
        filters=cn__n_filters,
        dilation_steps=cn__dilation_steps,
    )
    x = blocks.norm_block(input_layer=x)
    x = blocks.positional_encoder_block(input_layer=x)
    x = blocks.transformer_block(
        input_layer=x,
        n_blocks=an__blocks,
        n_heads=an__attention_heads,
        share_weights=an__share_weights,
    )
    x = blocks.ffn_block(
        input_layer=x,
        units=ff__units,
        dropout_rate=ff__dropout_rate,
        activation=ff__activation,
        kernel_regularizer=ff__kernel_regularizer,
        kernel_initializer=ff__kernel_initializer,
    )

    model = keras.Model(inputs=inputs, outputs=x)

    # Compile
    model.compile(
        optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name='sp_acc'),
            keras.metrics.CategoricalAccuracy(name='acc'),
        ],
    )
    return model


In [7]:
# Datasets
datasets = build_datasets(
    datas=row_data,
    batch_size=512,
    seq_len=seq_len,
)
(ds_train, ds_val, ds_test) =\
(datasets['train'], datasets['val'], datasets['test'])
inspect_datasets(datasets)

    Datasets:
train     : [398, 100, 40]
val       : [100, 100, 40]
test      : [273, 100, 40]


In [8]:
# Build
model_name = 'base_model_low'
pars = DataClass(PARAMETRS)
model = build_model(**pars.Info_expanded)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 100, 40)]         0         
                                                                 
 conv1d (Conv1D)             (None, 100, 14)           1134      
                                                                 
 conv1d_1 (Conv1D)           (None, 100, 14)           406       
                                                                 
 conv1d_2 (Conv1D)           (None, 100, 14)           406       
                                                                 
 conv1d_3 (Conv1D)           (None, 100, 14)           406       
                                                                 
 conv1d_4 (Conv1D)           (None, 100, 14)           406       
                                                                 
 layer_normalization (Layer  (None, 100, 14)           28    

In [9]:
# Callbacks
callbacks = [
    keras.callbacks.TensorBoard(
        f"Temp/callbacks/{model_name}",
        histogram_freq=1,
        update_freq=1,
    ),
    tf.keras.callbacks.ModelCheckpoint(
        f"Temp/callbacks/{model_name}/checkPoint",
        monitor="val_loss",
        verbose=0,
        save_best_only=False,
        save_weights_only=True,
        mode="auto",
        save_freq=50,
        options=None,
        initial_value_threshold=None,
    )
]

In [10]:
# Train
model.fit(
    ds_train,
    epochs=20,
    validation_data=ds_val,
    callbacks=callbacks,
)

Epoch 1/20
  3/398 [..............................] - ETA: 7:38 - loss: 4.0748 - sp_acc: 0.2337 - acc: 0.0000e+00

KeyboardInterrupt: 