In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

__file__ = '/content/drive/MyDrive/aiffelthon/data'


Mounted at /content/drive


## Environment

In [4]:
from string import ascii_uppercase
#from draw_utils import *
#from pyglet.gl import *
import numpy as np
import pandas as pd
import os
import random
from datetime import datetime
import pytz
import matplotlib.pyplot as plt

# reward
move_reward = -0.1
obs_reward = -0.1
goal_reward = 100

print('reward:' , move_reward, obs_reward, goal_reward)
#__file__ = '/home/ogangza/heung_path_finding/path-finding-rl/data'
local_path = os.path.abspath(os.path.join(os.path.dirname(__file__)))

class Simulator:
    def __init__(self):
        '''
        height : 그리드 높이
        width : 그리드 너비 
        inds : A ~ Q alphabet list
        '''
        # Load train data
        self.files = pd.read_csv(os.path.join(local_path, "data/factory_order_train.csv"))
        self.height = 10
        self.width = 9
        self.inds = list(ascii_uppercase)[:17]

    def set_box(self):
        '''
        아이템들이 있을 위치를 미리 정해놓고 그 위치 좌표들에 아이템이 들어올 수 있으므로 그리드에 100으로 표시한다.
        데이터 파일에서 이번 에피소드 아이템 정보를 받아 가져와야 할 아이템이 있는 좌표만 -100으로 표시한다.
        self.local_target에 에이전트가 이번에 방문해야할 좌표들을 저장한다.
        따라서 가져와야하는 아이템 좌표와 end point 좌표(처음 시작했던 좌표로 돌아와야하므로)가 들어가게 된다.
        '''
        box_data = pd.read_csv(os.path.join(local_path, "./data/box.csv"))
        ######################################
        #print('box data:', box_data)
        ######################################
        # 물건이 들어있을 수 있는 경우
        for box in box_data.itertuples(index = True, name ='Pandas'): #판다스 데이터프레임을 행단위로 반복 처리
            self.grid[getattr(box, "row")][getattr(box, "col")] = 100
            #######################################
            #print('self.grid:', self.grid)
            #######################################
        # 물건이 실제 들어있는 경우
        order_item = list(set(self.inds) & set(self.items))  # 에피소드에 해당하는 items는 reset 메서드에서 결정
        order_csv = box_data[box_data['item'].isin(order_item)]
        ########################################
        #print('order_csv:', order_csv)
        ########################################
        for order_box in order_csv.itertuples(index = True, name ='Pandas'):
            self.grid[getattr(order_box, "row")][getattr(order_box, "col")] = -100
            # local target에 가야 할 위치 좌표 넣기
            self.local_target.append(
                [getattr(order_box, "row"),
                 getattr(order_box, "col")]
                )

            #######################################
            #print('self.grid:', self.grid)
            #######################################

        ################################################
        #print('self.local_target.sort() 전:', self.local_target)
        ################################################
        #self.local_target.sort()   # 인풋 데이터 파일이 소팅되어 있으므로 그대로 사용. 2차원 배열 소팅이므로 A, B, C순으로 정렬 안됨
        self.local_target.append([9,4]) # 최종 목적지 (출발점) 추가
        ################################################
        #print('self.local_target.sort():', self.local_target)
        ###############################################
        # 알파벳을 Grid에 넣어서 -> grid에 2Dconv 적용 가능

    def set_obstacle(self):
        '''
        장애물이 있어야하는 위치는 미리 obstacles.csv에 정의되어 있다. 이 좌표들을 0으로 표시한다.
        '''
        obstacles_data = pd.read_csv(os.path.join(local_path, "./data/obstacles.csv"))
        for obstacle in obstacles_data.itertuples(index = True, name ='Pandas'):
            self.grid[getattr(obstacle, "row")][getattr(obstacle, "col")] = 0
            
        ##########################################
        #print('self.grid:', self.grid)
        ##########################################
        
    def reset(self, epi):
        '''
        reset()은 첫 스텝에서 사용되며 그리드에서 에이전트 위치가 start point에 있게 한다.
         - param epi: episode, 에피소드 마다 가져와야 할 아이템 리스트를 불러올 때 사용
         - return: 초기 셋팅된 그리드
         - rtype: numpy.ndarray
        _____________________________________________________________________________________
         - items: 이번 에피소드에서 가져와야 하는 아이템들
         - terminal_location: 현재 에이전트가 찾아가야 하는 목적지(item). 중간 경유지
         - local_target: 한 에피소드에서 찾아가야 하는 중간 경유지와 최종 목적지의 위치 (좌표)
         - actions: visualization을 위해 에이전트 action을 저장하는 리스트. 이동 좌표들의 리스트
         - curloc: 현재 위치
        '''
        # initial episode parameter setting
        self.epi = epi
        self.items = list(self.files.iloc[self.epi])[0]  # 해당 에피소드의 items를 가져 옴. 예, [ 'H', 'L', 'M']
        self.cumulative_reward = 0
        self.terminal_location = None
        self.local_target = []
        self.actions = []

        # initial grid setting
        self.grid = np.ones((self.height, self.width), dtype="float16")

        # set information about the gridworld
        self.set_box()  # 에이전트가 이번에 방문해야 할 좌표들 저장. 예, [ [0,3], [0,7], [0,8], [9,4] ]
        self.set_obstacle()

        # start point를 grid에 표시
        self.curloc = [9, 4]  # 에피소드의 출발점
        self.grid[int(self.curloc[0])][int(self.curloc[1])] = -5
        
        self.done = False
        return self.grid

    def apply_action(self, action, cur_x, cur_y):
        '''
        에이전트가 행한 action대로 현 에이전트의 위치좌표를 바꾼다.
        action은 discrete하며 4가지 up,down,left,right으로 정의된다.
        
        :param x: 에이전트의 현재 x 좌표
        :param y: 에이전트의 현재 y 좌표
        :return: action에 따라 변한 에이전트의 x 좌표, y 좌표
        :rtype: int, int
        '''
        new_x = cur_x
        new_y = cur_y
        # up
        if action == 0:
            new_x = cur_x - 1
        # down
        elif action == 1:
            new_x = cur_x + 1
        # left
        elif action == 2:
            new_y = cur_y - 1
        # right
        else:
            new_y = cur_y + 1

        return int(new_x), int(new_y)


    def get_reward(self, new_x, new_y, out_of_boundary):
        '''
        get_reward함수는 리워드를 계산하는 함수이며, 상황에 따라 에이전트가 action을 옳게 했는지 판단하는 지표가 된다.
         - new_x: action에 따른 에이전트 새로운 위치 좌표 x
         - new_y: action에 따른 에이전트 새로운 위치 좌표 y
         - out_of_boundary: 에이전트 위치가 그리드 밖이 되지 않도록 제한
         - return: action에 따른 리워드
         - rtype: float
        '''

        # 바깥으로 나가는 경우
        if any(out_of_boundary):
            reward = obs_reward
                       
        else:
            # 장애물에 부딪히는 경우 
            if self.grid[new_x][new_y] == 0:
                reward = obs_reward  

            # 현재 목표에 도달한 경우
            elif new_x == self.terminal_location[0] and new_y == self.terminal_location[1]:
                reward = goal_reward

            # 그냥 움직이는 경우 
            else:
                reward = move_reward

        return reward

    def step(self, action):
        ''' 
        에이전트의 action에 따라 step을 진행한다.
        action에 따라 에이전트 위치를 변환하고, action에 대해 리워드를 받고, 어느 상황에 에피소드가 종료되어야 하는지 등을 판단한다.
        에이전트가 endpoint에 도착하면 gif로 에피소드에서 에이전트의 행동이 저장된다.

         - action: 에이전트 행동
         - return: grid(그리드), reward(리워드), cumulative_reward(누적 리워드), done(최종 목적지 도달 여부),
             goal_ob_reward(goal까지 아이템을 모두 가지고 돌아오는 finish율 계산을 위한 파라미터)
         - rtype: numpy.ndarray, float, float, bool, bool/str
        (Hint: 시작 위치 (9,4)에서 up말고 다른 action은 전부 장애물이므로 action을 고정하는 것이 좋음)
        '''
        self.terminal_location = self.local_target[0]  # 현재의 목적지 확인 (또는 설정) 
        cur_x,cur_y = self.curloc  # 현재 위치

        self.actions.append((cur_x, cur_y))  # 현재 위치를 actions 리스트에 추가

        goal_ob_reward = False
        
        new_x, new_y = self.apply_action(action, cur_x, cur_y)  # 현 위치에서 인풋으로 받은 action 적용해서 새 위치 구함
        out_of_boundary = [new_x < 0, new_x >= self.height, new_y < 0, new_y >= self.width]  # OB 판정. OB면 True

        # 바깥으로 나가는 경우 종료
        if any(out_of_boundary):
            #원 위치
            new_x = cur_x
            new_y = cur_y
            #self.done = True
            goal_ob_reward = True
        else:
            # 장애물에 부딪히는 경우 종료
            if self.grid[new_x][new_y] == 0:
                #원 위치
                new_x = cur_x
                new_y = cur_y
                #self.done = True
                goal_ob_reward = True
            # 현재 목표에 도달한 경우, 다음 목표 설정
            elif new_x == self.terminal_location[0] and new_y == self.terminal_location[1]:
                # end point 일 때
                if [new_x, new_y] == [9,4]:
                    self.done = True
                # 현재 목표에 도달한 모든 경우에 대해
                self.local_target.remove(self.local_target[0])
                self.grid[cur_x][cur_y] = 1
                self.grid[new_x][new_y] = -5
                goal_ob_reward = True
                self.curloc = [new_x, new_y]
            # 그냥 움직이는 경우 
            else:
                self.grid[cur_x][cur_y] = 1
                self.grid[new_x][new_y] = -5
                self.curloc = [new_x,new_y]
                
        reward = self.get_reward(new_x, new_y, out_of_boundary)
        self.cumulative_reward += reward
        return self.grid, reward, self.cumulative_reward, self.done, goal_ob_reward



