In [1]:
!pip install pygame



In [2]:
from google.colab import drive
import sys

# Google Colab has no display, so select SDL's 'dummy' video driver to create a virtual one.
# The environment variable is set before pygame is imported.
import os
os.environ['SDL_VIDEODRIVER']='dummy'
import pygame
pygame.display.set_mode((640,480))

# Module used to load the saved Q_table
import pickle

drive.mount('/content/gdrive')

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
# Used to periodically clear the progress log output
from IPython.display import clear_output

In [4]:
!ls gdrive/My\ Drive/Colab\ Notebooks/py

sys.path.insert(0, '/content/gdrive/My Drive/Colab Notebooks/py/')

dot_Q.py  __pycache__


In [0]:
# Import the required modules
import sys
import numpy as np
import random

#import pygame
import matplotlib.pyplot as plt
import datetime
import time

# Import the game
#import pong
import dot_Q as game

In [6]:
# Define the agent class
class Q_Learning:
    """Tabular Q-learning agent for the game imported as ``game``.

    ``main`` loads a previously saved Q-table from a pickle file and plays the
    game greedily (no exploration) for ``num_testing`` steps. ``train``
    implements the Q-learning update but is not called from ``main``.

    The Q-table is a dict mapping a state to a list holding one Q-value per
    action. Actions are passed to the game as one-hot numpy vectors of length
    ``num_action``.
    """

    def __init__(self):
        # Game information
        self.algorithm = 'Q_Learning'
        self.game_name = game.ReturnName()

        # Parameters obtained from the game
        self.progress = ''
        self.num_action = game.Return_Num_Action()

        # Initial parameters
        self.num_training = 100000  # number of training steps (epsilon-greedy)
        self.num_testing = 1000  # number of testing steps (epsilon = 0)

        self.learning_rate = 0.01  # learning rate
        self.gamma = 0.3  # discount factor: how much future reward is taken into account

        self.first_epsilon = 0.1  # initial epsilon // not needed because the Q_table is loaded
        self.final_epsilon = 0.1  # final epsilon

        self.epsilon = self.first_epsilon

        # Number of episodes averaged into one plotted point
        self.num_plot_episode = 200

        self.step = 1  # incremented once per selected-and-executed action
        self.score = 0  # game score (sum of rewards within one episode)
        self.episode = 1  # number of episodes played so far

        # Data stored for plotting
        self.plot_x = []
        self.plot_y = []

        # Take a single timestamp so date, hour and minute always agree
        # (separate today()/now() calls could straddle a boundary).
        now = datetime.datetime.now()
        self.date_time = str(now.date()) + '-' + str(now.hour) + '-' + str(now.minute)

        # Q-table initialisation
        self.Q_table = {}  # dict: key = state, value = list of Q-values

    def get_progress(self):
        """Return 'Testing' while ``step <= num_testing``, otherwise 'Finished'."""
        return 'Testing' if self.step <= self.num_testing else 'Finished'

    def select_action(self, state):
        """Return the greedy action for ``state`` as a one-hot numpy vector.

        If ``state`` has no entry in the Q-table (for example it was never
        visited while the table was built), a uniformly random action is
        returned instead of raising ``KeyError``; the Q-table is not modified.
        Also sets ``self.epsilon`` to 0, since no exploration is performed.
        """
        action = np.zeros([self.num_action])

        if state in self.Q_table:
            # Greedy action
            action_index = np.argmax(self.Q_table[state])
        else:
            # No Q-values known for this state: fall back to a random action
            action_index = np.random.randint(self.num_action)
        action[action_index] = 1

        # Exploration is over
        self.epsilon = 0

        return action

    def train(self, state, action, reward, next_state, terminal):
        """Apply one Q-learning update for the transition to the Q-table.

        Args:
            state: hashable state in which the action was taken.
            action: one-hot vector; the index of its maximum is the action taken.
            reward: reward received for the transition.
            next_state: hashable state reached after the action.
            terminal: truthy if the episode ended with this transition.
        """
        # If state or next_state is not in the Q-table yet, add it with a
        # Q-value of 0 for every action.
        for key in (state, next_state):
            if key not in self.Q_table:
                self.Q_table[key] = [0] * self.num_action

        action_index = np.argmax(action)

        # Target: reward only when the game ended, otherwise reward plus the
        # discounted best Q-value of the next state. Truthiness (not
        # ``is True``) is used so numpy booleans are handled as well.
        if terminal:
            target = reward
        else:
            target = reward + self.gamma * max(self.Q_table[next_state])

        # Q-value = (1 - a) * Q-value + a * target
        old_value = self.Q_table[state][action_index]
        self.Q_table[state][action_index] = (1 - self.learning_rate) * old_value + self.learning_rate * target

    def plotting(self):
        """Plot one averaged point once ``num_plot_episode`` episodes are buffered.

        The buffers ``plot_x`` / ``plot_y`` are emptied after plotting.
        """
        if len(self.plot_x) % self.num_plot_episode == 0 and len(self.plot_x) != 0:
            plt.xlabel('Episode')
            plt.ylabel('Score')
            plt.title(self.algorithm)
            plt.grid(True)

            # x data = episode, y data = sum of rewards over one episode
            plt.plot(np.average(self.plot_x), np.average(self.plot_y), marker='*', ms=5)
            plt.draw()
            plt.pause(0.000001)

            self.plot_x = []
            self.plot_y = []

    def if_terminal(self, game_state):
        """Log the finished episode, reset per-episode counters, return the new state.

        Args:
            game_state: game environment exposing ``frame_step(action)``.

        Returns:
            The state returned by stepping the environment with an all-zero action.
        """
        # Report progress
        print('Step: ' + str(self.step) + ' / ' + \
              'Episode: ' + str(self.episode) + ' / ' + \
              'Progress: ' + self.progress + ' / ' + \
              'Epsilon: ' + str(self.epsilon) + ' / ' + \
              'Score: ' + str(self.score))

        # At the end of an episode, store episode and score (for plotting),
        # then advance the episode counter and reset the score.
        self.plot_x.append(self.episode)
        self.plot_y.append(self.score)
        self.episode += 1
        self.score = 0

        # Re-initialise the state
        state, _, _ = game_state.frame_step(np.zeros([self.num_action]))

        return state

    def main(self):
        """Load the saved Q-table and run the greedy loop until 'Finished'."""
        # Initialise the game environment
        game_state = game.GameState()
        print(game_state)

        # Initial step with an all-zero action to obtain the first state
        action = np.zeros([self.num_action])
        state, _, _ = game_state.frame_step(action)

        # Load the Q-table.
        # NOTE: pickle.load can execute arbitrary code contained in the file;
        # only load a Q-table file that you created yourself.
        with open('gdrive/My Drive/Colab Notebooks/dot_Q_table.pickle', 'rb') as file:
            self.Q_table = pickle.load(file)

        print("state", state)

        while True:
            self.progress = self.get_progress()

            # Stop before acting once the step budget is used up, so no extra
            # environment step is executed and then thrown away.
            if self.progress == 'Finished':
                print('Finished')
                break

            # Choose an action: 0, 1, 2, 3
            action = self.select_action(state)

            # Apply the action to the environment
            next_state, reward, terminal = game_state.frame_step(action)

            # No training is needed here
            #self.train(state, action, reward, next_state, terminal)

            # Plotting; leave commented out if not wanted
            #self.plotting()

            # Update the bookkeeping
            state = next_state
            self.score += reward
            self.step += 1

            # Compare by value: `is 0` tests object identity, not equality
            if self.step % 10000 == 0:
                clear_output()

            if terminal:
                state = self.if_terminal(game_state)

        # Done
        print("테스팅 끝!")


