In [None]:
# Imports
import os
import random
import base64
import logging
import pathlib
import typing
import numpy as np
import pandas as pd
import datetime as dt
import keras

In [None]:
# Load data
# Prefer the pre-converted .npy files (opened memory-mapped and read-only); when any
# of them is missing, rebuild the same three arrays from the CSV export instead.
array_data_filename = pathlib.Path("data/npy/array_data.npy")
number_data_filename = pathlib.Path("data/npy/number_data.npy")
target_data_filename = pathlib.Path("data/npy/target_data.npy")

# NOTE(review): logging.info sits below the root logger's default WARNING level, so
# these messages are presumably invisible unless logging is configured elsewhere — confirm.
if all(path.is_file() for path in (array_data_filename, number_data_filename, target_data_filename)):
    logging.info("Loading data from .npy files")
    array_data_full = np.load(array_data_filename, mmap_mode = "r")
    number_data_full = np.load(number_data_filename, mmap_mode = "r")
    target_data_full = np.load(target_data_filename, mmap_mode = "r")
else:
    logging.info("Loading data from .csv files and transforming them")
    data = pd.read_csv("data/csv/parsed_data1-6.csv")
    # Every "board" entry is base64 text; its decoded bytes are read as 6*7*2 booleans.
    # The trailing reshape keeps a well-formed (0, 6, 7, 2) array when the CSV has no rows.
    array_data_full = np.array(
        [np.frombuffer(base64.b64decode(board), dtype=np.bool_).reshape((6,7,2)) for board in data["board"]]
    ).reshape((-1, 6, 7, 2))
    # True where channel 0 of a board holds an even number of set cells.
    # A single vectorised reduction replaces the former per-board Python loop.
    number_data_full = array_data_full[:, :, :, 0].sum(axis = (1, 2)) % 2 == 0
    target_data_full = np.array(data["evaluation"].values)

In [None]:
# Get full data sample
# Use everything that was loaded: the working names just alias the full arrays.
array_data, number_data, target_data = array_data_full, number_data_full, target_data_full

In [None]:
# Get smaller data sample
# Draw up to 100000 distinct rows at random. The size is capped at the dataset length
# because random.sample raises ValueError when asked for more items than exist.
sample_size = min(100000, len(target_data_full))
indices = random.sample(range(len(target_data_full)), sample_size)

# One fancy-indexing read per array replaces the row-by-row append loop. Rows keep the
# order of `indices`; the explicit integer dtype keeps an empty selection valid.
index_array = np.asarray(indices, dtype = np.intp)
array_data = np.asarray(array_data_full[index_array])
number_data = np.asarray(number_data_full[index_array])
target_data = np.asarray(target_data_full[index_array])

In [None]:
# Setup variables
# Training configuration shared by the compile / fit / evaluate cells
LOSS = "mean_squared_error"
METRICS = ["mae"]
BATCH_SIZE = 128

# Folder the model files are written to and read from: models/<MODEL_NAME>
MODEL_NAME = "frail-consultancy"
MODEL_PATH = pathlib.Path("models", MODEL_NAME)

In [None]:
# Create model

def _relu_dense_chain(tensor, depth):
    """Feed `tensor` through `depth` successive Dense(2048, relu) layers and return the result."""
    for _ in range(depth):
        tensor = keras.layers.Dense(2048, activation = "relu")(tensor)
    return tensor

def _conv_pool_chain(tensor, depth):
    """Feed `tensor` through `depth` rounds of 3x3 same-padded Conv2D(2048, relu) followed by 2x2 max pooling."""
    for _ in range(depth):
        tensor = keras.layers.Conv2D(2048, kernel_size = (3, 3), activation = "relu", padding = "same")(tensor)
        tensor = keras.layers.MaxPooling2D(pool_size = (2, 2))(tensor)
    return tensor

# Inputs: one scalar per sample plus one 6x7x2 array per sample
number_input = keras.layers.Input(shape = (1,), name = "number_input") # type: ignore
array_input = keras.layers.Input(shape = (6, 7, 2), name = "array_input")

