# Notes

[Reinforcement Learning Article](https://towardsdatascience.com/reinforcement-learning-explained-visually-part-5-deep-q-networks-step-by-step-5a5317197f4b)


# Environment Setup


## Import Modules


In [1]:
import numpy as np
import tensorflow as tf
from random import Random
from functools import reduce
import pandas as pd
import os
import json

## Move Methods

So I did some syntax lessons


In [2]:
class Move:
    def __init__(
        self, name: str, loops: list[list[int]], two: bool = False, prime: bool = False
    ):
        self.name = name
        self.matrix: np.ndarray = np.identity(9 * 6, dtype=np.int8)
        for loop in loops:
            first = np.copy(self.matrix[loop[0]])
            for i in range(len(loop) - 1):
                self.matrix[loop[i]] = self.matrix[loop[i + 1]]
            self.matrix[loop[-1]] = first
        if two:
            self.matrix = self.matrix @ self.matrix
        if prime:
            self.matrix = self.matrix.T

    def __str__(self):
        return f"Move: {self.name}"


def build_moves(letter: str, loops: list[list[int]]) -> list[Move]:
    return [
        Move(letter, loops),
        Move(f"{letter}P", loops, prime=True),
        Move(f"{letter}2", loops, two=True),
    ]


# Behold, python syntax
MOVES = [
    move
    for moves in [
        build_moves(
            "R",
            [
                [20, 2, 42, 47],
                [23, 5, 39, 50],
                [26, 8, 36, 53],
                [27, 29, 35, 33],
                [28, 32, 34, 30],
            ],
        ),
        build_moves(
            "U",
            [
                [20, 11, 38, 29],
                [19, 10, 37, 28],
                [18, 9, 36, 27],
                [8, 6, 0, 2],
                [7, 3, 1, 5],
            ],
        ),
        build_moves(
            "L",
            [
                [18, 45, 44, 0],
                [21, 48, 41, 3],
                [24, 51, 38, 6],
                [11, 17, 15, 9],
                [14, 16, 12, 10],
            ],
        ),
        build_moves(
            "D",
            [
                [24, 33, 42, 15],
                [25, 34, 43, 16],
                [26, 35, 44, 17],
                [45, 47, 53, 51],
                [46, 50, 52, 48],
            ],
        ),
        build_moves(
            "F",
            [
                [6, 27, 47, 17],
                [7, 30, 46, 14],
                [8, 33, 45, 11],
                [18, 20, 26, 24],
                [19, 23, 25, 21],
            ],
        ),
        build_moves(
            "B",
            [
                [36, 38, 44, 42],
                [37, 41, 43, 39],
                [29, 0, 15, 53],
                [32, 1, 12, 52],
                [35, 2, 9, 51],
            ],
        ),
    ]
    for move in moves
]


## The Cube Environment


In [3]:
def new_cube():
    state = np.zeros((9 * 6), dtype=np.int8)
    for i in range(state.size):
        state[i] = i / 9
    return state


def apply_move(state, move: Move) -> np.ndarray:
    return state @ move.matrix


def scramble(state: np.ndarray, count: int) -> np.ndarray:
    random = Random()
    return state @ reduce(
        lambda a, b: a @ b, [random.choice(MOVES).matrix for i in range(count)]
    )


# Machine Learning Setup


## Constants


In [4]:
EPSILON = 0.5
"The chance that the agent will choose to explore instead of picking the best answer"


'The chance that the agent will choose to explore instead of picking the best answer'

## Converting State to Vector

In order to make an accurate network, we will need to convert the cube's state array to a longer array to make it clearer to the network what color is where


In [5]:
def state_to_vector(state):
    vector = np.zeros((9 * 6 * 6,1),dtype=np.float32)
    for i in range(9 * 6):
        color = state[i]
        vector[i * 6 + color] = 1
    return vector.T
        

## The Neural Network


In [6]:
def random_network(sizes: list[int]) -> list[tuple[(tf.Variable, tf.Variable)]]:
    sizes = sizes + [len(MOVES)]
    values = []
    for i in range(len(sizes)):
        size = sizes[i]
        prev_size = 9 * 6 * 6
        if i > 0:
            prev_size = sizes[i - 1]
        weights = tf.Variable(
            tf.random.normal([prev_size, size], stddev=0.03), name=f"W{i+1}"
        )
        constants = tf.Variable(tf.random.normal([size]), name=f"b{i+1}")
        values.append((weights, constants))
    return values


def feed_network(state, network: list[tuple[(tf.Variable, tf.Variable)]]):
    x = tf.cast(state, tf.float32)
    for i in range(len(network)):
        W, b = network[i]
        if i > 0:
            x = tf.nn.softsign(x)
        x = tf.add(tf.matmul(x, W), b)
    return x


def copy_network(network: list[tuple[(tf.Variable, tf.Variable)]]):
    copy = []
    for layer in network:
        W, b = layer
        copy.append((np.copy(W), np.copy(b)))
    return copy


## Reward Function


In [7]:
def get_reward(state: np.ndarray):
    value = 0
    for i in range(9 * 6):
        if state[i] == i // 9:
            value = value + 1
        else:
            value = value - 1
    return value / (9 * 6)


## Replay Database


In [8]:
def create_replay(
    network: list[tuple[(tf.Variable, tf.Variable)]],
    count: int,
    epsilon: float = EPSILON,
):
    replays: list[
        tuple[
            (
                np.ndarray,  # current state
                int,  # action
                np.ndarray,  # next state
                tf.float32,  # Q-Value
            )
        ]
    ] = []

    random = Random()

    cube = scramble(new_cube(), 10000)

    for i in range(count):
        choice: int = -1
        if random.random() < epsilon:
            choice = random.randrange(0, len(MOVES))
        else:
            q_vals = feed_network(state_to_vector(cube), network)
            index_max = tf.argmax(q_vals, 1).numpy()[0]
            choice = index_max
        new_state = apply_move(cube, MOVES[choice])

        replays.append(
            (
                state_to_vector(cube),
                choice,
                state_to_vector(new_state),
                get_reward(new_state),
            )
        )
        cube = new_state

    return replays


# Accuracy Predictor

This function tests how well the network runs


In [9]:
def accuracy(network):

    count = 50
    total_value = 0
    for i in range(count):
        cube = scramble(new_cube(), 100)
        count = 0
        while count < 100 and get_reward(cube) < 9 * 6:
            count: int = count + 1
            vals = feed_network(state_to_vector(cube), network)
            apply_move(cube, MOVES[tf.argmax(vals)[0]])

        total_value: int = total_value + get_reward(cube)
    return total_value / count


## DQN Operation

I think this is what it does


In [10]:
def DQN(
    network: list[tuple[(tf.Variable, tf.Variable)]],
    target: list[tuple[(tf.Variable, tf.Variable)]],
    replays: list[
        tuple[
            (
                np.ndarray,
                int,
                np.ndarray,
                tf.float32,
            )
        ]
    ],
    lr_schedule=tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-2, decay_steps=10000, decay_rate=0.9
    ),
):

    with tf.GradientTape() as tape:

        # I HAVE NO IDEA
        trainable_variables = [var for vars in network for var in vars]

        for variable in trainable_variables:
            tape.watch(variable)

        action = [replay[1] for replay in replays]
        for i in range(len(action)):
            tmp = np.zeros((1, len(MOVES)), dtype=np.float32)
            tmp[0][action[i]] = 1.0
            action[i] = tmp.T

        state_1 = tf.constant([replay[0] for replay in replays], dtype=tf.float32)
        action = tf.constant(action, dtype=tf.float32)
        state_2 = tf.constant([replay[2] for replay in replays], dtype=tf.float32)
        reward = tf.constant([replay[3] for replay in replays], dtype=tf.float32)

        # Calculates Q values of the first state
        state_1_q = feed_network(state_1, network)

        # makes a selection matrix for state_1
        state_1_max = tf.matmul(state_1_q, action)[:, 0, 0]

        # gets the Q value of the selected action
        state_2_q = feed_network(state_2, target)

        state_2_max = tf.argmax(state_2_q, axis=2)

        predicted_q = state_1_max

        target_q = tf.add(reward, tf.cast(state_2_max[:, 0], dtype=tf.float32))
        loss = tf.square(target_q - predicted_q)

        opt = tf.keras.optimizers.SGD(learning_rate=lr_schedule)

        gradients = tape.gradient(loss, trainable_variables)

        opt.apply_gradients(zip(gradients, trainable_variables))