if __name__ == '__main__':
    # Create the agent
    agent = Q_Learning()

    # Run the agent's main loop
    agent.main()

<dot_Q.GameState object at 0x7febb99ba8d0>
state ((0, 3), (3, 0), (2, 0))
Step: 6 / Episode: 1 / Progress: Testing / Epsilon: 0 / Score: 0.96
Step: 10 / Episode: 2 / Progress: Testing / Epsilon: 0 / Score: 0.97
Step: 15 / Episode: 3 / Progress: Testing / Epsilon: 0 / Score: 0.96
Step: 17 / Episode: 4 / Progress: Testing / Epsilon: 0 / Score: 0.99
Step: 18 / Episode: 5 / Progress: Testing / Epsilon: 0 / Score: 1
Step: 20 / Episode: 6 / Progress: Testing / Epsilon: 0 / Score: 0.99
Step: 24 / Episode: 7 / Progress: Testing / Epsilon: 0 / Score: 0.97
Step: 25 / Episode: 8 / Progress: Testing / Epsilon: 0 / Score: 1
Step: 27 / Episode: 9 / Progress: Testing / Epsilon: 0 / Score: 0.99
Step: 31 / Episode: 10 / Progress: Testing / Epsilon: 0 / Score: 0.97
Step: 35 / Episode: 11 / Progress: Testing / Epsilon: 0 / Score: 0.97
Step: 36 / Episode: 12 / Progress: Testing / Epsilon: 0 / Score: 1
Step: 40 / Episode: 13 / Progress: Testing / Epsilon: 0 / Score: 0.97
Step: 42 / Episode: 14 / Progress: 