In [None]:
!pip install numpy
!pip install tensorflow
!pip install keras
!pip install tqdm
!pip install pillow
!pip install opencv-python-headless
!pip install matplotlib
!pip install pandas

In [None]:
import numpy as np
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
from collections import deque
import time
import random
from tqdm import tqdm
import os
from PIL import Image
import cv2
import signal
import sys
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from google.colab.patches import cv2_imshow
import shutil
import math

In [None]:
# Mount Google Drive under /content/drive. save_video() moves every rendered
# episode video into '/content/drive/My Drive/', so this cell has to run
# before the training cell. Colab-only: google.colab is not importable elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# --- DQN hyperparameters -----------------------------------------------------
DISCOUNT = 0.99  # Weight of the target network's best future Q-value in the Q-target
# NOTE(review): every stored transition holds 256x256x3 uint8 frames (192 KiB each),
# so a full buffer of 20,000 steps needs several GiB of RAM - confirm the runtime has it.
REPLAY_MEMORY_SIZE = 20_000  # How many last steps to keep for model training
MIN_REPLAY_MEMORY_SIZE = 512  # Minimum number of steps in a memory to start training
MINIBATCH_SIZE = 32  # How many steps (samples) to use for training
UPDATE_TARGET_EVERY = 10  # Terminal states (end of episodes) between target-network syncs
MODEL_NAME = '2x256'  # Prefix of the saved model file names
MIN_REWARD = 200  # Not referenced in the visible cells
MAX_STEP = 12 # For model save; not referenced in the visible cells
MEMORY_FRACTION = 0.20  # Not referenced in the visible cells

# Environment settings
EPISODES = 800  # Number of training episodes

# Exploration settings
epsilon = 1  # not a constant, going to be decayed (mutated by the training cell)
EPSILON_DECAY = 0.98 # Multiplied in once per episode; 0.98**342 ~= 0.001, so the floor is hit after ~342 episodes
MIN_EPSILON = 0.001  # Lower bound for epsilon

#  Stats settings
AGGREGATE_STATS_EVERY = 20  # episodes
SHOW_PREVIEW = False  # Not referenced in the visible cells

In [None]:
def get_direction_point(direction):
  """Map a movement vector to its action index (inverse of ``get_direction``).

  Args:
    direction: Two-element sequence ``(dx, dy)`` with each component in
      ``{-1, 0, 1}`` and not both zero. Tuples, lists and other iterables
      of two numbers are accepted.

  Returns:
    int: The action index in ``0..7``.

  Raises:
    ValueError: If ``direction`` is not one of the eight known vectors
      (``(0, 0)`` included). Previously this surfaced as an
      ``UnboundLocalError`` from the missing fallback branch.
  """
  direction_points = {
      (1, 1): 0,
      (1, 0): 1,
      (1, -1): 2,
      (0, 1): 3,
      (0, -1): 4,
      (-1, 1): 5,
      (-1, 0): 6,
      (-1, -1): 7,
  }
  try:
    # tuple() lets lists and arrays be used as dictionary keys.
    return direction_points[tuple(direction)]
  except (KeyError, TypeError):
    raise ValueError(f"Invalid direction: {direction!r}") from None

def get_direction(direction_point):
    """Translate an action index into its ``(dx, dy)`` movement vector.

    Args:
        direction_point: Action index; must compare equal to one of ``0..7``.

    Returns:
        tuple: The ``(dx, dy)`` pair assigned to that index.

    Raises:
        ValueError: If ``direction_point`` matches none of the eight indices.
    """
    # Position in this table is the action index.
    vectors = (
        (1, 1),
        (1, 0),
        (1, -1),
        (0, 1),
        (0, -1),
        (-1, 1),
        (-1, 0),
        (-1, -1),
    )
    # Compare with == (rather than indexing) so any value that equals the
    # index - e.g. a NumPy integer - is matched the same way as a plain int.
    for index, vector in enumerate(vectors):
        if direction_point == index:
            return vector
    raise ValueError("Invalid direction_point")

