In [7]:
!pip install pygame
!pip install tensorflow==2.3.0
!pip install gym
!pip install keras
!pip install keras-rl2

Collecting tensorflow==2.3.0
  Downloading tensorflow-2.3.0-cp38-cp38-win_amd64.whl (342.5 MB)
Collecting google-pasta>=0.1.8
  Downloading google_pasta-0.2.0-py3-none-any.whl (57 kB)
Collecting gast==0.3.3
  Downloading gast-0.3.3-py2.py3-none-any.whl (9.7 kB)
Collecting opt-einsum>=2.3.2
  Downloading opt_einsum-3.3.0-py3-none-any.whl (65 kB)
Collecting scipy==1.4.1
  Downloading scipy-1.4.1-cp38-cp38-win_amd64.whl (31.0 MB)
Collecting termcolor>=1.1.0
  Downloading termcolor-1.1.0.tar.gz (3.9 kB)
Collecting keras-preprocessing<1.2,>=1.1.1
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl (42 kB)
Collecting tensorflow-estimator<2.4.0,>=2.3.0
  Downloading tensorflow_estimator-2.3.0-py2.py3-none-any.whl (459 kB)
Collecting astunparse==1.6.3
  Downloading astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Building wheels for collected packages: termcolor
  Building wheel for termcolor (setup.py): started
  Building wheel for termcolor (setup.py): finished with status 'done'
  Cre

# Environment Class Declaration

In [17]:
from cmath import sin
from gym import Env
from gym import spaces
from gym.spaces import Discrete, Box
import numpy as np
import random
import math
import pygame
from pygame import gfxdraw
from pandas import array
from typing import Optional
from os import path

class BallBalancerEnv(Env):
    ### Observation Space
    #The observation is a `ndarray` with shape `(4,)` where the elements correspond to the following:
    #| Num | Observation           | Min                  | Max                |
    #|-----|-----------------------|----------------------|--------------------|
    #| 0   | Ball Position         | -max_state           | max_state          |
    #| 1   | Ball Velocity         | -Inf                 | Inf                |
    
    ### Action Space
    #The action space is Discrete 3 (clockwise(0), no turn (1), counterclockwise (2))
    
    def __init__(self):
        #Defining our action space, can have three possible actions: clockwise turn, counterclockwise turn, and do not move
        self.action_space = Discrete(3)
        #max position of the ball before it falls off stick
        self.max_state = 20.0
        self.max_reward_position = self.max_state / 3
        
        high = np.array(
            [
                float(self.max_state), #Ball Position
                np.finfo(np.float32).max #Max Ball Velocity
            ]
        )
        
        #Ball possible positions (low, high)
        self.observation_space = spaces.Box(-high, high)
        
        #Ball starting position
        self.position = random.randint(-self.max_state,self.max_state)/2.0
        #how many times the machine switches actions
        self.balancing_actions = 50
        #the angle of the stick -90, 90 for completely vertical
        self.stick_angle = 0
        #acceleration adds to velocity
        self.ball_velocity = 0.0
        #for rendering environment
        self.screen = None
        #screen dimensions (1000 by 1000 square)
        self.screen_dim = 1000
        
    def step(self, action):
        self.balancing_actions-= 1
        
        #lets say for each action, the stick rotates 2 degrees
        self.stick_angle += (action-1)*2;
        
        #acceleration adds to velocity
        acc = -math.sin(math.radians(self.stick_angle)) * 9.81
        self.ball_velocity += acc
        
        # assuming each step is 1 sec, then self.position increments by velocity
        self.position += self.ball_velocity
        
        #reward system
        done = False
        if self.position >=-self.max_reward_position and self.position <=self.max_reward_position: 
            reward = 1 
        elif self.position >= self.max_state or self.position <= -self.max_state:
            reward = -1
            self.position = 0
            done = True
        else: 
            reward = -1 
        
        if self.balancing_actions <= 0: 
            done = True
        
        
        # Set placeholder for info
        info = {}
        
        # Return step information
        return np.array((self.position, self.ball_velocity), dtype=np.float32), reward, done, info
    
    def render(self, mode="human"):
        #defining variables
        stateToWidthRatio = 20
        stickWidth = self.max_state * stateToWidthRatio * 2
        stickHeight = stickWidth / 5
        stickX = self.screen_dim / 2
        stickY = 2 * self.screen_dim / 3;
        ballRadius = 100.0
        disStickToBall = stickHeight/2+ballRadius
        newX = stickX + disStickToBall * math.sin(math.radians(self.stick_angle))
        newY = stickY - disStickToBall * math.cos(math.radians(self.stick_angle))
        ballX = newX - self.position * stateToWidthRatio * math.cos(math.radians(self.stick_angle))
        ballY = newY - self.position * stateToWidthRatio * math.sin(math.radians(self.stick_angle))
        
        l, r, t, b = (
            -stickWidth / 2,
            stickWidth / 2,
            stickHeight / 2,
            -stickHeight / 2,
        )
        
        #rotate stick by stick_angle
        stickCoords = []
        for coord in [(l, b), (l, t), (r, t), (r, b)]:
            coord = pygame.math.Vector2(coord).rotate_rad(math.radians(self.stick_angle))
            stickCoords.append(coord)
        
        #add stickX and stickY to stickCoords
        stickCoords = [(c[0] + stickX, c[1] + stickY) for c in stickCoords]

        #rendering image of environment, starting screen
        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_dim, self.screen_dim))
        
        # Fill background
        background = pygame.display.get_surface()
        background = background.convert()
        background.fill((200, 200, 200))
        
        # Display some text
        font = pygame.font.Font(None, 36)
        string = "Position: " + str(self.position)
        text = font.render(string, 1, (10, 10, 10))
        textpos = text.get_rect()
        textpos.centerx = background.get_rect().centerx
        background.blit(text, textpos)
        
        # Draw Stick
        gfxdraw.aapolygon(background, stickCoords, (202, 152, 101))
        gfxdraw.filled_polygon(background, stickCoords, (202, 152, 101))
        
        # Draw Ball
        gfxdraw.aacircle(
            background,
            int(ballX),
            int(ballY),
            int(ballRadius),
            (129, 132, 203),
        )
        gfxdraw.filled_circle(
            background,
            int(ballX),
            int(ballY),
            int(ballRadius),
            (129, 132, 203),
        )
        
        #This will pump the event queue and close the window and program
        #if the user clicks the close button of the window
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                abc = 0
                
                
        # Blit everything to the screen
        self.screen.blit(background, (0, 0))
        pygame.display.flip()
        return True
                         
    def reset(self):
        #reseting enviroment
        #ball placed at new random position
        self.position = random.randint(-self.max_state,self.max_state)/2
        #reseting balancing time
        self.balancing_actions = 50 
        #reseting velocity
        self.ball_velocity = 0.0
        #the angle of the stick -90, 90 for completely vertical
        self.stick_angle = 0
        return np.array((self.position, self.ball_velocity), dtype=np.float32)
    
    def close(self):
        pygame.display.quit() 
        pygame.quit()