reward: -0.1 -0.1 100


## Agent

In [5]:
class QAgent():
    def __init__(self):
        self.height = 10
        self.width = 9
        self.q_table = np.zeros((self.height, self.width, 4)) # 마찬가지로 Q 테이블을 0으로 초기화
        self.eps = 0.9

    def select_action(self, s): 
        # eps-greedy로 액션을 선택해준다
        x, y = s
        '''
        # 출발점에서는 무조건 위로 올라간다 (action=0)
        if [x,y] == [9,4]:
            action = 0
            return action
        
        # 위쪽 Rack에 들어가면 항상 아래로 나온다 (action=1)
        upper_rack = [[0,0],[0,1],[0,2],[0,3],[0,4],[0,5],[0,6],[0,7],[0,8]]
        if [x,y] in upper_rack:
            action = 1
            return action
        
        # 왼쪽 Rack에 들어가면 항상 오른쪽으로 나온다 (action=3)
        left_rack = [[2,0],[3,0],[4,0],[5,0]]
        if [x,y] in left_rack:
            action = 3
            return action
        
        # 오른쪽 Rack에 들어가면 항상 왼쪽으로 나온다 (action=2)
        right_rack = [[2,8],[3,8],[4,8],[5,8]]
        if [x,y] in right_rack:
            action = 2
            return action
        '''
        coin = random.random()
        if coin < self.eps:
            action = random.randint(0,3)
        else:
            action_val = self.q_table[x,y,:]
            action = np.argmax(action_val)
        return action

    def update_table(self, transition):
        s, a, r, s_prime = transition
        x,y = s
        next_x, next_y = s_prime
        # Q러닝 업데이트 식을 이용 
        self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + np.amax(self.q_table[next_x,next_y,:]) - self.q_table[x,y,a])

    def anneal_eps(self):
        self.eps -= 0.01  # Q러닝에선 epsilon 이 좀더 천천히 줄어 들도록 함.
        self.eps = max(self.eps, 0.2) 

    def show_table(self):
        q_lst = self.q_table.tolist()
        data = np.zeros((self.height, self.width))
        for row_idx in range(len(q_lst)):
            row = q_lst[row_idx]
            for col_idx in range(len(row)):
                col = row[col_idx]
                action = np.argmax(col)
                data[row_idx, col_idx] = action
        print(data)
        
        #좌, 상, 우, 하 = 2, 0, 3, 1
        data_direction = np.where(data == 2., '←', data)
        data_direction = np.where(data == 0., '↑', data_direction)
        data_direction = np.where(data == 3., '→', data_direction)
        data_direction = np.where(data == 1., '↓', data_direction)

        data_direction[9,4] = 'S'
        data_direction[9,4] = 'G'
        
        # 맨 아래 줄
        for i in range(4):
            data_direction[9,i] = '■'
        for i in range(5,9):
            data_direction[9,i] = '■'
        
        # 맨 윗 줄
        upper_line = 'ⓔⓕⓖⓗⓘⓙⓚⓛⓜ'
        for i in range(9):
            data_direction[0,i] = upper_line[i]
        
        # 왼쪽, 오른쪽 줄
        left_line = 'ⓓⓒⓑⓐ'
        right_line = 'ⓝⓞⓟⓠ'
        for i in range(2,6):
            data_direction[i,0] = left_line[i-2]
            data_direction[i,8] = right_line[i-2]
            
        # 중간 장애물
        for i in range(3,7):
            data_direction[i,2] = '■'
            data_direction[i,4] = '■'
            data_direction[i,6] = '■'
        print(data_direction)

