In [1]:
import numpy as np
import math
import random
import pprint
import pygame
import sys

In [2]:
#===========CHANGABLE PARAMETERS=================
REWARD     = 1
PENALTY    = -1
DISCOUNT   = 0.9
LEARN_RATE_CONST = 200

def get_explore_rate(epoch):
    return max(0.1,1-math.log2(epoch*1000)/20)

#===========DEFINE CONSTANTS AND DICS=============
WALL_LEN = 1
PADDLE_H = 0.2

init_state = (0.5, 0.5, 0.03, 0.01, 0.5 - PADDLE_H / 2)

ACTION_DIC={0:-0.04, #'UP'
            1:0,     #'STAY'
            2:0.04}  #'DOWN'

BOARD_SIZE = 12
X_VBALL_DIS = [-1,1]
Y_VBALL_DIS = [-1,0,1]

PADDLE_SPACE = 12
PADDLE_X     = 1

STATE_SPACE = (BOARD_SIZE,BOARD_SIZE,len(X_VBALL_DIS),len(Y_VBALL_DIS),PADDLE_SPACE)

X_V_TSH = 0.03
Y_V_TSH = 0.015

In [3]:
#==============DEFINE STATE CLASS===============
class state:
    
    def __init__(self,ball_x,ball_y,velocity_x,velocity_y,paddle_y,reward,end_state = 0):
        self.ball_x = ball_x              #real numbers on the interval [0,1]
        self.ball_y = ball_y
        self.velocity_x = velocity_x
        self.velocity_y = velocity_y
        self.paddle_y = paddle_y
        self.reward = reward
        self.state_tuple = (ball_x,ball_y,velocity_x,velocity_y,paddle_y)
        self.end_state = end_state
        self._extract()
        
    def _extract(self):
        self.x_grid = min(math.floor(12*self.ball_x),BOARD_SIZE-1)
        self.y_grid = min(math.floor(12*self.ball_y),BOARD_SIZE-1)
        if(self.velocity_x>0): 
            self.x_v_sign = 0
        else: 
            self.x_v_sign = 1
            
        if(self.velocity_y>=0.015):
            self.y_v_sign = 0
        elif(self.velocity_y<=0.015): 
            self.y_v_sign = 1
        else:
            self.y_v_sign = 2
        self.paddle_grid = min(math.floor(12 * self.paddle_y / (1 - PADDLE_H)),PADDLE_SPACE-1)
        self.space_tuple = (self.x_grid,self.y_grid,self.x_v_sign,self.y_v_sign,self.paddle_grid)

In [144]:
#=======DEFINE MORE HELPER FUNCTIONS===========
def bounce(cur_state,action):
#     print('ENTER!!!!')
    n_ball_v_x = cur_state.velocity_x
    n_ball_v_y = cur_state.velocity_y
    
    n_ball_x = cur_state.ball_x + n_ball_v_x
    n_ball_y = cur_state.ball_y + n_ball_v_y

    fac = n_ball_v_y*(n_ball_x-1)/n_ball_v_x
    cmp_y = cur_state.ball_y + fac
    #======paddle should in range(0,0.8)
    n_paddle_y = min(WALL_LEN-PADDLE_H,max(0,cur_state.paddle_y + action))
    
    c_reward = 0
    
    #==the ball is off the top of the screen==
    if n_ball_y < 0:
#         print('1111111111111111111111111')
        n_ball_y = -n_ball_y
        n_ball_v_y = -n_ball_v_y
        
    #==the ball is off the bottom of the screen==
    if n_ball_y > 1:
#         print('22222222222222222222222222')
        n_ball_y = 2*WALL_LEN-n_ball_y
        n_ball_v_y = -n_ball_v_y
        
    #==the ball is off the left edge of the screen==
    if n_ball_x < 0:
#         print('33333333333333333333333333')
        n_ball_x = -n_ball_x
        n_ball_v_x = -n_ball_v_x
        
    #== the ball bouncing off the paddle==