# Random Sampling Of Environment

In [18]:
#import BallBalancerEnv as BBE
import math
import random
import time
env = BallBalancerEnv()

episodes = 1
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
        
        #help make it visual
        time.sleep(0.05)
    print('Episode:{} Score:{}'.format(episode, score))
    
env.close()

Episode:1 Score:-6


# Training Environment

In [11]:
!pip install stable-baselines3[extra]
import gym 
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy



In [19]:
env = DummyVecEnv([lambda: env])
model = PPO('MlpPolicy', env, verbose = 1)
model.learn(total_timesteps=50000)

Using cpu device
-----------------------------
| time/              |      |
|    fps             | 1054 |
|    iterations      | 1    |
|    time_elapsed    | 1    |
|    total_timesteps | 2048 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 879         |
|    iterations           | 2           |
|    time_elapsed         | 4           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.009376313 |
|    clip_fraction        | 0.111       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.09       |
|    explained_variance   | 0.0131      |
|    learning_rate        | 0.0003      |
|    loss                 | 8.61        |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0115     |
|    value_loss           | 15.8        |
-----------------------------------------
-----------------

<stable_baselines3.ppo.ppo.PPO at 0x1cf0d63a550>

In [618]:
from stable_baselines3.common.evaluation import evaluate_policy
evaluate_policy(model, env, n_eval_episodes=10, render=False)
env.close()

# Test Model

In [21]:
import math
import random
import time
env = BallBalancerEnv()

episodes = 10
for episode in range(1, episodes+1):
    obs = env.reset()
    score = 0
    
    while True:
        action, _states = model.predict(obs)
        obs, rewards, done, info = env.step(action)
        score += reward
        env.render()
        time.sleep(0.05)
        if done: 
            print('Episode:{} Score:{}'.format(episode, score))
            break
        
env.close()

Episode:1 Score:-50
Episode:2 Score:-50
Episode:3 Score:-50
Episode:4 Score:-24
Episode:5 Score:-50
Episode:6 Score:-50
Episode:7 Score:-44
Episode:8 Score:-50
Episode:9 Score:-37
Episode:10 Score:-50