### Qlearning Main

In [16]:
tz = pytz.timezone('Asia/Seoul')
env = Simulator()
agent = QAgent()
print('list(env.files.iloc[0])[0]:', list(env.files.iloc[0])[0])

for n_epi in range(1): # range()의 인수로 len(env.files) 사용하면 됨 (=39,999)
    obs = env.reset(n_epi)
    path = env.local_target

    for iter in range(10000): # 하나의 에피소드에 대해서 최적 경로를 구하기 위한 반복

        done = False
        obs = env.reset(n_epi) # 이번 에피소드에서 방문해야 할 좌표 저장 @ env.local_target 리스트
        s = env.curloc

        while not done:  # 최종 목표에 도달하면 1회 끝남
            a = agent.select_action(s) # e-greedy로 액션 선택
            obs, r, cum_reward, done, goal_ob_reward = env.step(a)  # 중간 목표 도달시 다음 목표로 자동 바뀜 발생
            s_prime = env.curloc  # 새 위치 저장
            agent.update_table((s, a, r, s_prime))
            s = s_prime
            # while loop 종료

        if iter % 1000 == 0:
            #cur_time = datetime.now(tz)
            #simple_cur_time = cur_time.strftime("%H:%M:%S")
            print('▶ Episode #', n_epi, 'Iteration #', iter, end=' → ')
            #print('start time:', simple_cur_time, end=' → ')
            #print('cum_reward =', cum_reward, 'appended actions =', s, end='...')
            print('경로 길이:', len(env.actions))

        agent.anneal_eps()
        # for_iter loop 종료
    agent.show_table()
    # for_episode loop 종료