def plot_model(df):
  """Plot the aggregated training metrics on a 2x2 grid and save it as a PNG.

  Args:
    df: DataFrame with the columns 'Episode', 'Average Reward',
      'Min Reward', 'Max Reward' and 'Max Step'.

  Writes 'trained_metrics_plot.png' to the working directory and closes all
  open matplotlib figures afterwards.
  """
  # (metric column, extra line styling) in subplot order; the first panel
  # keeps seaborn's default colour.
  panels = (
      ('Average Reward', {}),
      ('Min Reward', {'color': 'red'}),
      ('Max Reward', {'color': 'green'}),
      ('Max Step', {'color': 'purple'}),
  )

  plt.figure(figsize=(12, 8))
  for slot, (metric, styling) in enumerate(panels, start=1):
    plt.subplot(2, 2, slot)
    sns.lineplot(x='Episode', y=metric, data=df, marker='o', **styling)
    plt.title(f'{metric} per Episode')

  plt.tight_layout()
  plt.savefig('trained_metrics_plot.png')
  plt.close("all")

def draw_locking(img, coords):
  """Draw the green 20x20 locking box on ``img`` and return the same image.

  Args:
    img: Image passed straight to ``cv2.rectangle``.
    coords: Indexable pair; ``coords[0]`` and ``coords[1]`` are used as the
      x and y of the box's top-left corner.

  Returns:
    The ``img`` object that was passed in.
  """
  left, top = coords[0], coords[1]
  top_left = (left, top)
  bottom_right = (left + 20, top + 20)
  green = (0, 255, 0)  # green in both BGR and RGB order
  cv2.rectangle(img, top_left, bottom_right, green, thickness=8)
  return img

def calculate_distance(coord1, coord2):
    """Return the Euclidean distance between two 2-D points.

    Args:
        coord1: First point as an ``(x, y)`` pair.
        coord2: Second point as an ``(x, y)`` pair.

    Returns:
        float: Straight-line distance between the points.
    """
    (x_a, y_a), (x_b, y_b) = coord1, coord2
    delta_x = x_b - x_a
    delta_y = y_b - y_a
    return math.sqrt(delta_x**2 + delta_y**2)

def save_video(imgs, episode):
  """Encode frames into an XVID ``.avi`` at 3 fps and move it to Google Drive.

  Args:
    imgs: Non-empty sequence of frames. The first frame's ``shape`` fixes the
      video size (assumes all frames share that size - TODO confirm with caller).
    episode: Value interpolated into the file name ``video{episode}.avi``.

  Raises:
    ValueError: If ``imgs`` is empty (there is no frame to take the size from).
  """
  if len(imgs) == 0:
    raise ValueError("save_video requires at least one frame")

  frame_height, frame_width = imgs[0].shape[:2]
  filename = f'video{episode}.avi'
  video = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'XVID'), 3, (frame_width, frame_height))

  try:
    # Write each image to the video
    for img in imgs:
      video.write(img)
  finally:
    # Release even when a write fails so the writer's file handle is not leaked.
    video.release()

  shutil.move(filename, f'/content/drive/My Drive/{filename}')

