In [None]:
%reload_ext lab_black
import tensorflow as tf
import numpy as np
import pandas as pd
import sys

sys.path.append("/home/jupyter/tf/src")
import meta, data_wrangling, modeling
from IPython.display import clear_output

In [None]:
code_name = "hs04_test"
tf_root = "/home/jupyter/tf"

# Dataset
sample_name = "hs04"
rng_seed = 53797

# Model architechture
ort_units = 119
pho_units = 250
pho_hidden_units = 100
pho_cleanup_units = 50
pho_noise_level = 0.0

activation = "sigmoid"

tau = 1 / 3
max_unit_time = 4.0
output_ticks = 2

# Training
n_mil_sample = 0.1
batch_size = 100
learning_rate = 0.005
save_freq = 10

In [None]:
config_dict = {}

# Load global cfg variables into a dictionary
for v in meta.CORE_CONFIGS:
    try:
        config_dict[v] = globals()[v]
    except:
        print("Missing CORE config(s)")

for v in meta.OPTIONAL_CONFIGS:
    try:
        config_dict[v] = globals()[v]
    except:
        pass

# Construct ModelConfig object
cfg = meta.ModelConfig(**config_dict)
cfg.save()

In [None]:
tf.random.set_seed(rng_seed)
data = data_wrangling.MyData()

### Lightning style code refactoring

In [None]:
import tensorflow as tf
import tensorflow.keras.backend as K
from IPython.display import clear_output


class RNN(tf.keras.layers.Layer):
    """
    Main time-averaged input implementation based on Plaut 96 (Fig 12.)
    With additional attractor (cleanup) network
    Option to use semantic input by cfg.use_semantic == True
    Semantic equation can be change in modeling.input_s()
    """

    def __init__(
        self,
        activation,
        pho_units,
        pho_hidden_units,
        pho_cleanup_units,
        pho_noise_level,
        n_timesteps,
        tau,
        output_ticks,
        name="rnn",
        **kwargs
    ):

        super().__init__(**kwargs)
        self.pho_hidden_units = pho_hidden_units
        self.pho_units = pho_units
        self.pho_cleanup_units = pho_cleanup_units
        self.pho_noise_level = pho_noise_level
        self.tau = tau
        self.n_timesteps = n_timesteps
        self.output_ticks = output_ticks
        self.activation = tf.keras.activations.get(activation)

    def build(self, input_shape):

        weight_initializer = tf.random_uniform_initializer(minval=-0.1, maxval=0.1)

        """Build weights and biases"""
        self.w_oh = self.add_weight(
            name="w_oh",
            shape=(input_shape[-1], self.pho_hidden_units),
            initializer=weight_initializer,
            trainable=True,
        )

        self.w_hp = self.add_weight(
            name="w_hp",
            shape=(self.pho_hidden_units, self.pho_units),
            initializer=weight_initializer,
            trainable=True,
        )

        self.w_pp = self.add_weight(
            name="w_pp",
            shape=(self.pho_units, self.pho_units),
            initializer=weight_initializer,
            trainable=True,
        )

        self.w_pc = self.add_weight(
            name="w_pc",
            shape=(self.pho_units, self.pho_cleanup_units),
            initializer=weight_initializer,
            trainable=True,
        )

        self.w_cp = self.add_weight(
            name="w_cp",
            shape=(self.pho_cleanup_units, self.pho_units),
            initializer=weight_initializer,
            trainable=True,
        )

        self.bias_h = self.add_weight(
            shape=(self.pho_hidden_units,),
            name="bias_h",
            initializer="zeros",
            trainable=True,
        )

        self.bias_p = self.add_weight(
            shape=(self.pho_units,),
            name="bias_p",
            initializer="zeros",
            trainable=True,
        )

        self.bias_c = self.add_weight(
            shape=(self.pho_cleanup_units,),
            name="bias_c",
            initializer="zeros",
            trainable=True,
        )

        self.built = True

    def call(self, inputs, training=None):
        """
        Dimension note: (batch, timestep, input_dim)
        Hack for complying keras.layers.concatenate() format
        Spliting input_dim below (index = 2)
        """

        # init
        input_h_list, input_p_list, input_c_list = [], [], []
        act_h_list, act_p_list, act_c_list = [], [], []

        # Set inputs to 0
        input_h_list.append(tf.zeros((1, self.pho_hidden_units), dtype=tf.float32))
        input_p_list.append(tf.zeros((1, self.pho_units), dtype=tf.float32))
        input_c_list.append(tf.zeros((1, self.pho_cleanup_units), dtype=tf.float32))

        # Set activations to 0.5
        act_h_list.append(input_h_list[0] + 0.5)
        act_p_list.append(input_p_list[0] + 0.5)
        act_c_list.append(input_c_list[0] + 0.5)

        for t in range(self.n_timesteps):
            # Inject fresh white noise to weights and biases within pho system in each time step
            w_pp = K.in_train_phase(
                self._inject_noise(self.w_pp, self.pho_noise_level), self.w_pp
            )
            w_pc = K.in_train_phase(
                self._inject_noise(self.w_pc, self.pho_noise_level), self.w_pc
            )
            w_cp = K.in_train_phase(
                self._inject_noise(self.w_cp, self.pho_noise_level), self.w_cp
            )
            bias_c = K.in_train_phase(
                self._inject_noise(self.bias_c, self.pho_noise_level), self.bias_c
            )
            bias_p = K.in_train_phase(
                self._inject_noise(self.bias_p, self.pho_noise_level), self.bias_p
            )

            ##### Hidden layer #####
            oh = tf.matmul(inputs[:, t, :], self.w_oh)
            h = self.tau * (oh + self.bias_h) + (1 - self.tau) * input_h_list[t]

            ##### Phonology layer #####
            hp = tf.matmul(act_h_list[t], self.w_hp)
            pp = tf.matmul(act_p_list[t], w_pp)
            cp = tf.matmul(act_c_list[t], w_cp)

            p = self.tau * (hp + pp + cp + bias_p)
            p += (1 - self.tau) * input_p_list[t]

            ##### Cleanup layer #####
            pc = tf.matmul(act_p_list[t], w_pc)
            c = self.tau * (pc + bias_c) + (1 - self.tau) * input_c_list[t]

            # Record this timestep to list
            input_h_list.append(h)
            input_p_list.append(p)
            input_c_list.append(c)

            act_h_list.append(self.activation(h))
            act_p_list.append(self.activation(p))
            act_c_list.append(self.activation(c))

        return act_p_list[-self.output_ticks :]

    def _inject_noise(self, x, noise_sd):
        """Inject Gaussian noise if noise_sd > 0"""
        if noise_sd > 0:
            noise = K.random_normal(shape=K.shape(x), mean=0.0, stddev=noise_sd)

            return x + noise
        else:
            return x

    def get_config(self):
        cfg = super().get_config().copy()
        cfg.update(
            {
                "pho_hidden_units": self.pho_hidden_units,
                "pho_units": self.pho_units,
                "pho_cleanup_units": self.pho_cleanup_units,
                "pho_noise_level": self.pho_noise_level,
                "tau": self.tau,
                "n_timesteps": self.n_timesteps,
                "output_ticks": self.output_ticks,
                "activation": tf.keras.activations.serialize(self.activation),
            }
        )
        return cfg

