In [None]:
# %pip install tensorflow
# %pip install distutils
%pip install scikit-learn

In [8]:
import numpy as np

from typing import Callable, Iterable

def QLearning(alpha: float, eta: float, epsilon: float, episode_count: int,
              reward: Callable[[object, object], float],
              get_init_state: Callable[[], object],
              get_actions: Callable[[object], Iterable[object]],
              next_state: Callable[[object, object], object],
              end_episode: Callable[[object, object, object], bool]):
    """Tabular Q-learning with an epsilon-greedy behaviour policy.

    Args:
        alpha: Learning rate applied to the temporal-difference error.
        eta: Discount factor applied to the best Q-value of the successor state.
        epsilon: Probability of picking a uniformly random action instead of
            the greedy one.
        episode_count: Number of episodes to run.
        reward: ``reward(state, action)`` -> immediate reward.
        get_init_state: Returns the first state of an episode.
        get_actions: ``get_actions(state)`` -> actions available in ``state``;
            an empty result means no action is available.
        next_state: ``next_state(state, action)`` -> successor state.
        end_episode: ``end_episode(state, action, new_state)`` -> True when the
            episode is over.

    Returns:
        dict mapping every visited state to the action with the highest
        learned Q-value for that state.

    States and actions are used as dictionary keys, so they must be hashable.
    Unseen (state, action) pairs are treated as having a Q-value of 0.
    """
    Q = {}

    def q_value(s, a):
        # Unvisited pairs default to 0.
        return Q.get((s, a), 0)

    for _ in range(episode_count):
        state = get_init_state()
        while True:
            # Materialise once so generators / arrays can be tested for
            # emptiness and indexed safely.
            actions = list(get_actions(state))

            if not actions:
                action = None
            elif np.random.rand() < epsilon:
                # Pick by index: np.random.choice would try to turn the
                # actions into an array and fails on e.g. tuple actions.
                action = actions[np.random.randint(len(actions))]
            else:
                action = max(actions, key=lambda a: q_value(state, a))

            # Use a distinct local name; rebinding `next_state` would replace
            # the transition function with a state after the first step.
            new_state = next_state(state, action)
            reward_value = reward(state, action)

            # Bootstrap from the best Q-VALUE of the successor (not the best
            # action); a successor without actions contributes 0.
            best_next = max(
                (q_value(new_state, a) for a in get_actions(new_state)),
                default=0,
            )
            td_target = reward_value + eta * best_next
            current = q_value(state, action)
            # Plain assignment: the key may not exist yet, so `+=` would raise.
            Q[(state, action)] = current + alpha * (td_target - current)

            if end_episode(state, action, new_state):
                break

            state = new_state

    # Greedy policy: for each visited state keep the action with the highest
    # learned value.
    pi = {}
    for (state, action), value in Q.items():
        if state not in pi or value > Q[(state, pi[state])]:
            pi[state] = action

    return pi


In [22]:
from typing import Literal
import tensorflow as tf
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import Dense, Input # type: ignore

# Activation identifier forwarded unchanged to `Dense(activation=...)` by DNN;
# None is passed through as-is.
type ActivationFunction = Literal['relu', 'sigmoid', 'softmax'] | None
# One dense layer described as (number of units, activation).
type Layer = tuple[int, ActivationFunction]

class DNN:
    """Thin wrapper around a Keras ``Sequential`` stack of ``Dense`` layers.

    The model is compiled with the Adam optimizer, mean-squared-error loss and
    mean-absolute-error as a metric.
    """

    def __init__(self, input_size: int, layers_config: list[Layer]):
        """Build and compile the network.

        Args:
            input_size: Number of features of one input vector.
            layers_config: ``(units, activation)`` pairs, one per Dense layer,
                added in the given order.
        """
        self.model = Sequential()

        self.model.add(Input(shape=(input_size,)))
        for layer_size, layer_activation in layers_config:
            self.model.add(Dense(layer_size, activation=layer_activation))

        self.model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

    def train(self, X_train, Y_train, batch_size: int = 32, epochs: int = 100, verbose: bool = False):
        """Fit the model on ``X_train`` / ``Y_train``; arguments are forwarded to ``model.fit``."""
        self.model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs, verbose=verbose)

    def evaluate(self, X_test, Y_test):
        """Return whatever ``model.evaluate`` reports for the given data."""
        return self.model.evaluate(X_test, Y_test)

    def predict(self, X):
        """Return the model output for a batch of inputs ``X``."""
        return self.model.predict(X)

    def predict_single(self, x):
        """Return the model output for a single input vector ``x``.

        ``x`` is wrapped into a batch of one and the first (only) row of the
        prediction is returned.
        """
        return self.model.predict(np.array([x]))[0]

    def save(self, filename: str):
        """Persist the underlying Keras model to ``filename``."""
        self.model.save(filename)

    def load_weights(self, weights):
        """Replace the model's weights with ``weights``.

        Raises:
            ValueError: if the model currently has no weights.
        """
        if not self.model.weights:  # check if the model has been initialized
            raise ValueError("Model has no weights; ensure the model is properly initialized.")
        self.model.set_weights(weights)

    def get_weights(self):
        """Return the model's current weights.

        Raises:
            ValueError: if the model currently has no weights.
        """
        if not self.model.weights:  # check if the model has been initialized
            raise ValueError("Model has no weights; ensure the model is properly initialized.")
        return self.model.get_weights()

    @staticmethod
    def load(filename: str):
        """Load a saved Keras model from ``filename`` and wrap it in a ``DNN``.

        The wrapper is created without running ``__init__``: the constructor
        requires ``input_size`` and ``layers_config`` (the previous
        ``DNN([])`` call raised ``TypeError``) and would only build a
        throwaway model that is replaced immediately.
        """
        model = tf.keras.models.load_model(filename)
        dnn = DNN.__new__(DNN)
        dnn.model = model
        return dnn

    def summary(self):
        """Print the Keras summary of the underlying model."""
        self.model.summary()