In [None]:
class ImageGenerator():
  """Synthetic environment: a filled square moving over a fixed noisy background.

  Positions are ``(x, y)`` pixel coordinates in the order ``cv2.rectangle``
  expects: ``x`` runs along the image width, ``y`` along the image height.
  Each call to ``next()`` moves the square and returns a freshly drawn frame.
  """

  def __init__(self, height=256, width=256, speed=10, side_length=50):
    """Create the background image and pick a random start position/direction.

    Args:
      height: Frame height in pixels.
      width: Frame width in pixels.
      speed: Pixels moved per step along each axis.
      side_length: Side of the square in pixels.
    """
    self.height = height
    self.width = width
    self.speed = speed
    self.side_length = side_length
    low_noise = (np.random.rand(height, width, 3) * 255/ 10).astype(np.uint8)
    high_noise = ((np.random.rand(height, width, 3) * 255/ 10) + 230 ).astype(np.uint8)
    # NOTE(review): both operands are uint8, so sums above 255 wrap around
    # modulo 256 and produce dark pixels. Left unchanged because it defines
    # the background the agent is trained on - confirm this is intended.
    self.img = low_noise + high_noise
    # x is bounded by the width and y by the height (identical for square frames).
    self.pos = [np.random.randint(0, width), np.random.randint(0, height)]
    # A (0, 0) placeholder so get_random_direction() has a value to differ from.
    self.direction = (0, 0)
    self.direction_point = 0
    self.direction = self.get_random_direction()

  def get_random_direction(self):
    """Return a random non-zero direction that differs from the current one."""
    # Rejection sampling in a loop; avoids unbounded recursion on repeated misses.
    while True:
      direction = (np.random.randint(-1, 2), np.random.randint(-1, 2))
      if direction != (0, 0) and direction != self.direction:
        return direction

  def _clamp_axis(self, value, extent):
    """Clamp one coordinate into the allowed band; also report whether it hit a wall."""
    hit_wall = False
    if value <= self.side_length:
      value = self.side_length
      hit_wall = True
    if value >= extent - self.side_length:
      value = extent - self.side_length - 1
      hit_wall = True
    return value, hit_wall

  def move(self):
    """Advance one step along ``self.direction``; pick a new direction on wall contact."""
    # pos[0] is the x coordinate handed to cv2.rectangle, so it must be limited
    # by the width (and pos[1] by the height); the axes used to be swapped,
    # which only went unnoticed because the default frame is square.
    pos_x, hit_x = self._clamp_axis(self.pos[0] + self.direction[0] * self.speed, self.width)
    pos_y, hit_y = self._clamp_axis(self.pos[1] + self.direction[1] * self.speed, self.height)

    self.pos = (pos_x, pos_y)
    if hit_x or hit_y:
      self.direction = self.get_random_direction()

  def next(self):
    """Move the square and render a new frame.

    Returns:
      tuple: ``(frame, pos, direction)`` where ``frame`` is a copy of the
      background with the square drawn on it, ``pos`` is the square's
      top-left ``(x, y)`` and ``direction`` its current ``(dx, dy)``.
    """
    # Roughly one step in five the square changes direction spontaneously.
    if np.random.rand() > 0.8:
      self.direction = self.get_random_direction()

    self.move()

    image_with_object = self.img.copy()

    cv2.rectangle(
        image_with_object,
        (self.pos[0], self.pos[1]),
        (self.pos[0] + self.side_length, self.pos[1] + self.side_length),
        (255, 0, 0),  # blue when the frame is interpreted in BGR order
        thickness=cv2.FILLED
    )

    return image_with_object, self.pos, self.direction

  def contains(self, x, y):
    """Return True if ``(x, y)`` lies on or inside the square (borders inclusive)."""
    return (self.pos[0] <= x <= self.pos[0] + self.side_length) and (self.pos[1] <= y <= self.pos[1] + self.side_length)

  def __call__(self):
    return self.next()

  def __iter__(self):
    # Completes the iterator protocol alongside __next__.
    return self

  def __next__(self):
    return self.next()


