In [1]:
import random
import numpy as np
import os
from PIL import Image
import cv2
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

# Create Environment

In [2]:
# Grid dimensions in cells: SIZE_Y rows by SIZE_X columns.
SIZE_Y, SIZE_X = 2 * 40, 2 * 60

#Define Functions and Classes

def createLine(walls, SIZE_Y, SIZE_X):
    """Draw the fixed 18-segment wall layout onto ``walls`` and return it.

    Every segment is a one-cell-thick straight run whose end points are
    integer fractions of the grid size, so the layout scales with
    ``SIZE_Y`` and ``SIZE_X``.

    Args:
        walls: 2-D array indexed as ``walls[row, col]``. Cells belonging to a
            segment are set to 1 in place; all other cells are left untouched.
        SIZE_Y: Row count used to position the segments.
        SIZE_X: Column count used to position the segments.

    Returns:
        The same ``walls`` object that was passed in.
    """
    # Anchor coordinates that several segments share.
    mid_x = SIZE_X // 2
    sixth_x = SIZE_X // 6
    ninth_col = SIZE_X - SIZE_X // 9
    tenth_y = SIZE_Y // 10
    fifth_y = SIZE_Y // 5
    quarter_y = SIZE_Y // 4
    third_y = SIZE_Y // 3
    lower_y = SIZE_Y // 2 + tenth_y

    # One entry per segment, in drawing order:
    #   ("v", column, first_row, stop_row)  -> vertical run
    #   ("h", row, first_column, stop_column) -> horizontal run
    # The stop value is exclusive, exactly like range().
    segments = (
        ("v", mid_x, tenth_y, third_y),                                    # 1
        ("h", tenth_y, mid_x, mid_x + sixth_x),                            # 2
        ("v", mid_x + sixth_x, tenth_y, quarter_y),                        # 3
        ("h", third_y, mid_x - sixth_x, mid_x + 1),                        # 4
        ("v", mid_x - sixth_x, third_y, third_y + quarter_y),              # 5
        ("h", third_y + quarter_y, mid_x - sixth_x, mid_x + sixth_x),      # 6
        ("v", mid_x + sixth_x, third_y + quarter_y,
         third_y + quarter_y + fifth_y),                                   # 7
        ("h", third_y + quarter_y + fifth_y, mid_x + sixth_x,
         mid_x + sixth_x + sixth_x),                                       # 8
        ("v", mid_x, SIZE_Y - third_y, SIZE_Y),                            # 9
        ("v", mid_x - SIZE_X // 4, SIZE_Y - quarter_y, SIZE_Y),            # 10
        ("h", fifth_y, 0, sixth_x),                                        # 11
        ("v", sixth_x, fifth_y, fifth_y + fifth_y),                        # 12
        ("h", fifth_y + third_y, 0, sixth_x),                              # 13
        ("v", SIZE_X - sixth_x, 0, third_y),                               # 14
        ("h", third_y, SIZE_X - SIZE_X // 10, SIZE_X),                     # 15
        ("h", lower_y, ninth_col, SIZE_X),                                 # 16
        ("v", ninth_col, lower_y, lower_y + quarter_y),                    # 17
        ("v", ninth_col, lower_y + third_y, SIZE_Y),                       # 18
    )

    for orientation, fixed, start, stop in segments:
        for moving in range(start, stop):
            if orientation == "v":
                walls[moving, fixed] = 1
            else:
                walls[fixed, moving] = 1

    return walls

class EnvObject:
    """Base class for anything placed on the grid at coordinates ``(x, y)``.

    Attributes:
        x: Horizontal coordinate.
        y: Vertical coordinate.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __str__(self):
        """Return the position formatted as ``"x, y"``."""
        return f"{self.x}, {self.y}"

    def __sub__(self, other):
        """Return the offset from ``other`` to this object as a ``(dx, dy)`` tuple."""
        return (self.x - other.x, self.y - other.y)

    def __eq__(self, other):
        """Two objects are equal when they occupy the same coordinates.

        Comparing against something that has no ``x``/``y`` (for example
        ``None``) yields ``NotImplemented`` so Python falls back to its default
        and the comparison evaluates to ``False`` instead of raising
        ``AttributeError``.
        """
        try:
            return self.x == other.x and self.y == other.y
        except AttributeError:
            return NotImplemented

    # Instances are mutable (their coordinates change when they move), so they
    # stay unhashable. Python already does this implicitly for classes that
    # define __eq__; it is spelled out here to make the intent visible.
    __hash__ = None
    
class Drone(EnvObject):
    """Movable agent that takes one of nine discrete actions per step."""

    # (dx, dy) displacement for every action id.
    _ACTION_DELTAS = {
        0: (1, 1),
        1: (-1, -1),
        2: (-1, 1),
        3: (1, -1),
        4: (1, 0),
        5: (-1, 0),
        6: (0, 1),
        7: (0, -1),
        8: (0, 0),
    }

    def __init__(self, x, y):
        super().__init__(x, y)

    def action(self, choice):
        '''
        Gives us 9 total movement options. (0,1,2,3,4,5,6,7,8)

        0-3 are the diagonals, 4-5 move along x, 6-7 move along y and 8 stays
        in place. Any other value is ignored and the drone does not move.
        '''
        delta = self._ACTION_DELTAS.get(choice)
        if delta is None:
            return
        dx, dy = delta
        self.move(x=dx, y=dy)

    def move(self, x=False, y=False):
        """Shift the drone by ``(x, y)`` and clamp it to the grid.

        Args:
            x: Displacement along x. When omitted (left as ``False``) or
                ``None``, a random step in {-1, 0, 1} is taken instead.
            y: Displacement along y, with the same convention as ``x``.
        """
        # Compare against the sentinel by identity: an explicit 0 is a valid
        # displacement ("do not move on this axis") and must not be replaced
        # by a random step, which a plain truthiness test would do.
        if x is False or x is None:
            self.x += np.random.randint(-1, 2)
        else:
            self.x += x

        if y is False or y is None:
            self.y += np.random.randint(-1, 2)
        else:
            self.y += y

        # Keep the drone inside the grid defined by the module-level sizes.
        self.x = min(max(self.x, 0), SIZE_X - 1)
        self.y = min(max(self.y, 0), SIZE_Y - 1)

class Target(EnvObject):
    """Goal position on the grid; defines no behaviour beyond ``EnvObject``."""

    def __init__(self, x, y):
        # Delegate straight to the base initialiser, which stores x and y.
        EnvObject.__init__(self, x, y)

class DroneEnv:
    """Grid world in which a drone (``agent_1``) has to reach a target.

    The observation is ``[dx, dy, distance]`` where ``dx``/``dy`` are the
    drone's offset from the target and ``distance`` is the Euclidean distance.
    ``agent_2`` and ``agent_3`` are created by ``initial`` but are not used by
    ``step``, ``is_wall`` or ``visualize``.
    """

    #Define Parameters
    SIZE_Y = 40*2
    SIZE_X = 60*2
    ENV_COLOR = (20, 52, 89)
    WALLS_COLOR = (77, 77, 234)
    DRONE_COLOR = (234, 222, 53)
    TARGET_COLOR = (132, 234, 53)
    # An episode is cut off (with a penalty) once this many steps were taken.
    MAX_STEPS = 200
    # (dx, dy) per action id; mirrors the displacements used by Drone.action.
    ACTION_DELTAS = (
        (1, 1), (-1, -1), (-1, 1), (1, -1),
        (1, 0), (-1, 0),
        (0, 1), (0, -1),
        (0, 0),
    )
    # Both arrays are class attributes, i.e. shared by every DroneEnv instance.
    space = np.zeros((SIZE_Y, SIZE_X, 3), dtype=np.uint8) 
    walls = np.zeros((SIZE_Y, SIZE_X), dtype=np.uint8)
    walls = createLine(walls, SIZE_Y, SIZE_X)

    def _observe(self):
        """Return ``[dx, dy, distance]`` of ``agent_1`` relative to the target."""
        dx = self.agent_1.x - self.target.x
        dy = self.agent_1.y - self.target.y
        return np.array([dx, dy, np.sqrt(dx**2 + dy**2)])

    def initial(self):
        """Reset drones, target and step counter; return the first observation."""
        self.agent_1 = Drone(self.SIZE_X-self.SIZE_X//11, self.SIZE_Y//10)
        self.agent_2 = Drone(self.SIZE_X-self.SIZE_X//20, self.SIZE_Y//2-self.SIZE_Y//20)
        self.agent_3 = Drone(self.SIZE_X-self.SIZE_X//13, self.SIZE_Y-self.SIZE_Y//11)
        self.target = Target(self.SIZE_X//10, self.SIZE_Y//2-self.SIZE_Y//20)

        self.episode_step = 0
        return self._observe()

    def step(self, action, observation):
        """Apply ``action`` to ``agent_1`` and score the move.

        Args:
            action: Action id understood by ``Drone.action``.
            observation: Observation before the move. Only its third value
                (the distance) is used; both a flat ``(3,)`` array and a
                ``(1, 3)`` batch are accepted.

        Returns:
            ``(new_observation, reward, done)``. The reward is 100 on reaching
            the target, -10 when the step limit is hit, +1 / -1 when the drone
            got closer / farther, and 0 when the distance did not change.
        """
        self.episode_step += 1
        # ravel() lets callers pass the (1, 3) array they feed to the network.
        previous_distance = np.ravel(observation)[2]

        self.agent_1.action(action)
        new_observation = self._observe()
        distance = new_observation[2]

        if self.agent_1 == self.target:
            reward, done = 100, True
        elif self.episode_step >= self.MAX_STEPS:
            reward, done = -10, True
        elif distance < previous_distance:
            reward, done = 1, False
        elif distance > previous_distance:
            reward, done = -1, False
        else:
            # Same distance (e.g. the drone stayed put or was clamped at the
            # border). Without this branch reward/done would be unassigned.
            reward, done = 0, False

        return new_observation, reward, done

    def is_wall(self, action):
        """Return True when ``action`` would move ``agent_1`` onto a wall cell.

        The destination is the drone's position plus the action's displacement,
        clamped to the grid the same way ``Drone.move`` clamps it. Unknown
        action ids are treated as "no movement", matching ``Drone.action``.
        """
        if 0 <= action < len(self.ACTION_DELTAS):
            dx, dy = self.ACTION_DELTAS[int(action)]
        else:
            dx, dy = 0, 0

        x = min(max(self.agent_1.x + dx, 0), self.SIZE_X - 1)
        y = min(max(self.agent_1.y + dy, 0), self.SIZE_Y - 1)
        return bool(self.walls[y, x] == 1)

    def visualize(self):
        """Repaint ``space``: background, walls, then drone and target on top."""
        self.space[:] = self.ENV_COLOR
        self.space[self.walls == 1] = self.WALLS_COLOR

        self.space[self.agent_1.y, self.agent_1.x] = self.DRONE_COLOR
        self.space[self.target.y, self.target.x] = self.TARGET_COLOR

    def render(self):
        """Show the current ``space`` image in a window until a key is pressed.

        This displays whatever ``visualize`` painted last; call ``visualize``
        first to see the current positions.
        """
        img = Image.fromarray(self.space, 'RGB')
        img = img.resize((1200, 800), resample = Image.Resampling.BOX)
        # NOTE(review): cv2.imshow interprets the channels as BGR, so the
        # colour tuples are displayed with red and blue swapped relative to
        # RGB - confirm which ordering the colour constants are meant in.
        cv2.imshow("image", np.array(img))  # show it!
        cv2.waitKey(0)  # blocks until a key is pressed
        cv2.destroyAllWindows()

# Create the environment instance used by the cells below.
env = DroneEnv()

In [3]:
# Inspect the wall grid; NumPy abbreviates the large array with "..." in the output.
print(env.walls)

[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


In [4]:
# Smoke test: reset once, take a single step with action 1 (x-1, y-1),
# print the transition and show the resulting frame.
observation = env.initial()
print(observation)
next_state, reward, done = env.step(1, observation)
print(next_state, reward, done)
env.visualize()
env.render()  # opens a window and waits for a key press

[ 98.         -28.         101.92153845]
[ 97.         -29.         101.24228366] 1 False


# NN Model

In [5]:

class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and replay memory.

    Args:
        state_size: Number of values in one observation.
        action_size: Number of discrete actions (size of the network output).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

        self.memory = deque(maxlen=2000)
        self.gamma = 0.95   # discount rate

        self.epsilon = 1.0  # exploration rate, 100% exploration at the beginning
        self.epsilon_decay = 0.995

        self.epsilon_min = 0.01 # 1% exploration at the end
        self.learning_rate = 0.001

        self.model = self.build_model()

    def build_model(self):
        """Build and compile the Q-network: two hidden ReLU layers of 24 units."""
        # Neural Net for Deep-Q learning Model
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def _as_batch(self, state):
        """Return ``state`` shaped ``(1, state_size)``, the shape the model is fed."""
        return np.reshape(state, [1, self.state_size])

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action: random with probability ``epsilon``, else greedy.

        ``state`` may be a flat observation or a ``(1, state_size)`` batch.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size) #explore
        # verbose=0 suppresses the progress bar Keras prints for every call.
        act_values = self.model.predict(self._as_batch(state), verbose=0)
        return np.argmax(act_values[0]) #exploit

    def replay(self, batch_size):
        """Train on ``batch_size`` random transitions, then decay epsilon.

        Does nothing while the memory holds fewer than ``batch_size``
        transitions (``random.sample`` would raise ``ValueError`` otherwise).
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)

        for state, action, reward, next_state, done in minibatch:
            # Stored states may be flat or already batched; normalise both.
            state = self._as_batch(state)
            next_state = self._as_batch(next_state)

            # Bellman target: immediate reward plus the discounted best
            # predicted value of the next state (no bootstrap on terminal steps).
            target = reward
            if not done:
                next_q = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma*np.amax(next_q)
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target

            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)
        
    