In [None]:
from tensorflow.keras.layers import Input, RepeatVector


def build_model(training=True, **kwargs):
    """
    Create model
    """
    input_o = Input(shape=(cfg.ort_units,), name="Input_O")
    input_o_t = RepeatVector(cfg.n_timesteps, name="Input_Ot")(input_o)
    rnn_model = RNN(
        activation=cfg.activation,
        pho_units=cfg.pho_units,
        pho_hidden_units=cfg.pho_hidden_units,
        pho_cleanup_units=cfg.pho_cleanup_units,
        pho_noise_level=cfg.pho_noise_level,
        n_timesteps=cfg.n_timesteps,
        tau=cfg.tau,
        output_ticks=cfg.output_ticks,
    )(input_o_t)
    model = tf.keras.Model(input_o, rnn_model)

    model.summary()
    return model


model = build_model(training=True)

In [None]:
model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(
        learning_rate=cfg.learning_rate, beta_1=0.0, beta_2=0.999, amsgrad=False
    ),
    metrics=["BinaryAccuracy", "mse"],
)

In [None]:
# Create sampling instance
my_sampling = data_wrangling.Sampling(cfg, data)

checkpoint = modeling.ModelCheckpoint_custom(
    cfg.path["weights_checkpoint_fstring"],
    save_weights_only=True,
    period=cfg.save_freq,
)


history = model.fit(
    my_sampling.sample_generator(),
    steps_per_epoch=cfg.steps_per_epoch,
    epochs=cfg.total_number_of_epoch,
    verbose=0,
    callbacks=[checkpoint],
)

In [None]:
import pickle

# Saving history and model
with open(cfg.path["history_pickle"], "wb") as f:
    pickle.dump(history.history, f)

clear_output()
print("Training done")

In [None]:
model.save("model.h5")

In [None]:
new_model = tf.keras.models.load_model("model.h5", custom_objects={"RNN": RNN})

In [None]:
model = build_model(training=False)

In [None]:
import evaluate
import importlib

importlib.reload(evaluate)
strain = evaluate.strain_eval(cfg, data, model)

In [None]:
import os

strain.start_evaluate(
    test_use_semantic=False,
    output=os.path.join(cfg.path["model_folder"], "result_strain_item.csv"),
)

In [None]:
import altair as alt

strain.i_hist.columns

In [None]:
alt.Chart(strain.i_hist).mark_line().encode(
    x="epoch:Q", y="mean(acc):Q", color="condition_pf:N"
)