#     print('n_paddle_y+PADDLE_H',n_paddle_y+PADDLE_H,'n_ball_y', n_ball_y,'n_paddle_y', n_paddle_y)
#     print(n_paddle_y+PADDLE_H <= n_ball_y <= n_paddle_y)
#     print(n_ball_x >= 1)
    if n_ball_x >= 1 and (n_paddle_y+PADDLE_H >= min(max(0,cmp_y),1) >= n_paddle_y):
#         print('yuayyyaysaysdyuasfhiuadsfhiua!')
        n_ball_x = 2*PADDLE_X - n_ball_x
        n_ball_v_x, n_ball_v_y = update_speed_rand(n_ball_v_x,n_ball_v_y)
        c_reward = REWARD
    elif n_ball_x > 1:
#         print('444444444444444444444444444')
        c_reward = PENALTY
#     if(c_reward == 1):
#         print(n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward)
    return n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward

def update_speed_rand(velocity_x,velocity_y):
    vx_delta = random.choice(range(-1,2,2)) * random.choice(range(0,16,1)) / 1000
    vy_delta = random.choice(range(-1,2,2)) * random.choice(range(0,4,1))  / 100
    sign_x = -velocity_x/abs(velocity_x)
    sign_y = velocity_y/abs(velocity_y)
    n_ball_v_x = sign_x*max(X_V_TSH,min(1,abs(vx_delta+velocity_x)))
    n_ball_v_y = sign_y*max(Y_V_TSH,min(1,abs(vy_delta+velocity_y)))
    return n_ball_v_x, n_ball_v_y

# def check_termination(cur_state):
#     pad_rang = range(cur_state.paddle_y-PADDLE_H,cur_state.paddle_y)
#     if cur_state.ball_x>=WALL_LEN and cur_state.ball_y not in pad_rang:
#         return True
#     else:
#         return False
        
def proceed_one_step(cur_state,action):

    n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward = bounce(cur_state,action)
    end = 0
    if c_reward == -1:
        end = 1
#     print(n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward,end,action)
    n_state = state(n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward,end_state=end)
    return n_state

In [143]:
random.choice(range(0,4,1))  / 100

0.01

In [6]:
#=============DEFINE Q-AGENT CLASS==============
class q_agent:
    
    def __init__(self):
        self.q_table = np.zeros(STATE_SPACE+(len(ACTION_DIC),)) 
        self.table_vis = np.zeros(STATE_SPACE+(len(ACTION_DIC),))
        self.end_state = 0
        
    def set_table(self,loc,val):
        self.q_table[loc] = val
    
    def get_table(self,loc):
#         print(loc)
        return self.q_table[loc]

    def get_c(self,loc):
        return self.table_vis[loc]
    
    def set_c(self,loc,val):
        self.table_vis[loc] = val
        
    def get_act(self,cur_state,i,mode = 'train'):
        if mode=='train' and random.random()<get_explore_rate(i):
            return random.choice(range(0,3,1))
        return np.argmax(self.get_table(cur_state.space_tuple))
    

In [9]:
#=============DEFINE TRAIN FUCNTION=============
#init_state = (0.5, 0.5, 0.03, 0.01, 0.5 - PADDLE_H / 2)
#self.space_tuple = (self.x_grid,self.y_grid,self.x_v_sign,self.y_v_sign,self.paddle_grid)
def train(epoch_num,q_ag):
    pre_tot = 0
    tot_bounce = 0
    for i in range(1,epoch_num+1):
#         print('i=',i)
        temp_bounce = 0
        cur_state = state(0.5, 0.5, 0.03, 0.01, 0.5 - PADDLE_H / 2,0)
        while True:
            action = q_ag.get_act(cur_state,i)
#             action = 1
#             print('act=',action)
            n_state = proceed_one_step(cur_state,ACTION_DIC[action])
            old_val = q_ag.get_table(cur_state.space_tuple+(action,))


            prd_max = np.max(q_ag.get_table(n_state.space_tuple))
            prd_tuple = (n_state.x_grid,n_state.y_grid,n_state.x_v_sign,n_state.y_v_sign,n_state.paddle_grid,prd_max)
            
            Nsa = q_ag.get_c(cur_state.space_tuple+(action,))
            learn_rate = LEARN_RATE_CONST/(LEARN_RATE_CONST+Nsa)
            q_ag.set_c(cur_state.space_tuple+(action,),Nsa+1)
            
            new_val = (1-learn_rate)*old_val + learn_rate*(n_state.reward + DISCOUNT*prd_max)
            q_ag.set_table(cur_state.space_tuple+(action,),new_val)