# Storing and Retrieving State

In [11]:
def store_network(network):
    data = [
        {
            'W': W.numpy().tolist(),
            'B': B.numpy().tolist()
        }
        for (W,B) in network
    ]
    return data

def restore_network(serialized):
    return [
        (
            tf.Variable(A['W']),
            tf.Variable(A['B'])
        )
        for A in serialized
    ]

def load_json(name):
    with open(name) as f:
        return json.load(f)

def save_json(name,data):
    with open(name,'w') as f:
        f.write(json.dumps(data,indent=2))
    


In [12]:
# # OK, LETS SEE...

# network = random_network([10, 10])
# target = copy_network(network)
# Attempts to restore the network from a file

network = random_network([10,15])

if os.path.exists('./network.json'):
    network = restore_network(load_json('./network.json'))

In [15]:
batch_sample_size = 1_000
batch_size = batch_sample_size * 5
batch_count = 1_000
target_update_interval = 5

random = Random()

lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-2, decay_steps=batch_count, decay_rate=0.9
    )

network = random_network([10,15])

if os.path.exists('./network.json'):
    network = restore_network(load_json('./network.json'))

save_json('./network.json',store_network(network))

# Learning rate is a number between 0 and 1. It will be updated based on how accurate the network currently is. 
# For example, if the network accuracy is close to -1, then it will have a high learning rate in the next iteration, and vice versa
learning_rate = 0.01