# Fully-connected branch: flattened array joined with the scalar, then three dense layers
array_as_vector = keras.layers.Flatten()(array_input)
vector_with_number = keras.layers.Concatenate()([number_input, array_as_vector])
dense_branch = _relu_dense_chain(vector_with_number, 3)

# Convolutional branch: two conv/pool rounds on the raw array, flattened, then three dense layers
conv_features = _conv_pool_chain(array_input, 2)
conv_branch = _relu_dense_chain(keras.layers.Flatten()(conv_features), 3)

# Merge both branches and refine with five more dense layers
merged_branches = keras.layers.Concatenate()([dense_branch, conv_branch])
head = _relu_dense_chain(merged_branches, 5)

# Single linear unit as the regression output
output = keras.layers.Dense(1, activation = "linear", name = "output")(head)

# Assemble, compile and summarise the two-input model
model = keras.models.Model(inputs = [number_input, array_input], outputs = output)
model.compile(optimizer = "adam", loss = LOSS, metrics = METRICS)
model.summary()

In [None]:
# Fit model
# Three epochs over the working sample, with a 0.2 fraction set aside by Keras for validation.
model.fit(
    x = [number_data, array_data],
    y = target_data,
    batch_size = BATCH_SIZE,
    epochs = 3,
    validation_split = 0.2,
)

In [None]:
# Test model
# model.fit(..., validation_split = 0.2) holds out the trailing 20% of the samples and
# trains on the rest. Scoring on every sample would therefore mostly measure how well
# the training data was fitted, so only the held-out tail is evaluated here.
HELD_OUT_FRACTION = 0.2 # keep equal to the validation_split passed to model.fit
held_out_start = int(len(target_data) * (1.0 - HELD_OUT_FRACTION))

# evaluate returns the loss followed by one value per compiled metric
test_loss, test_metrics = model.evaluate(
    [number_data[held_out_start:], array_data[held_out_start:]],
    target_data[held_out_start:],
    batch_size = BATCH_SIZE,
)
print(f"Test loss: {test_loss}, Test metrics: {test_metrics}")

In [None]:
# Save model
# Create models/<MODEL_NAME> if it is missing rather than relying on model.save to do so;
# on a fresh checkout the folder does not exist yet.
MODEL_PATH.mkdir(parents = True, exist_ok = True)

# The timestamped file name makes the newest model the last one in name order
model.save(MODEL_PATH / f"{dt.datetime.today().strftime('%Y-%m-%d_%H-%M-%S')}.keras")

In [None]:
# Train model
# Resume from the newest saved model in MODEL_PATH and keep training one epoch at a
# time, writing a new timestamped file after every epoch. Runs until the cell is interrupted.
checkpoint_names = sorted(
    entry.name
    for entry in (MODEL_PATH.iterdir() if MODEL_PATH.is_dir() else ())
    if entry.is_file() and entry.name.endswith(".keras")
)
if not checkpoint_names:
    # Previously this surfaced as an opaque IndexError (or a listdir failure)
    raise FileNotFoundError(f"No .keras model found in {MODEL_PATH}; save a model there before resuming training")

# File names carry a %Y-%m-%d_%H-%M-%S timestamp, so the last name in sorted order is the newest
model_name = checkpoint_names[-1]

# Load and compile once. Reloading inside the loop only re-read the file that the previous
# iteration had just written, and recompiling every epoch replaced the optimizer (and
# whatever state it had accumulated) before each pass.
model: typing.Any = keras.models.load_model(MODEL_PATH / model_name)
model.compile(optimizer = "adam", loss = LOSS, metrics = METRICS)

while True:
    model.fit([number_data, array_data], target_data, epochs = 1, batch_size = BATCH_SIZE, validation_split = 0.2)
    model.save(MODEL_PATH / f"{dt.datetime.today().strftime('%Y-%m-%d_%H-%M-%S')}.keras")