In [None]:
class DQNAgent:
    """DQN agent: an online Q-network, a target network and a replay buffer.

    The online network (``self.model``) is fitted on minibatches sampled from
    ``self.replay_memory``; ``self.target_model`` supplies the future Q-values
    and is synchronised with the online network every ``UPDATE_TARGET_EVERY``
    terminal states.
    """

    def __init__(self):
        columns = ['Model', 'Episode', 'Average Reward', 'Min Reward', 'Max Reward', 'Max Step']
        self.df = pd.DataFrame(columns=columns)

        # Main model
        self.model = self.create_model()

        # Target network, starting from the same weights as the main model
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        # The last REPLAY_MEMORY_SIZE transitions; older ones are dropped automatically
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        # Counts terminal states seen since the last target-network sync
        self.target_update_counter = 0

    def create_model(self):
        """Build and compile the convolutional Q-network.

        Two ``Conv2D(256, 3x3) -> ReLU -> MaxPool(2x2) -> Dropout(0.2)`` stages,
        then ``Flatten -> Dense(64) -> Dense(8, linear)``. Input is a
        256x256x3 image; the 8 outputs are the Q-values of the 8 actions.

        Returns:
            The compiled Keras model (MSE loss, Adam with learning rate 0.001).
        """
        model = Sequential()

        model.add(Conv2D(256, (3, 3), input_shape=(256,256,3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Conv2D(256, (3, 3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Flatten())  # this converts our 3D feature maps to 1D feature vectors
        model.add(Dense(64))

        model.add(Dense(8, activation='linear'))
        # No 'accuracy' metric: the outputs are regressed Q-values, for which
        # classification accuracy carries no meaning.
        model.compile(loss="mse", optimizer=Adam(learning_rate=0.001))
        return model

    def update_replay_memory(self, transition):
        """Append one transition to the replay buffer.

        Args:
            transition: Tuple ``(state, action, reward, new_state, done)``.
        """
        self.replay_memory.append(transition)

    def train(self, terminal_state, step):
        """Fit the main network on one random minibatch from the replay buffer.

        Does nothing until the buffer holds ``MIN_REPLAY_MEMORY_SIZE`` transitions.

        Args:
            terminal_state: True when the step that was just stored ended an
                episode; used to count episodes for the target-network sync.
            step: Unused; kept so existing callers keep working.
        """
        # Start training only if certain number of samples is already saved
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        # Get a minibatch of random samples from memory replay table
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        # Current states, scaled to [0, 1]; Q-values come from the main network.
        # verbose=0 keeps Keras from printing a progress bar on every step.
        current_states = np.array([transition[0] for transition in minibatch])/255
        current_qs_list = self.model.predict(current_states, verbose=0)

        # Next states, scaled the same way; Q-values come from the target network.
        new_current_states = np.array([transition[3] for transition in minibatch])/255
        future_qs_list = self.target_model.predict(new_current_states, verbose=0)

        # Overwrite, per sample, only the Q-value of the action that was taken.
        for index, (_state, action, reward, _new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs_list[index][action] = new_q

        # Fit on all samples as one batch. The already normalised input batch is
        # reused instead of being rebuilt and rescaled a second time.
        self.model.fit(current_states, current_qs_list, batch_size=MINIBATCH_SIZE, verbose=0, shuffle=False)

        # Update target network counter every episode
        if terminal_state:
            self.target_update_counter += 1

        # Sync once UPDATE_TARGET_EVERY terminal states have been counted
        # (">" made the sync happen one episode late).
        if self.target_update_counter >= UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    def get_qs(self, state):
        """Return the main network's Q-values for a single state.

        Args:
            state: One image array; it is wrapped into a batch of one and
                scaled by 1/255 before prediction.

        Returns:
            The first (only) row of the prediction, one Q-value per action.
        """
        batch = np.array(state).reshape(-1, *state.shape)/255
        return self.model.predict(batch, verbose=0)[0]

In [None]:
agent = DQNAgent()
image_generator = ImageGenerator()
steps = []          # number of steps taken in each episode
ep_rewards = []     # total reward of each episode
success_list = []   # whether each episode ended with a successful lock
metric_rows = []    # one record per aggregation point; turned into `df` below
df = pd.DataFrame()

# Iterate over episodes
for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):
    # Restarting episode - reset episode reward and step number
    episode_reward = 0
    step = 0
    count_down = 0          # consecutive steps with the box fully on the target
    former_pos = [0, 0]
    imgs = []

    # Get the first frame and place the locking box at a random position
    current_state, target_pos, blob_direction = next(image_generator)
    locking_pos = [np.random.randint(0, 256), np.random.randint(0, 256)]
    current_state = draw_locking(current_state, locking_pos)

    # Reset flags and start iterating until episode ends
    done = False
    success = False
    while not done:
        # Epsilon-greedy action selection
        if np.random.random() > epsilon:
            action = np.argmax(agent.get_qs(current_state))
        else:
            action = np.random.randint(0, 8)

        direction = get_direction(action)

        former_pos[0] = locking_pos[0]
        former_pos[1] = locking_pos[1]

        # The locking box moves 10 px per step along the chosen direction
        locking_pos[0] += direction[0] * 10
        locking_pos[1] += direction[1] * 10

        distance = calculate_distance(locking_pos, target_pos)
        former_distance = calculate_distance(former_pos, target_pos)

        # +1 while both corners of the 20 px box lie on the target; otherwise
        # +/-0.5 for moving towards / away from it and 0 for no change.
        if image_generator.contains(locking_pos[0], locking_pos[1]) and image_generator.contains(locking_pos[0] + 20, locking_pos[1] + 20):
            reward = 1
            count_down += 1
        elif distance > former_distance:
            reward = -1/2
            count_down = 0
        elif distance < former_distance:
            reward = 1/2
            count_down = 0
        else:
            reward = 0
            count_down = 0

        # Five consecutive on-target steps count as a successful lock
        if count_down == 5:
            done = True
            success = True

        # Leaving the allowed area on EITHER axis ends the episode with a
        # penalty ("and" only fired when both coordinates were out of range).
        if not (0 <= locking_pos[0] <= 226 and 0 <= locking_pos[1] <= 226):
            done = True
            reward = -5

        # Hard cap on the episode length
        if step > 30:
            done = True

        episode_reward += reward

        # Every step we update replay memory and train main network
        new_state, target_pos, blob_direction = next(image_generator)
        new_state = draw_locking(new_state, locking_pos)
        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done, step)

        current_state = new_state
        step += 1

    ep_rewards.append(episode_reward)
    steps.append(step)
    success_list.append(success)

    if not episode % AGGREGATE_STATS_EVERY or episode == 1:
        recent_rewards = ep_rewards[-AGGREGATE_STATS_EVERY:]
        average_reward = sum(recent_rewards)/len(recent_rewards)
        min_reward = min(recent_rewards)
        max_reward = max(recent_rewards)
        max_step = max(steps[-AGGREGATE_STATS_EVERY:])

        # Collect plain records and rebuild the frame, rather than growing a
        # DataFrame with pd.concat on every aggregation.
        metric_rows.append({
            'Episode': episode,
            'Average Reward': average_reward,
            'Min Reward': min_reward,
            'Max Reward': max_reward,
            'Max Step': max_step,
        })
        df = pd.DataFrame(metric_rows)
        df.to_csv('model_metrics.csv', index=False)
        plot_model(df)

        # Save the model and a video of the episode that just finished
        if success or episode == 1 or not episode % 100:
            # The target directory must exist, and Keras 3 only accepts the
            # '.keras' / '.h5' suffixes for Model.save().
            os.makedirs('models', exist_ok=True)
            agent.model.save(f'models/{MODEL_NAME}__{max_reward:_>7.2f}max_{average_reward:_>7.2f}avg_{min_reward:_>7.2f}min__{episode}ep__.keras')
            # The last `steps[-1]` transitions in the buffer belong to this episode
            episode_length = steps[-1]
            imgs = [pack[0] for pack in list(agent.replay_memory)[-episode_length:]]
            save_video(imgs, episode)

    # Decay exploration once per episode, never below MIN_EPSILON
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)
