<a href="https://colab.research.google.com/github/epeay/random-number-guesser/blob/main/random_number_guesser.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gym



In [27]:
from math import trunc
import gym
from gym import spaces
import numpy as np
from collections import deque

# The model answers with one of two actions: 0 or 1.
# Action 1 is a guess that the newest number is > CHOSEN_X; action 0 is a
# guess that it is <= CHOSEN_X.
# NOTE(review): HIGHER_THAN_X is not referenced anywhere else in the visible
# code; the environment compares actions against the literal 1 instead.
HIGHER_THAN_X = 1
# Pivot value that the newest number is compared against.
CHOSEN_X = 70
# Number of past values handed to the model as its observation.
# Only the last (newest) value determines the correct answer; the earlier
# values are deliberate red herrings.
history_size = 3

# Worked example, assuming the pivot CHOSEN_X is 70. Each observation is a
# sliding window, and only its last entry matters:
# [A, B, 60], the correct response is 0 (<= CHOSEN_X)
# [B, 60, 85], the correct response is 1 (> CHOSEN_X)
# [60, 85, 1], the correct response is 0 (<= CHOSEN_X)

class NumberPredictionEnv(gym.Env):
    """Toy environment: guess whether the newest number exceeds ``CHOSEN_X``.

    The observation is a sliding window of the last ``history_size`` random
    integers in the range 0..99. Only the newest entry decides the correct
    action; the older entries carry no information.

    Actions:
        0 -- guess that the newest number is ``<= CHOSEN_X``
        1 -- guess that the newest number is ``> CHOSEN_X``

    Rewards:
        +1 for a correct guess, -1 otherwise.
    """

    def __init__(self):
        super().__init__()
        # Observation space: `history_size` integers, each between 0 and 99.
        self.observation_space = spaces.Box(low=0, high=99, shape=(history_size,), dtype=np.int32)
        # Action space: two actions, 0 (<= CHOSEN_X) and 1 (> CHOSEN_X).
        self.action_space = spaces.Discrete(2)
        # Newest number in the history; set by reset() and refreshed by step().
        self.current_number = None
        self.history = deque(maxlen=history_size)

    def _observation(self):
        """Return the history as an array with the dtype declared by the Box space."""
        # Without an explicit dtype NumPy picks its platform default integer,
        # which does not match the int32 declared in `observation_space`.
        return np.array(self.history, dtype=np.int32)

    def reset(self):
        """Fill the history with fresh random numbers and return it.

        Returns:
            An int32 array of length ``history_size``.
        """
        self.history = deque(
            [np.random.randint(0, 100) for _ in range(history_size)],
            maxlen=history_size,
        )
        self.current_number = self.history[-1]
        return self._observation()

    def step(self, action):
        """Score ``action`` against the newest number, then draw the next number.

        Args:
            action: 1 to guess "> CHOSEN_X", 0 to guess "<= CHOSEN_X". Any
                other value is scored as a wrong guess.

        Returns:
            A ``(observation, reward, done, info)`` tuple. ``observation`` is
            the history *after* the new number was appended, ``reward`` is 1
            or -1, ``done`` is always True and ``info`` is an empty dict.
        """
        is_higher = self.current_number > CHOSEN_X
        guessed_right = (action == 1 and is_higher) or (action == 0 and not is_higher)
        reward = 1 if guessed_right else -1

        # Generate a new number and slide it into the history window.
        self.current_number = np.random.randint(0, 100)
        self.history.append(self.current_number)

        # Every step is independent of the previous one, so each step is its
        # own episode. Reporting done=True keeps the agent's replay (Bellman
        # update) from bootstrapping on a next state that has no relationship
        # to the action just taken.
        done = True

        return self._observation(), reward, done, {}

    def render(self, mode='human'):
        """No-op: rendering is not necessary for this simple task."""
        pass

# Create the custom environment.
# This module-level instance is read as a global by train() and test()
# below; neither function takes the environment as a parameter.
env = NumberPredictionEnv()

################################################################