list(env.files.iloc[0])[0]: ['H', 'L', 'M']
▶ Episode # 0 Iteration # 0 → 경로 길이: 292
▶ Episode # 0 Iteration # 1000 → 경로 길이: 1295
▶ Episode # 0 Iteration # 2000 → 경로 길이: 1726
▶ Episode # 0 Iteration # 3000 → 경로 길이: 1161
▶ Episode # 0 Iteration # 4000 → 경로 길이: 1166
▶ Episode # 0 Iteration # 5000 → 경로 길이: 1926
▶ Episode # 0 Iteration # 6000 → 경로 길이: 2043
▶ Episode # 0 Iteration # 7000 → 경로 길이: 1247
▶ Episode # 0 Iteration # 8000 → 경로 길이: 13047
▶ Episode # 0 Iteration # 9000 → 경로 길이: 28117
[[3. 3. 3. 1. 2. 2. 3. 2. 2.]
 [3. 3. 3. 1. 3. 3. 0. 0. 0.]
 [3. 3. 0. 1. 3. 0. 0. 2. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [3. 0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 1. 0. 1. 0. 0. 0. 1. 1.]
 [3. 3. 3. 0. 1. 3. 1. 2. 2.]
 [3. 3. 3. 3. 1. 1. 1. 2. 2.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0.]]
[['ⓔ' 'ⓕ' 'ⓖ' 'ⓗ' 'ⓘ' 'ⓙ' 'ⓚ' 'ⓛ' 'ⓜ']
 ['→' '→' '→' '↓' '→' '→' '↑' '↑' '↑']
 ['ⓓ' '→' '↑' '↓' '→' '↑' '↑' '←' 'ⓝ']
 ['ⓒ' '↑' '■' '↓' '■' '↑' '■' '↑' 'ⓞ']
 ['ⓑ' '↑' '■' '↓' '■' '↑' '■' '↑' 'ⓟ']
 ['ⓐ' '↑' '■

In [19]:
# 최적 경로
x = 9
y = 4
optimal_path = [[9,4]]
print('path:', path)
print(len(path))

while len(path) > 0:
    action_val = agent.q_table[x,y,:]
    action = np.argmax(action_val)
    print('action:', action)
    new_x, new_y = env.apply_action(action, x, y)
    x = new_x
    y = new_y
    print('[',x,y,']', end = ' ')
    optimal_path.append([x,y])
    if [x, y] == path[0]:
        if [x,y] == [9,4]:
            print('arrived [9,4]. finished!')
            break
        else:
            print('arrived ', path[0])
            path.remove(path[0])

print('Optimal path:', optimal_path)

path: [[0, 3], [0, 7], [0, 8], [9, 4]]
4
action: 1
[ 10 4 ] 

IndexError: ignored

     39         optimal_path = [[9,4]]
     40         while len(path) > 0:
---> 41             action_val = agent.q_table[x,y,:]
     42             action = np.argmax(action_val)
     43             new_x, new_y = env.apply_action(action, x, y)

IndexError: index 10 is out of bounds for axis 0 with size 10