#             print('22222222222=',prd_max)
#             print('33333333333333=',n_state.space_tuple+(action,))
#             print('44444444444=',q_ag.get_table(n_state.space_tuple+(action,)))
            if n_state.end_state == 1:
                break
            if n_state.reward == REWARD:
                temp_bounce+=1
#                 print('this round=',temp_bounce)
            cur_state = n_state
        tot_bounce+=temp_bounce
#         print()
#         print()
        if i%1000 == 0:
            print('loop',i)
            print(q_ag.q_table.sum())
            print('now average bounce is', (tot_bounce-pre_tot)/1000)
            print('bounce=',tot_bounce)
            print()
            pre_tot = tot_bounce

print(n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward,end)

print(n_ball_x,n_ball_y,n_ball_v_x,n_ball_v_y,n_paddle_y,c_reward,end,action)

In [145]:
q_ag = q_agent()
train(100000,q_ag)

loop 1000
-615.085396289
now average bounce is 1.364
bounce= 1364

loop 2000
-1012.58457128
now average bounce is 2.017
bounce= 3381

loop 3000
-1464.50190036
now average bounce is 2.17
bounce= 5551

loop 4000
-1597.06329121
now average bounce is 2.423
bounce= 7974

loop 5000
-1774.57705
now average bounce is 2.744
bounce= 10718

loop 6000
-1759.19767116
now average bounce is 2.999
bounce= 13717

loop 7000
-1673.92010435
now average bounce is 3.28
bounce= 16997

loop 8000
-1536.33278803
now average bounce is 3.553
bounce= 20550

loop 9000
-1632.45713562
now average bounce is 3.523
bounce= 24073

loop 10000
-1502.25835316
now average bounce is 4.019
bounce= 28092

loop 11000
-1472.60156803
now average bounce is 4.495
bounce= 32587

loop 12000
-1410.73122521
now average bounce is 5.222
bounce= 37809

loop 13000
-1431.66258902
now average bounce is 5.587
bounce= 43396

loop 14000
-1474.53878298
now average bounce is 5.974
bounce= 49370

loop 15000
-1553.12681105
now average bounce is 5.85

In [148]:
train(100000,q_ag)

loop 1000
-2066.89524943
now average bounce is 8.274
bounce= 8274

loop 2000
-2067.77107062
now average bounce is 9.34
bounce= 17614

loop 3000
-2065.94420379
now average bounce is 9.7
bounce= 27314

loop 4000
-2044.58365193
now average bounce is 9.339
bounce= 36653

loop 5000
-2036.01305034
now average bounce is 9.321
bounce= 45974

loop 6000
-2037.74502312
now average bounce is 9.398
bounce= 55372

loop 7000
-2024.3413867
now average bounce is 9.826
bounce= 65198

loop 8000
-2051.66718551
now average bounce is 9.503
bounce= 74701

loop 9000
-2046.41946197
now average bounce is 9.703
bounce= 84404

loop 10000
-2030.3293276
now average bounce is 8.982
bounce= 93386

loop 11000
-2028.58601155
now average bounce is 9.298
bounce= 102684

loop 12000
-2020.27204179
now average bounce is 9.288
bounce= 111972

loop 13000
-2014.02083996
now average bounce is 8.949
bounce= 120921

loop 14000
-2003.21365221
now average bounce is 9.275
bounce= 130196

loop 15000
-2010.61919556
now average bounce 

