#### ENV

In [56]:
import numpy as np
from PIL import Image
import cv2
import os
import PIL
import pandas as pd

# Grid height: number of rows in the wall map and in the rendered image.
SIZE_Y = 40
# Grid width: number of columns in the wall map and in the rendered image.
SIZE_X = 60
# Number of discrete actions; sizes each Q-table row and bounds random action sampling.
ACTION_SIZE = 8
# DroneEnv.step ends the episode once its step counter reaches this value.
MAX_STEP = 2500
# Define Functions and Classes


def createLine(walls, SIZE_Y, SIZE_X):
    """Draw the fixed maze layout into ``walls`` and return it.

    The layout is made of eighteen straight segments whose end points are
    integer fractions of the grid size, so it scales with ``SIZE_Y`` and
    ``SIZE_X``. Every cell on a segment is set to 1; all other cells are left
    untouched.

    Args:
        walls: 2-D array indexed as ``walls[row, col]``; modified in place.
        SIZE_Y: number of rows the layout is scaled to.
        SIZE_X: number of columns the layout is scaled to.

    Returns:
        The same ``walls`` object that was passed in.
    """
    # Fractions of the grid that several segments share.
    tenth_y = SIZE_Y // 10
    fifth_y = SIZE_Y // 5
    quarter_y = SIZE_Y // 4
    third_y = SIZE_Y // 3
    half_x = SIZE_X // 2
    sixth_x = SIZE_X // 6

    left_x = half_x - sixth_x
    right_x = half_x + sixth_x
    band_a = third_y + quarter_y
    band_b = band_a + fifth_y
    low_row = SIZE_Y // 2 + tenth_y
    ninth_col = SIZE_X - SIZE_X // 9

    # (orientation, fixed index, start, stop) with stop exclusive.
    # "v": fixed column, running over rows; "h": fixed row, running over columns.
    segments = (
        ("v", half_x, tenth_y, third_y),                      # Segment 1
        ("h", tenth_y, half_x, right_x),                      # Segment 2
        ("v", right_x, tenth_y, quarter_y),                   # Segment 3
        ("h", third_y, left_x, half_x + 1),                   # Segment 4
        ("v", left_x, third_y, band_a),                       # Segment 5
        ("h", band_a, left_x, right_x),                       # Segment 6
        ("v", right_x, band_a, band_b),                       # Segment 7
        ("h", band_b, right_x, right_x + sixth_x),            # Segment 8
        ("v", half_x, SIZE_Y - third_y, SIZE_Y),              # Segment 9
        ("v", half_x - SIZE_X // 4, SIZE_Y - quarter_y, SIZE_Y),  # Segment 10
        ("h", fifth_y, 0, sixth_x),                           # Segment 11
        ("v", sixth_x, fifth_y, fifth_y + fifth_y),           # Segment 12
        ("h", fifth_y + third_y, 0, sixth_x),                 # Segment 13
        ("v", SIZE_X - sixth_x, 0, third_y),                  # Segment 14
        ("h", third_y, SIZE_X - SIZE_X // 10, SIZE_X),        # Segment 15
        ("h", low_row, ninth_col, SIZE_X),                    # Segment 16
        ("v", ninth_col, low_row, low_row + quarter_y),       # Segment 17
        ("v", ninth_col, low_row + third_y, SIZE_Y),          # Segment 18
    )

    for orientation, fixed, start, stop in segments:
        for k in range(start, stop):
            if orientation == "v":
                walls[k, fixed] = 1
            else:
                walls[fixed, k] = 1

    return walls


class EnvObject:
    """Anything that occupies a single grid cell, identified by ``x`` and ``y``.

    Two objects compare equal when their coordinates match, regardless of
    which ``EnvObject`` subclass each one is. Coordinates are mutable, so
    instances are deliberately unhashable.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        return f"{type(self).__name__}(x={self.x!r}, y={self.y!r})"

    def __str__(self):
        return f"{self.x}, {self.y}"

    def __sub__(self, other):
        """Return the ``(dx, dy)`` offset from ``other`` to ``self`` as a tuple."""
        if not isinstance(other, EnvObject):
            # Let Python raise the standard TypeError for unsupported operands.
            return NotImplemented
        return (self.x - other.x, self.y - other.y)

    def __eq__(self, other):
        """Return True when ``other`` is an EnvObject at the same coordinates."""
        if not isinstance(other, EnvObject):
            # Comparing with an unrelated type is "not equal", not an error.
            return NotImplemented
        return self.x == other.x and self.y == other.y

    # Defining __eq__ already disables hashing; spell it out because the
    # coordinates change while an object is alive.
    __hash__ = None


class Drone(EnvObject):
    """A grid agent that moves one cell per action, blocked by borders and walls."""

    # Action index -> (dx, dy). y is used as the row index of the wall map,
    # so dy = -1 moves towards row 0.
    _MOVES = {
        0: (-1, -1),
        1: (-1, 1),
        2: (1, -1),
        3: (1, 1),
        4: (1, 0),
        5: (-1, 0),
        6: (0, 1),
        7: (0, -1),
    }

    def __init__(self, x, y):
        super().__init__(x, y)

    def action(self, choice, walls=None):
        """Apply one of the 8 movement options (0-7): four diagonals, then four straight moves.

        Args:
            choice: action index in ``0..7``.
            walls: optional wall map indexed as ``walls[row][col]``; cells
                equal to 1 block movement. ``None`` disables wall checks.

        Raises:
            ValueError: if ``choice`` is not one of the 8 known actions.
        """
        try:
            dx, dy = self._MOVES[choice]
        except KeyError:
            raise ValueError(
                f"choice must be an integer in 0..{len(self._MOVES) - 1}, got {choice!r}"
            ) from None

        self.move(x=dx, y=dy, walls=walls)

    def move(self, x=0, y=0, walls=None):
        """Shift the drone by ``(x, y)`` after collision handling; defaults mean "stay"."""
        x, y = self.collisionCheck(x, y, walls)
        self.x += x
        self.y += y

    def collisionCheck(self, x=0, y=0, walls=None):
        """Return the part of the requested displacement that is allowed.

        An axis that would leave the ``SIZE_X`` x ``SIZE_Y`` grid is zeroed on
        its own, so a diagonal move against a border slides along it. If the
        resulting cell is a wall, the whole move is cancelled.

        Returns:
            The ``(x, y)`` displacement to apply, possibly ``(0, 0)``.
        """
        # Checking for out of bounds, one axis at a time.
        if not 0 <= self.x + x <= SIZE_X - 1:
            x = 0
        if not 0 <= self.y + y <= SIZE_Y - 1:
            y = 0

        # Check for collision with walls, using the already clamped offsets.
        if walls is not None and walls[self.y + y][self.x + x] == 1:
            return 0, 0
        return x, y
        

class Target(EnvObject):
    """Goal cell of the environment: an EnvObject with no behaviour of its own.

    Constructed as ``Target(x, y)``; the constructor is inherited unchanged.
    """


class DroneEnv:
    """Grid-world environment in which ``agent_1`` has to reach ``target``.

    The observation is the ``pathDist`` entry of the cell ``agent_1`` stands
    on. The wall map, the image buffer and ``pathDist`` are class attributes,
    built once when the class is defined and shared by every instance.
    """

    # Define Parameters
    SIZE_X = SIZE_X
    SIZE_Y = SIZE_Y
    # Colour triples written into ``space``. saveImage interprets them as RGB.
    ENV_COLOR = (20, 52, 89)
    WALLS_COLOR = (77, 77, 234)
    DRONE_COLOR = (234, 222, 53)
    TARGET_COLOR = (132, 234, 53)
    # Image buffer, (rows, cols, 3); redrawn in place by visualize().
    space = np.zeros((SIZE_Y, SIZE_X, 3), dtype=np.uint8)
    # Wall map, (rows, cols); 1 marks a wall cell.
    walls = createLine(
        np.zeros((SIZE_Y, SIZE_X), dtype=np.uint8), SIZE_Y, SIZE_X)
    # Per-cell lookup used as the observation, indexed [y][x].
    # Presumably the path length from each cell to the target - TODO confirm.
    # os.path.join avoids the invalid "\p" escapes of a backslash literal and
    # also resolves on non-Windows systems.
    pathDist = pd.read_csv(
        os.path.join('scripts', 'path_dist', 'pathDist.csv'),
        header=None, dtype='Int32').values

    def reset(self):
        """Place the drones and the target at their start cells and restart the step counter.

        Returns:
            The ``pathDist`` value of ``agent_1``'s start cell.
        """
        self.agent_1 = Drone(self.SIZE_X-self.SIZE_X//11, self.SIZE_Y//10)
        # agent_2 and agent_3 are created but not used by step() or visualize().
        self.agent_2 = Drone(self.SIZE_X-self.SIZE_X//20,
                             self.SIZE_Y//2-self.SIZE_Y//20)
        self.agent_3 = Drone(self.SIZE_X-self.SIZE_X//13,
                             self.SIZE_Y-self.SIZE_Y//11)
        self.target = Target(self.SIZE_X//10, self.SIZE_Y//2-self.SIZE_Y//20)

        self.episode_step = 0

        observation = self.pathDist[self.agent_1.y][self.agent_1.x]
        return observation

    def step(self, action, observation):
        """Move ``agent_1`` by ``action`` and score the move.

        Args:
            action: action index forwarded to ``Drone.action``.
            observation: the observation before this move, used to measure
                how much the ``pathDist`` value changed.

        Returns:
            ``(new_observation, reward, done)``. The reward is 100 on reaching
            the target, -10 when ``MAX_STEP`` is hit, and otherwise
            ``observation - new_observation - 1`` when the observation changed.
        """
        reward = 0
        done = False
        self.episode_step += 1
        self.agent_1.action(action, self.walls)

        new_observation = self.pathDist[self.agent_1.y][self.agent_1.x]

        if self.agent_1 == self.target:
            reward = 100
            done = True
        elif self.episode_step >= MAX_STEP:
            reward = -10
            done = True
        elif new_observation != observation:
            reward = observation - new_observation - 1
        # NOTE(review): an unchanged observation (e.g. a blocked move) keeps
        # reward 0, the same as getting one unit closer - confirm this is intended.
        return new_observation, reward, done

    def visualize(self):
        """Redraw ``space`` in place: background, walls, then drone and target."""
        self.space[:] = self.ENV_COLOR
        self.space[self.walls == 1] = self.WALLS_COLOR

        self.space[self.agent_1.y][self.agent_1.x] = self.DRONE_COLOR
        self.space[self.target.y][self.target.x] = self.TARGET_COLOR

    def render(self):
        """Show the current frame in an OpenCV window and block until a key is pressed."""
        self.visualize()
        img = Image.fromarray(self.space, 'RGB')
        # img = img.resize((1200, 800), resample=Image.Resampling.BOX)
        # NOTE(review): cv2.imshow expects BGR channel order, so the colours
        # shown here have red and blue swapped relative to saveImage - confirm
        # which of the two is the intended palette.
        cv2.imshow("image", np.array(img))  # show it!
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    def saveImage(self, image_path, episode='e', step='s'):
        """Write the current frame to ``{image_path}/episode_{episode}/image_{episode}_{step}.png``.

        The episode directory (and ``image_path`` itself) is created when missing.
        """
        self.visualize()
        episode_dir = f'{image_path}/episode_{episode}'
        # The file goes into the per-episode folder, so that is the directory
        # that has to exist, not just image_path.
        os.makedirs(episode_dir, exist_ok=True)
        img = Image.fromarray(self.space, 'RGB')
        # img = img.resize((1200, 800), resample=Image.Resampling.BOX)
        img.save(f'{episode_dir}/image_{episode}_{step}.png')
        

#### Program

In [57]:
# Create the environment instance used by the cells below.
env = DroneEnv()

In [58]:
# Start a fresh episode; the returned observation is the pathDist value at agent_1's start cell.
env.reset()

56

In [59]:
EPISODES = 5  # number of training episodes
SAVE_EVERY = 10  # a frame is saved whenever env.episode_step is a multiple of this
epsilon = 0.99  # probability of taking a random action instead of the Q-table's best one
EPSILON_DECAY = 0.99975  # factor epsilon is multiplied by as training proceeds
TARGET_REWARD = 100  # reward value treated as "target reached"; DroneEnv.step returns 100 for that case

LEARNING_RATE = 0.1  # weight of the new estimate in the Q-value update
DISCOUNT = 0.95  # weight of the best next-state Q-value in the update

In [60]:

# Output folders for the saved frames. exist_ok lets this cell be re-run
# without failing on folders left over from a previous run.
os.makedirs('images', exist_ok=True)
for episode in range(1, EPISODES+1):
    os.makedirs(f'images/episode_{episode}', exist_ok=True)

start_q_table = None  # if we have a pickled Q table, we'll put the filename of it here.

if start_q_table is None:
    # One row of ACTION_SIZE random Q-values in [-5, 0) per state id 0..143.
    q_table = {}
    for state_id in range(0, 144):
        q_table[state_id] = [np.random.uniform(-5, 0) for _ in range(ACTION_SIZE)]
else:
    import pickle

    # NOTE: unpickling executes arbitrary code - only load Q tables you created yourself.
    with open(start_q_table, 'rb') as q_file:
        q_table = pickle.load(q_file)

# Total reward of each finished episode, in order.
episode_rewards = []

for episode in range(1, EPISODES+1):
    # Not used by the loop below: env.reset() builds its own drone and target.
    agent_1 = Drone(SIZE_X-SIZE_X//11, SIZE_Y//10)
    target = Target(SIZE_X//10, SIZE_Y//2-SIZE_Y//20)
    episode_reward = 0

    # if episode % SAVE_EVERY == 0:
    #     show = True
    # else:
    #     show = False

    current_state = env.reset()

    done = False
    while not done:
        # a. Explore Exploit Tradeoff
        # b. Execute the action in the environment
        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[current_state])
        else:
            # Get random action
            action = np.random.randint(0, ACTION_SIZE)

        new_state, reward, done = env.step(action, current_state)

        max_future_q = np.max(q_table[new_state])
        current_q = q_table[current_state][action]

        if reward == TARGET_REWARD:
            # Reaching the target pins the Q-value to the target reward.
            new_q = TARGET_REWARD
        else:
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        q_table[current_state][action] = new_q

        episode_reward += reward

        if reward == TARGET_REWARD:
            print(f'Episode: {episode}, episode reward: {episode_reward}')
            break

        if env.episode_step % SAVE_EVERY == 0:
            env.saveImage('images', episode, env.episode_step)

        # Decayed once per step. NOTE(review): confirm per-step rather than
        # per-episode decay is intended.
        epsilon *= EPSILON_DECAY

        # Advance to the state the drone is actually in; without this every
        # update and every step() call would keep using the reset observation.
        current_state = new_state

    # Record one total per episode, including episodes that end on the target.
    episode_rewards.append(episode_reward)
        
        
        