In [26]:
import random
from collections import deque

def DeepQLearning(eta: float, epsilon: Callable[[int], float], episode_count: int,
                  reward: Callable[[object, object], float],
                  get_init_state: Callable[[], object],
                  get_actions: Callable[[object], Iterable[object]],
                  next_state: Callable[[object, object], object],
                  end_episode: Callable[[object, object, object], bool],
                  q_input_size: int,
                  q_layers_config: list[Layer],
                  preprocess_input: Callable[[object, object], np.ndarray],
                  train_step_count: int,
                  update_step_count: int,
                  memory_size: int,
                  batch_size: int):
    """Q-learning with a neural-network Q-function and a replay memory.

    Args:
        eta: Discount factor applied to the best successor Q-value.
        epsilon: ``epsilon(episode_index)`` -> exploration probability for
            that episode.
        episode_count: Number of episodes to run.
        reward: ``reward(state, action)`` -> immediate reward.
        get_init_state: Returns the first state of an episode.
        get_actions: ``get_actions(state)`` -> available actions; an empty
            result means no action is available.
        next_state: ``next_state(state, action)`` -> successor state.
        end_episode: ``end_episode(state, action, new_state)`` -> True when
            the episode is over.
        q_input_size: Input size of the Q-network.
        q_layers_config: Layer configuration passed to ``DNN``.
        preprocess_input: ``preprocess_input(state, action)`` -> network
            input vector.
        train_step_count: The network is trained every this many steps.
        update_step_count: ``Q`` weights are copied into ``Q_final`` every
            this many steps.
        memory_size: Maximum number of stored transitions.
        batch_size: Maximum number of transitions sampled per training call.

    Returns:
        A function ``policy(state)`` returning the action with the highest
        predicted Q-value in ``state``, or ``None`` if no action is available.

    NOTE(review): TD targets are computed with ``Q`` (as in the original
    code), not with ``Q_final``; the periodic sync therefore only affects the
    network used by the returned policy. Confirm whether ``Q_final`` was
    meant to act as a target network.
    """
    memory = deque(maxlen=memory_size)
    Q = DNN(q_input_size, q_layers_config)
    Q_final = DNN(q_input_size, q_layers_config)

    def q_value(network, s, a) -> float:
        # Reduce the prediction to a plain float so values can be compared
        # and added to rewards. Assumes the output layer has a single unit
        # (first component is used) -- TODO confirm against q_layers_config.
        return float(np.ravel(network.predict_single(preprocess_input(s, a)))[0])

    steps = 0
    for episode in range(episode_count):
        state = get_init_state()
        while True:
            # Materialise once so generators / arrays can be tested for
            # emptiness and indexed safely.
            actions = list(get_actions(state))

            if not actions:
                action = None
            elif np.random.rand() < epsilon(episode):
                # Pick by index: np.random.choice fails on e.g. tuple actions.
                action = actions[np.random.randint(len(actions))]
            else:
                action = max(actions, key=lambda a: q_value(Q, state, a))

            # Distinct local name: rebinding `next_state` would replace the
            # transition function with a state after the first step.
            new_state = next_state(state, action)
            reward_value = reward(state, action)
            memory.append((state, action, new_state, reward_value))

            if steps % train_step_count == 0:
                # Sampled with replacement, at most `batch_size` transitions.
                batch = random.choices(memory, k=min(batch_size, len(memory)))
                X_train = np.array([preprocess_input(s, a) for s, a, _, _ in batch])
                targets = []
                for _, _, ns, r in batch:
                    successor_actions = list(get_actions(ns))
                    if not successor_actions:
                        targets.append(r)
                    else:
                        # Bootstrap from the best Q-VALUE of the successor,
                        # not from the best action itself.
                        best_next = max(q_value(Q, ns, a) for a in successor_actions)
                        targets.append(r + eta * best_next)
                Y_train = np.array(targets)
                Q.train(X_train, Y_train)

            if steps % update_step_count == 0:
                Q_final.load_weights(Q.get_weights())

            # Count every environment step. Incrementing only inside the
            # training branch froze the counter whenever train_step_count > 1.
            steps += 1

            if end_episode(state, action, new_state):
                break

            state = new_state

    Q_final.load_weights(Q.get_weights())

    def policy(s):
        # Query the environment once; calling get_actions twice breaks for
        # generator-returning implementations.
        available = list(get_actions(s))
        if not available:
            return None
        return max(available, key=lambda a: q_value(Q_final, s, a))

    return policy