for i in range(batch_count):
    if i % target_update_interval == 0:
        target = copy_network(network)
    replay = create_replay(network, batch_size, epsilon=i / batch_count)
    replay_sample = random.sample(replay, batch_sample_size)
    DQN(network, target, replay_sample,lr_schedule=learning_rate)
    value = accuracy(network)

    # Update learning rate
    learning_rate = ((-1.0 * (value - 1) / 2) ** 2) / 100

    save_json('./network.json',store_network(network))
    print(f"Batch {i}: Accuracy: {value}, updating learning rate {learning_rate}")


Batch 0: Accuracy: -0.24074074074074073, updating learning rate 0.003848593964334705
Batch 1: Accuracy: -0.23629629629629625, updating learning rate 0.0038210713305898486
Batch 2: Accuracy: -0.238888888888889, updating learning rate 0.003837114197530864
Batch 3: Accuracy: -0.2522222222222222, updating learning rate 0.003920151234567901
Batch 4: Accuracy: -0.23629629629629637, updating learning rate 0.0038210713305898503
Batch 5: Accuracy: -0.2396296296296296, updating learning rate 0.0038417040466392316
Batch 6: Accuracy: -0.24888888888888888, updating learning rate 0.0038993086419753086
Batch 7: Accuracy: -0.25037037037037035, updating learning rate 0.003908565157750342
Batch 8: Accuracy: -0.247037037037037, updating learning rate 0.003887753429355281
Batch 9: Accuracy: -0.24666666666666662, updating learning rate 0.003885444444444444
Batch 10: Accuracy: -0.23740740740740743, updating learning rate 0.0038279427297668047
Batch 11: Accuracy: -0.22555555555555554, updating learning rate 

KeyboardInterrupt: 