In [1]:
import numpy as np
import pygame as pg
from itertools import count, product
from tqdm import tqdm
import matplotlib.pyplot as plt
import os


pygame 2.0.1 (SDL 2.0.14, Python 3.8.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


### Define Constants

In [2]:
"""pong table dimensions"""
WIDTH = 1
HEIGHT = 1

# Paddle dimensions: width and height
P_W, P_H = 0.2, 0.02

# Paddle y positions (paddle 0 sits at Y0, paddle 1 at Y1)
Y0, Y1 = 0.9, 0.1

# Ball attributes: radius and velocity components
BALL_R = 0.02
BALL_VY, BALL_VX = 1, 0

# State vector indices
(
    X0,    # x position of paddle 0
    X1,    # x position of paddle 1
    X_B,   # x position of ball
    Y_B,   # y position of ball
    VX_B,  # vx of ball
    VY_B,  # vy of ball
) = range(6)

# Velocity scale applied to paddle actions and used to normalise ball velocities
MAX_V = 5
# Simulation time step
dt = 0.01


### Pong Transition Function

In [3]:
def pong_transition(s, a):
    """
    Advance the two-paddle pong simulation by one time step.

    Parameters
    ----------
    s : np.ndarray
        State vector <x_{p0}, x_{p1}, x_{ball}, y_{ball}, v_x_{ball}, v_y_{ball}>.
    a : array-like
        Action vector <v_x_{p0}, v_x_{p1}>; every entry is scaled by MAX_V.
        The caller's array is left untouched.

    Returns
    -------
    s_p : np.ndarray
        Next state (a copy; `s` is not modified).
    r : int
        1 when paddle 0 returns the ball, -1 when paddle 0 misses it, else 0.
    terminal : bool
        True when either paddle misses the ball.
    win : int
        1 when paddle 1 misses the ball, else 0.
    """
    # Work on a private float copy so that zeroing a blocked action never
    # leaks back into the caller's action array.
    a_eff = np.array(a, dtype=float)

    # get the paddles' next positions
    # if an action takes a paddle off the screen, its effective velocity is 0
    s_p = np.copy(s)
    p_trans = s[: X_B] + a_eff * dt * MAX_V
    a_eff[(p_trans < P_W / 2) | (p_trans > WIDTH - P_W / 2)] = 0
    s_p[: X_B] += a_eff * dt * MAX_V

    r = 0
    win = 0
    terminal = False
    # NOTE(review): the hit tolerance below is P_W - 2 * BALL_R, which is wider
    # than the paddle half-width P_W / 2 -- confirm this is intended.
    dy = s[VY_B] * dt
    if s[Y_B] + dy <= Y1:
        # ball reaches the line of paddle 1 (y = Y1)
        if abs(s[X_B] - s[X1]) <= P_W - 2 * BALL_R:
            # ball is on paddle 1:
            # flip y velocity, and add paddle x velocity to ball x velocity
            s_p[VY_B] *= -1
            s_p[VX_B] += a_eff[1] * MAX_V
        else:
            # paddle 1 missed: the episode ends (no reward is given here)
            terminal = True
            win = 1

    elif s[Y_B] + dy >= Y0:
        # ball reaches the line of paddle 0 (y = Y0)
        if abs(s[X_B] - s[X0]) <= P_W - 2 * BALL_R:
            # ball is on paddle 0:
            # flip y velocity, and add paddle x velocity to ball x velocity
            s_p[VY_B] *= -1
            s_p[VX_B] += a_eff[0] * MAX_V
            r = 1
        else:
            # paddle 0 missed
            terminal = True
            r = -1

    # if ball touches the side walls, reverse ball x velocity
    dx = s[VX_B] * dt
    if s[X_B] + dx <= BALL_R or s[X_B] + dx >= 1 - BALL_R:
        s_p[VX_B] *= -1

    # move the ball according to its (possibly updated) velocity
    s_p[X_B: VX_B] += s_p[VX_B: ] * dt

    return s_p, r, terminal, win
    
    
def pong_transition_solo(s, a):
    """
    Advance the single-paddle pong simulation by one time step.

    Only paddle 0 plays; the opposite side of the table acts as a wall that
    bounces the ball back.

    Parameters
    ----------
    s : np.ndarray
        State vector <x_{p0}, x_{p1}, x_{ball}, y_{ball}, v_x_{ball}, v_y_{ball}>.
    a : array-like
        Action vector; only a[0] (x velocity of paddle 0, scaled by MAX_V) is
        used. The caller's array is left untouched.

    Returns
    -------
    s_p : np.ndarray
        Next state (a copy; `s` is not modified).
    r : int
        1 when the paddle returns the ball, -1 when it misses, else 0.
    terminal : bool
        True when the paddle misses the ball.
    """
    # Keep the effective action in a local so the caller's array is not mutated.
    a0 = a[0]

    # get the paddle's next position
    # if the action takes the paddle off the screen, its effective velocity is 0;
    # the limits are symmetric (half a paddle width from each wall) and
    # inclusive, matching pong_transition
    s_p = np.copy(s)
    p_trans = s[X0] + a0 * dt * MAX_V
    if not P_W / 2 <= p_trans <= WIDTH - P_W / 2:
        a0 = 0
    s_p[X0] += a0 * dt * MAX_V

    r = 0
    terminal = False
    dy = s[VY_B] * dt
    if s[Y_B] + dy <= BALL_R:
        # ball reaches the wall opposite the paddle: flip y velocity
        s_p[VY_B] *= -1

    elif s[Y_B] + dy >= Y0:
        # ball reaches the line of paddle 0 (y = Y0)
        if abs(s[X_B] - s[X0]) <= P_W - 2 * BALL_R:
            # ball is on the paddle: flip y velocity, and add the paddle x
            # velocity plus a small random perturbation to the ball x velocity
            s_p[VY_B] *= -1
            s_p[VX_B] += a0 * MAX_V + np.random.uniform(-0.5, 0.5)
            r = 1
        else:
            # paddle missed
            terminal = True
            r = -1

    # if ball touches the side walls, reverse ball x velocity
    dx = s[VX_B] * dt
    if s[X_B] + dx <= BALL_R or s[X_B] + dx >= 1 - BALL_R:
        s_p[VX_B] *= -1

    # move the ball according to its (possibly updated) velocity
    s_p[X_B: VX_B] += s_p[VX_B: ] * dt

    return s_p, r, terminal
    

### Pong GUI

In [4]:
"""pong gui constants"""
# Pixels per simulation unit; everything below is expressed in pixels
SCALE = 900

# Window size and its centre
PG_W = WIDTH * SCALE
PG_H = HEIGHT * SCALE
PG_W_2 = PG_W // 2
PG_H_2 = PG_H // 2

# Paddle size and half-size
PED_W = int(P_W * SCALE)
PED_H = int(P_H * SCALE)
PED_W2 = PED_W // 2
PED_H2 = PED_H // 2

# Frame-rate cap of the render loop
FPS = 30

# Colour palette
BG_COLOR = pg.Color(50, 50, 50)
BORDER_COLOR = pg.Color(220, 220, 220)
BALL_COLOR = pg.Color(200, 70, 70)
PEDDLE_COLOR = pg.Color(240, 240, 240)

class PongGui:
    """Pygame window that draws the pong state and runs a live demo of a policy."""

    def __init__(self):
        self.screen, self.bg = self.init()

    def init(self):
        """Initialise pygame, open the window and return (screen, background)."""
        pg.init()  # initialize pygame
        screen = pg.display.set_mode((PG_W, PG_H))  # set up the screen
        pg.display.set_caption("Mohamed Martini")  # add a caption
        bg = pg.Surface(screen.get_size())  # get a background surface
        bg = bg.convert()
        bg.fill(BG_COLOR)
        screen.blit(bg, (0, 0))
        return screen, bg

    def render(self):
        """Show the frame drawn so far on the screen."""
        # flip() refreshes the whole display, so a second full update() call
        # right after it is redundant.
        pg.display.flip()

    def draw_table(self):
        """Draw the horizontal centre line of the table."""
        pg.draw.line(self.screen, BORDER_COLOR, (0, PG_H_2),
                     (PG_W, PG_H_2), width=2)

    def draw_state(self, s):
        """Draw both paddles and the ball for state vector `s` (unit coordinates)."""
        # paddle 0 is drawn at height Y0 and paddle 1 at height Y1
        for center_x, center_y in zip(s[X0: X0 + 2], [Y0, Y1]):
            center_x = int(center_x * SCALE)
            center_y = int(center_y * SCALE)
            pg.draw.rect(self.screen, PEDDLE_COLOR,
                         (center_x - PED_W2,
                          center_y - PED_H2,
                          PED_W,
                          PED_H)
                         )

        circle_center = s[X_B: VX_B] * SCALE
        # the line width equals the radius, so the circle is drawn solid
        pg.draw.circle(self.screen, BALL_COLOR,
                       circle_center.astype(int),
                       int(BALL_R * SCALE),
                       width=int(BALL_R * SCALE))

    def reset_screen(self):
        """Clear the window and redraw the table."""
        self.screen.fill(BG_COLOR)
        self.draw_table()

    def play(self, theta0=None, theta1=None):
        """
        Run the single-paddle game in the window until it is closed.

        Paddle 0 follows the softmax policy defined by `theta0` (all-zero
        weights when None). `theta1` is accepted for interface compatibility
        but is not used: the second action is drawn uniformly at random. The
        running sum of rewards is shown in the top-left corner and a new ball
        is served whenever an episode terminates.
        """
        if theta0 is None:
            theta0 = np.zeros(NUM_FEATURES * NUM_ACTIONS)

        s_sim = get_s0_sim()
        s = transform(s_sim, direction=1)
        score = 0

        # Build the font once instead of on every frame.
        score_font = pg.font.SysFont("Times New Roman", 32)
        clock = pg.time.Clock()
        try:
            run = True
            while run:
                clock.tick(FPS)
                for event in pg.event.get():
                    if event.type == pg.QUIT:
                        run = False
                self.reset_screen()  # also redraws the table
                self.draw_state(s)

                # take action, observe reward and next state
                a0, _ = get_action(s[KI], theta0)
                a1 = np.random.choice(NUM_ACTIONS)
                a_sim = np.array([A[a0], A[a1]])

                sp_sim, r, terminal = pong_transition_solo(s_sim, a_sim)
                sp = transform(sp_sim, direction=1)

                s = sp
                s_sim = sp_sim
                score += r

                # print score
                score_disp = score_font.render(str(int(score)), 1, BALL_COLOR)
                self.screen.blit(score_disp, (10, 10))

                self.render()

                if terminal:
                    s_sim = get_s0_sim()
                    s = transform(s_sim, direction=1)
        finally:
            # always release the window, even if a frame raised
            pg.quit()


### Actor Critic

#### Helper Functions:

In [5]:
def x_s(s: np.array):
    """
    Fourier-basis features of state `s`.

    One feature cos(pi * s . c) is produced for every coefficient vector c in
    {0, ..., D}^K, in the order generated by itertools.product.
    """
    # the scaled state is the same for every coefficient vector
    scaled_state = np.pi * s.T
    features = np.zeros(NUM_FEATURES)
    for idx, coeffs in enumerate(product(range(D + 1), repeat=K)):
        features[idx] = np.cos(scaled_state @ np.array(coeffs))
    return features


def x_sa(s: np.array, a: int):
    """
    State-action features: the state features x(s) placed in the slot that
    belongs to action index `a`, with zeros in every other action's slot.
    """
    features = np.zeros(NUM_FEATURES * NUM_ACTIONS)
    offset = a * NUM_FEATURES
    features[offset: offset + NUM_FEATURES] = x_s(s)
    return features


def h_s(s: np.array, theta: np.array):
    """Action preferences in state `s`: theta . x(s, a) for every action index a."""
    preferences = (theta @ x_sa(s, action) for action in range(NUM_ACTIONS))
    return np.fromiter(preferences, dtype=float, count=NUM_ACTIONS)


def pi_s(s: np.array, theta: np.array):
    """Softmax policy over the action preferences in state `s`."""
    preferences = h_s(s, theta)
    # subtract the largest preference before exponentiating to avoid overflow
    shifted = preferences - np.max(preferences)
    weights = np.exp(shifted)
    return weights / np.sum(weights)


def v_s(s: np.array, w: np.array):
    """Linear state-value estimate: weights `w` dotted with the features of `s`."""
    features = x_s(s)
    return w @ features


def get_action(s, theta):
    """
    Sample an action index in state `s` from the policy defined by `theta`.

    Returns the sampled index together with the policy it was drawn from.
    """
    probabilities = pi_s(s, theta)
    chosen = np.random.choice(range(NUM_ACTIONS), p=probabilities)
    return chosen, probabilities


def get_pi_gradient(s, a, policy):
    r"""
    Gradient of ln pi(a|s, theta) for the softmax policy with linear preferences:

        x(s, a) - \sum_b \pi(b|s, theta) x(s, b)

    Parameters
    ----------
    s : state passed to x_s
    a : index of the action that was taken
    policy : action probabilities pi(.|s, theta), one per action index

    Returns
    -------
    np.ndarray of length NUM_ACTIONS * NUM_FEATURES, laid out like x(s, a):
    one slot of NUM_FEATURES entries per action index.
    """
    # x(s, b) is x(s) placed in slot b, so the state features only have to be
    # computed once instead of once per action.
    features = x_s(s)
    probabilities = np.asarray(policy, dtype=float)[:NUM_ACTIONS]

    # slot b of the sum term is pi(b|s) * x(s)
    gradient = np.outer(-probabilities, features)
    # slot a additionally receives x(s) from the x(s, a) term
    gradient[a] += features
    return gradient.ravel()


def get_s0_sim():
    """
    Draw a random initial simulation state.

    All entries start uniformly in [0.11, 0.9); the ball is then placed at
    y = 0.5 with no x velocity and a y velocity of MAX_V in a random direction.
    """
    state = np.random.uniform(0.11, 0.9, size=K_sim)
    sign = np.random.choice((-1, 1))
    state[Y_B] = 0.5
    state[VX_B] = 0
    state[VY_B] = sign * MAX_V
    return state

# def get_s0_sim():
#     s = np.zeros(K_sim)
#     s[X0: VX_B] = 0.5
#     direction = np.random.choice((-1, 1))
#     s[VX_B] = direction * 0.35 * MAX_V #np.random.uniform(0.45, 0.55)
#     s[VY_B] = direction * 0.65 * MAX_V
#     return s

def transform(s, direction=1):
    """
    Rescale the ball-velocity entries of a state vector; `s` is not modified.

    direction = 0 -> s to s_sim: velocities are mapped from [0, 1] to [-MAX_V, MAX_V].
    direction = 1 -> s_sim to s: velocities are mapped from [-MAX_V, MAX_V] to [0, 1].
    """
    ranges = ((0, 1), (-MAX_V, MAX_V))
    source_range = ranges[direction]
    target_range = ranges[1 - direction]

    converted = np.copy(s)
    converted[VX_B:] = np.interp(s[VX_B:], source_range, target_range)
    return converted


#### Algorithm

In [6]:
def actor_critic_et(theta0, theta1, num_episodes, gamma=0.99,
                    lambda_w=0.5, lambda_theta=0.5,
                    alpha_w=1e-4, alpha_theta=1e-5, max_steps=1_000):
    """
    Train paddle 0 with actor-critic using eligibility traces on the solo game.

    Parameters
    ----------
    theta0 : np.ndarray
        Policy weights of paddle 0. Updated IN PLACE and also returned.
    theta1 : np.ndarray
        Returned unchanged; the second action is drawn uniformly at random.
    num_episodes : int
        Number of episodes to run.
    gamma : float
        Discount factor.
    lambda_w, lambda_theta : float
        Trace-decay rates of the critic and actor eligibility traces.
    alpha_w, alpha_theta : float
        Step sizes of the critic and actor updates.
    max_steps : int
        An episode is cut off once its step index exceeds this value.

    Returns
    -------
    theta0, theta1 : the policy weights (see above)
    steps_per_e : np.ndarray with the last step index of every episode
    rewards : sum of all rewards collected during training

    The critic weights start at zero on every call and are not returned.
    """
    W = np.zeros(NUM_FEATURES)  # weights for estimating v_s

    steps_per_e = np.zeros(num_episodes)
    rewards = 0

    for episode in range(num_episodes):
        # initialize s - simulation state is not normalized;
        # the policy state drops paddle 1's position (indices KI)
        s_sim = get_s0_sim()
        s = transform(s_sim, direction=1)[KI]
        # features of the current state, carried from step to step so each
        # state is featurised only once
        x = x_s(s)

        # reset z vectors
        z_theta = np.zeros_like(theta0)
        z_w = np.zeros_like(W)

        # reset gamma multiplier
        I = 1

        # loop through episode
        for t in count():
            # select action
            a0, policy0 = get_action(s, theta0)
            a1 = np.random.choice(NUM_ACTIONS)

            # take action, observe reward and next state
            a_sim = np.array([A[a0], A[a1]])
            s_p_sim, r, terminal = pong_transition_solo(s_sim, a_sim)
            rewards += r
            s_p = transform(s_p_sim, direction=1)[KI]

            # calculate the error (delta) - a terminal state has value 0
            if terminal:
                x_p = None
                v_sp = 0
            else:
                x_p = x_s(s_p)
                v_sp = W @ x_p

            delta = r + gamma * v_sp - W @ x

            # update z_w
            z_w = gamma * lambda_w * z_w + x

            # update z_theta
            gradient = get_pi_gradient(s, a0, policy0)
            z_theta = gamma * lambda_theta * z_theta + I * gradient

            # update w
            W += alpha_w * delta * z_w

            # update theta (in place)
            theta0 += alpha_theta * delta * z_theta

            if terminal or t > max_steps:
                break

            I *= gamma
            s = s_p
            s_sim = s_p_sim
            x = x_p

        steps_per_e[episode] = t

    return theta0, theta1, steps_per_e, rewards

def play_offline(theta0, theta1, num_episodes):
    """
    Evaluate the policy `theta0` on the solo game without rendering or learning.

    `theta1` is not used; the second action is drawn uniformly at random.
    Returns the last step index of every episode and the total reward collected.
    """
    episode_lengths = np.zeros(num_episodes)
    total_reward = 0

    for ep in range(num_episodes):
        # simulation state is not normalized; the observation is
        sim_state = get_s0_sim()
        observation = transform(sim_state, direction=1)

        step = 0
        while True:
            # select actions
            action0, _ = get_action(observation[KI], theta0)
            action1 = np.random.choice(NUM_ACTIONS)

            # step the simulation
            sim_action = np.array([A[action0], A[action1]])
            next_sim_state, reward, done = pong_transition_solo(sim_state, sim_action)
            next_observation = transform(next_sim_state, direction=1)

            total_reward += reward
            if done or step > 1_000:
                break

            observation, sim_state = next_observation, next_sim_state
            step += 1

        episode_lengths[ep] = step

    return episode_lengths, total_reward

In [7]:
# Paddle x-velocity sign selected by each action index
A = np.array([-1, 0, 1])
NUM_ACTIONS = len(A)
# Length of the state seen by the policy (simulation state without paddle 1)
K = 5
# Length of the simulation state
K_sim = 6
# Indices of the simulation state that are kept for the policy (drops index 1)
KI = [i for i in range(K_sim) if i != 1]
# Order of the Fourier basis
D = 3
NUM_FEATURES = (D + 1) ** K

batch = 100
batches = 100
num_episodes = batch * batches

base_path = "colab/solo"
filename = os.path.join(base_path, "BEST_WEIGHTS_815")
# base_name, idx = filename.split("_")
try:
    theta0 = np.load(f"{filename}.npy")
    print("Importing Weights ...")
except (OSError, ValueError):
    # Missing or unreadable weights file: start from an untrained policy.
    # Only load failures are caught, so interrupts and programming errors
    # are no longer swallowed.
    theta0 = np.zeros(NUM_ACTIONS * NUM_FEATURES)  # theta for each action
theta1 = np.zeros_like(theta0)

# test_batch = 100
# for i in tqdm(range(1, batches + 1)):
#     theta0, theta1, steps_per_e = actor_critic_et(theta0, theta1, batch)
#     steps_per_e, win_perc = play_offline(theta0, theta1, test_batch)
#     win_perc = win_perc / test_batch * 100
#     print(f"win perc. {win_perc}%")
#     print(f"steps_per_e mean: {steps_per_e.mean()}")
#     outfile = f"{base_name}_{i + int(idx)}"
#     outfile = os.path.join(base_path, outfile)
#     np.save(outfile, theta0)
#     print(f"new weights saved as {outfile}.npy")

#     if win_perc > best_weights:
#         best_weights = win_perc
#         np.save(os.path.join(base_path, "BEST_WEIGHTS"), theta0)
#         print(f"Best weights {best_weights} saved!")




Importing Weights ...


In [8]:
# Open the pygame window and let the policy theta0 play; this call keeps
# running until the window is closed.
PongGui().play(theta0, theta1)
# PongGui().play()

In [9]:
# base_path = "colab"
# num_episodes = 100

# y = []
# x = []
# for filename in tqdm(os.listdir(base_path)):
#     if not filename.endswith(".npy"):
#         continue
#     basename, fmt = filename.split(".")
#     index = int(basename.split("_")[-1])
#     if index > 100:
#         continue

#     x.append(index*chunk)
#     filename = os.path.join(base_path, filename)
#     theta0 = np.load(filename)
#     score = PongGui.play_offline(theta0=theta0, num_episodes=num_episodes)
#     y.append(score)


In [10]:
# plt.scatter(x, y)
# plt.axhline(num_episodes)
# plt.axhline(-num_episodes, color="red")