In [12]:
def test(epoch_num,q_ag):
    tot_bounce = 0
    for i in range(epoch_num):
        temp_bounce = 0
        cur_state = state(0.5, 0.5, 0.03, 0.01, 0.5 - PADDLE_H / 2,0)
        while True:
            action = q_ag.get_act(cur_state,i,mode='hei')
            n_state = proceed_one_step(cur_state,ACTION_DIC[action])
            if n_state.end_state == 1:
                break
            if n_state.reward == REWARD:
                temp_bounce+=1
            cur_state = n_state
        tot_bounce+=temp_bounce
#         print('loop',i)
#         print('now average bounce is', tot_bounce/(i+1))
#         print('bounce in this game=',temp_bounce)
#         print()
    print('the avg bounce =',tot_bounce/epoch_num)

In [151]:
test(200,q_ag)

the avg bounce = 12.05


In [60]:
def draw(canvas, ball_x, ball_y, paddle_y):
    canvas.fill(BACKGROUND_COLOR)
    pygame.draw.line(canvas, WHITE, [WIDTH // 2, 0], [WIDTH // 2, HEIGHT], 1)
    pygame.draw.line(canvas, WHITE, [PAD_WIDTH, 0], [PAD_WIDTH, HEIGHT], 1)
    pygame.draw.line(canvas, WHITE, [WIDTH - PAD_WIDTH, 0], [WIDTH - PAD_WIDTH, HEIGHT], 1)

    paddle1_pos = (WIDTH + 1 - HALF_PAD_WIDTH, int(paddle_y * HEIGHT + HALF_PAD_HEIGHT))
    ball_pos = (int(ball_x * WIDTH) , int(ball_y * HEIGHT))

    pygame.draw.circle(canvas, BALL_COLOR, ball_pos, BALL_RADIUS, 0)
    pygame.draw.polygon(canvas, PAD_COLOR, [[paddle1_pos[0] - HALF_PAD_WIDTH, paddle1_pos[1] - HALF_PAD_HEIGHT],
                                        [paddle1_pos[0] - HALF_PAD_WIDTH, paddle1_pos[1] + HALF_PAD_HEIGHT],
                                        [paddle1_pos[0] + HALF_PAD_WIDTH, paddle1_pos[1] + HALF_PAD_HEIGHT],
                                        [paddle1_pos[0] + HALF_PAD_WIDTH, paddle1_pos[1] - HALF_PAD_HEIGHT]], 0)

In [61]:
WHITE = (255, 255, 255)
BALL_COLOR = (44, 62, 80)
PAD_COLOR = (41, 128, 185)
BACKGROUND_COLOR = (207, 216, 220)
SCORE_COLOR = (25, 118, 210)

WIDTH = 600
HEIGHT = 600

BALL_RADIUS = 8
PAD_WIDTH = 8
PAD_HEIGHT = HEIGHT * 0.2
HALF_PAD_WIDTH = PAD_WIDTH // 2
HALF_PAD_HEIGHT = PAD_HEIGHT // 2
GAME_FPS = 30
GAME_END_FPS = 30


def test_with_gui(agent, epoches, opponent=None):
    pygame.init()
    fps = pygame.time.Clock()

    window = pygame.display.set_mode((WIDTH, HEIGHT))
    pygame.display.set_caption('Pong')

    turns_count = []
    lscore = 0
    rscore = 0

    for i in range(epoches):

        temp_bounce = 0
        cur_state = state(0.5, 0.5, 0.03, 0.01, 0.5 - PADDLE_H / 2,0)
        
        while True: 
            
            action = q_ag.get_act(cur_state,i,mode='hei')
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()
            
            
            n_state = proceed_one_step(cur_state,ACTION_DIC[action])
            
            
            if n_state.end_state == 1:
                break
            
            draw(window, n_state.ball_x, n_state.ball_y, n_state.paddle_y)
            
            pygame.display.update()
            fps.tick(GAME_FPS)
            cur_state = n_state
        fps.tick(GAME_END_FPS)

    return np.mean(turns_count)

In [146]:
test_with_gui(q_ag,200)

KeyboardInterrupt: 

In [None]:
np.zeros(STATE_SPACE+(len(ACTION_DIC),)+(2,)) 

In [155]:
from tempfile import TemporaryFile
outfile = TemporaryFile()
np.save('tablevis', q_ag.table_vis)