import tensorflow as tf
from collections import deque
import random
import os
import datetime
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Define the model
def build_model(input_shape, action_size):
    """Build and compile the small fully connected Q-value network.

    Args:
        input_shape: Number of features in one state vector. Despite the
            name this is a plain int, not a shape tuple.
        action_size: Number of available actions; the network outputs one
            linear value per action.

    Returns:
        A compiled Keras ``Sequential`` model using the Adam optimizer
        (learning rate 0.001) and mean-squared-error loss.
    """
    model = Sequential([
        # Declare the input explicitly rather than passing the legacy
        # `input_dim` keyword to the first Dense layer, which newer Keras
        # releases warn about.
        tf.keras.Input(shape=(input_shape,)),
        Dense(24, activation='relu'),  # First hidden layer with 24 neurons
        Dense(24, activation='relu'),  # Second hidden layer with 24 neurons
        Dense(action_size, activation='linear'),  # One output per action
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
    return model

class DQNAgent:
    """Epsilon-greedy Q-learning agent backed by a small Keras network.

    Transitions are stored in a bounded replay memory and periodically
    replayed in random minibatches to fit the network.
    """

    def __init__(self, state_size, action_size):
        """Create the agent, its network and its TensorBoard callback.

        Args:
            state_size: Number of features in one state vector.
            action_size: Number of discrete actions to choose from.
        """
        self.state_size = state_size
        self.action_size = action_size
        # Bounded replay memory: once full, the oldest transitions fall off.
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # Discount factor
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.model = build_model(state_size, action_size)
        # Running count of model.fit() calls made by replay().
        self.fit_check = 0

        self.log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        self.tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=self.log_dir, histogram_freq=1)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the network's greedy choice from ``predict`` is used.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        return self.predict(state)

    def predict(self, state):
        """Return the index of the action with the highest predicted value.

        Args:
            state: Batch holding a single state; only row 0 of the
                prediction is inspected.
        """
        # verbose=0 keeps Keras from printing a progress bar for every
        # single-sample prediction, matching the silent fit() in replay().
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """Fit the network on a random minibatch drawn from replay memory.

        Does nothing (no training, no epsilon decay) when fewer than
        ``batch_size`` transitions have been stored, instead of letting
        ``random.sample`` raise ``ValueError``.

        Args:
            batch_size: Number of stored transitions to sample and fit on.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward

            # Bellman update: if this was not the last step of its episode,
            # add the discounted best predicted value of the *next* state, so
            # that the value of this action reflects how well it sets up the
            # following step.
            if not done:
                next_values = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_values)

            # Only the value of the action actually taken is overwritten;
            # the other outputs keep the network's current predictions.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0, callbacks=[self.tensorboard_callback])
            self.fit_check += 1

        # Decay epsilon, no lower than epsilon_min.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        else:
            self.epsilon = self.epsilon_min

# Initialize the agent
# The network takes one input per entry in the environment's history window.
state_size = history_size
# Two actions: 0 (<= CHOSEN_X) and 1 (> CHOSEN_X). This mirrors the
# Discrete(2) action space declared by NumberPredictionEnv.
action_size = 2  # Two actions: 0 (<= 70) and 1 (> 70)
# Module-level agent instance; train() and test() below use it as a global.
agent = DQNAgent(state_size, action_size)

################################################################

# Parameters
# Episodes run per call to train() and per call to test().
episodes = 30
# Upper bound on environment steps taken within one episode.
steps_per_episode = 1
# Number of stored transitions sampled for each replay.
replay_batch_size = 32
# Wait until more than this many transitions are stored before any replay
# is attempted (each replay then samples `replay_batch_size` of them).
replay_memory_min_length = 500
# Once the memory threshold is met, probability that a given training
# episode ends with a replay.
replay_chance = 0.1


def train(episodes, steps_per_episode, batch_size, state_size):
    """Collect experience with the epsilon-greedy agent and occasionally replay it.

    Uses the module-level ``env`` and ``agent``. After each episode a replay
    is run only when the agent's memory holds more than
    ``replay_memory_min_length`` transitions and a random draw falls below
    ``replay_chance``; otherwise "Skipping replay" is printed.

    Args:
        episodes: Number of episodes to run.
        steps_per_episode: Maximum number of environment steps per episode.
        batch_size: Number of transitions sampled for each replay.
        state_size: Length of one state vector, used to shape model input.
    """
    for _ in range(episodes):
        # Get a state and format it as a batch of one for the model.
        state = np.reshape(env.reset(), [1, state_size])

        for _ in range(steps_per_episode):
            action = agent.act(state)
            next_state, reward, done, _info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state

            # Stop stepping once the environment reports the episode is
            # over, as test() does, rather than stepping a finished episode.
            if done:
                break

        # Run replay at the end of each episode
        if len(agent.memory) > replay_memory_min_length and np.random.rand() < replay_chance:
            print(f"Agent memory: {len(agent.memory)}")
            agent.replay(batch_size)
        else:
            print("Skipping replay")

def test(episodes):
    """Run the agent greedily and tally how many guesses earn a reward of 1.

    Uses the module-level ``env``, ``agent``, ``state_size`` and
    ``steps_per_episode``. Actions come from ``agent.predict`` (no random
    exploration). A summary line is printed before returning.

    Args:
        episodes: Number of evaluation episodes to run.

    Returns:
        A ``(correct, incorrect)`` tuple of step counts.
    """
    correct = 0
    steps_taken = 0

    for _episode in range(episodes):
        # Fresh state, shaped as a batch of one for the model.
        state = np.reshape(env.reset(), [1, state_size])

        for _step in range(steps_per_episode):
            action = agent.predict(state)
            observation, reward, done, _info = env.step(action)

            steps_taken += 1
            if reward == 1:
                correct += 1

            observation = np.reshape(observation, [1, state_size])
            # NOTE(review): evaluation transitions are also stored in the
            # agent's replay memory -- confirm this is intended.
            agent.remember(state, action, reward, observation, done)
            state = observation

            if done:
                break

    # Every step that did not earn a reward of 1 counts as incorrect.
    incorrect = steps_taken - correct

    print(f"Correct: {correct}, Incorrect: {incorrect}")
    return correct, incorrect

################################################################

# One (correct, incorrect, memory size, fit count) tuple per train/test round.
results = []
train_test_loops = 10

for _ in range(train_test_loops):
    # One round: train, evaluate, then record a snapshot of the agent.
    train(episodes, steps_per_episode, replay_batch_size, state_size)
    correct, incorrect = test(episodes)
    agent_memory = len(agent.memory)
    results.append(
        (correct, incorrect, agent_memory, agent.fit_check)
    )

    # Re-print the summary of every round recorded so far, not just the latest.
    for i, o in enumerate(results):
        # Success rate as a percentage, truncated (not rounded) to two decimals.
        right_pct = trunc((o[0] / (o[0] + o[1])) * 10000) / 100
        print(f"Iteration {i+1}: Pct/Right/Wrong:{right_pct}%/{o[0]}/{o[1]}, Agent Memory: {o[2]}, Fits: {o[3]}")


Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Correct: 19, Incorrect: 11
Iteration 1: Pct/Right/Wrong:63.33%/19/11, Agent Memory: 60, Fits: 0
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping

In [None]:
%load_ext tensorboard
%tensorboard --logdir logs/fit

In [29]:

# Run another set of train/test rounds, appending to the existing `results`.
for _ in range(train_test_loops):
    train(episodes, steps_per_episode, replay_batch_size, state_size)
    correct, incorrect = test(episodes)
    agent_memory = len(agent.memory)
    results.append(
        (correct, incorrect, agent_memory, agent.fit_check)
    )

    # Re-print the summary of every round recorded so far, not just the latest.
    for i, o in enumerate(results):
        # Success rate as a percentage, truncated (not rounded) to two decimals.
        right_pct = trunc((o[0] / (o[0] + o[1])) * 10000) / 100
        print(f"Iteration {i+1}: Pct/Right/Wrong:{right_pct}%/{o[0]}/{o[1]}, Agent Memory: {o[2]}, Fits: {o[3]}")


Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Agent memory: 621
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Skipping replay
Correct: 21, Incorrect: 9
Iteration 1: Pct/Right/Wrong:63.33%/19/11, Agent Memory: 60, Fits: 0
Iteration 2: Pct/Right/Wrong:86.66%/26/4, Agent Memory: 120, Fits: 0
Iteration 3: Pct/Right/Wrong:83.33%/25/5, Agent Memory: 180, Fits: 0
Iteration 4: Pct/Right/Wrong:76.66%/23/7, Agent Memory: 240, Fits: 0
Iteration 5: Pct/Right/Wrong:63.33%/19/11, Agent Memory: 300, Fits: 0
Iteration 6: Pct/Right/Wrong:80.0%/24/6, Agent Memory: 360, Fits: 0
Iteration 7: Pct/Right/Wrong:63.33%/19/11, Agent Memory: 420, Fits: 0
Iteratio