# Training

In [6]:

# define parameters
# NOTE: epsilon/alpha below are decayed once per episode but are not passed to
# the agent, which keeps its own exploration rate (agent.epsilon).
epsilon = 1.0
epsilon_min = 0.01
alpha = 0.5
alpha_min = 0.05
decay = 0.00001
state_size = 3
action_size = 9
batch_size = 32
EPISODES = 1000
SHOW_EVERY = 50     # render every N-th episode (0 disables rendering)
SAVE_EVERY = 50     # write a weights checkpoint every N-th episode
output_dir = 'model_output/uav-rl/'

# ===================== Main Flow =====================
os.makedirs(output_dir, exist_ok=True)

agent = DQNAgent(state_size, action_size)

# Repeat for each episode:
for episode in range(EPISODES):
    episode_reward = 0
    step = 0

    # 1. Start from the initial location. The flat observation is what the
    #    environment works with; the (1, state_size) copy is what the network
    #    and the replay memory receive.
    observation = env.initial()
    current_state = np.reshape(observation, [1, state_size])

    # 2. Repeat for each step:
    done = False
    while not done:
        # Select an action with the agent's epsilon-greedy policy.
        action = agent.act(current_state)

        # If the chosen action is blocked by a wall, draw random actions
        # until one is allowed. The upper bound is exclusive, so action_size
        # makes every action (including 8, "stay") eligible.
        while env.is_wall(action):
            action = np.random.randint(0, action_size)

        # Take the action, observe the reward and the new state.
        next_observation, reward, done = env.step(action, observation)

        # Reshape the NEW observation (the original reshaped current_state
        # here, so the agent never saw the state change).
        next_state = np.reshape(next_observation, [1, state_size])

        agent.remember(current_state, action, reward, next_state, done)

        observation = next_observation
        current_state = next_state
        episode_reward += reward
        step += 1

        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

    # env.render() blocks until a key is pressed, so show only the final
    # frame of the selected episodes instead of every single step, and
    # repaint the frame first so it reflects the current positions.
    if SHOW_EVERY > 0 and episode % SHOW_EVERY == 0:
        print(f"episode {episode}: steps={step} reward={episode_reward}")
        env.visualize()
        env.render()

    # Checkpoint once per selected episode (not once per step).
    if episode % SAVE_EVERY == 0:
        agent.save(output_dir + "weights_" + '{:04d}'.format(episode) + '.hdf5')

    if epsilon >= epsilon_min:
        epsilon = epsilon*np.exp(-decay)

    if alpha >= alpha_min:
        alpha = alpha*np.exp(-decay)

KeyboardInterrupt: 