<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [2]:
#https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html
#https://github.com/koulanurag/ma-gym/blob/master/ma_gym/envs/pong_duel/pong_duel.py

import copy
import logging

import gym
import numpy as np
from gym import spaces
from gym.utils import seeding

from gym.utils.action_space import MultiAgentActionSpace
from gym.utils.observation_space import MultiAgentObservationSpace
from gym.utils.draw import draw_grid, fill_cell, draw_border

logger = logging.getLogger(__name__)

import random
import time
# import ball
import player
# import guard
# import wing
# import big

ModuleNotFoundError: No module named 'gym.utils.action_space'

In [62]:
class BasketballEnv(gym.Env):
    """
    Custom Environment that follows gym interface.
    This is a simple env where multiple agents learn strategies to put the ball in the hoop.
    For this simple iteration, actions will be determined by probabilities rather than physics.
    """
    # In google colab, we cannot implement the GUI ('human' render mode)
    metadata = {'render.modes': ['human']}

    
    def __init__(self, step_cost=0, reward=1, max_rounds=10):
        #Grid size will be standard basketball halfcourt at 6"=1'-0" scale
        self._grid_shape = (100, 94)

        #Number of players
        self.n_agents = 6
        self.n_agents_team_A = 3
        self.n_agents_team_B = 3
        self.reward = reward
        self._max_rounds = max_rounds
        self.action_space = spaces.MultiDiscrete([9, 3, 1, 1])

        self._step_count = None
        self._step_cost = step_cost
        self._total_episode_reward = None
        self.agent_pos = {_: None for _ in range(self.n_agents)}
        self._agent_dones = None
        self.__rounds = None

        # agent pos(6)=
        self._obs_low = np.array([0., 0., 0., 0., 0., 0.] + [0.])
        self._obs_high = np.array([1., 1., 1., 1., 1., 1.] + [1.])
        self.observation_space = spaces.Box(low=0, high=(100),
                                        shape=(100,94), dtype=np.float32)

        self.viewer = None
        self.seed()

    def get_action_meanings(self, agent_i=None):
        if agent_i is not None:
            assert agent_i <= self.n_agents
            return [ACTION_MEANING[i] for i in range(self.action_space[agent_i].n)]
        else:
            return [[ACTION_MEANING[i] for i in range(ac.n)] for ac in self.action_space]

    def __create_grid(self):
        _grid = [[PRE_IDS['empty'] for _ in range(self._grid_shape[1])] for row in range(self._grid_shape[0])]
        return _grid

    def __update_agent_view(self, agent_i):
        for row in range(self.agent_prev_pos[agent_i][0],
                         self.agent_prev_pos[agent_i][0]):
            self._full_obs[row][self.agent_prev_pos[agent_i][1]] = PRE_IDS['empty']

        for row in range(self.agent_pos[agent_i][0], self.agent_pos[agent_i][0]):
            self._full_obs[row][self.agent_pos[agent_i][1]] = PRE_IDS['agent'] + str(agent_i + 1) \
                                                              + '_' + str(row - self.agent_pos[agent_i][0])

    def __draw_base_img(self):
        self._base_img = draw_grid(self._grid_shape[0], self._grid_shape[1],
                                   cell_size=CELL_SIZE, fill='white', line_color='white')

    def __init_full_obs(self):
        self._full_obs = self.__create_grid()
        for agent_i in range(self.n_agents):
            self.__update_agent_view(agent_i)

        for agent_i in range(self.n_agents):
            self.__update_agent_view(agent_i)

        self.__update_ball_view()

        self.__draw_base_img()

    def get_agent_obs(self):
        _obs = []

        for agent_i in range(self.n_agents):
            pos = self.agent_pos[agent_i]
            _agent_i_obs = [pos[0] / self._grid_shape[0], pos[1] / self._grid_shape[1]]

            _agent_i_obs += [pos[0] / self._grid_shape[0], pos[1] / self._grid_shape[1]]

            _obs.append(_agent_i_obs)

        return _obs
    
##############
#Define Reset#   
##############

    def reset(self):
        self.__rounds = 0
        self.agent_pos[0] = (self.np_random.randint(PADDLE_SIZE, self._grid_shape[0] - PADDLE_SIZE - 1), 1)
        self.agent_pos[1] = (self.np_random.randint(PADDLE_SIZE, self._grid_shape[0] - PADDLE_SIZE - 1),
                             self._grid_shape[1] - 2)
        self.agent_prev_pos = {_: self.agent_pos[_] for _ in range(self.n_agents)}
        self.__init_ball_pos()
        self._agent_dones = [False, False]
        self.__init_full_obs()
        self._step_count = 0
        self._total_episode_reward = [0 for _ in range(self.n_agents)]

        return self.get_agent_obs()

    @property
    def __has_ball(self):
        return False
            
    def action_success(self, p_1):
        return np.random.choice([0, 1], p=[1 - p_1, p_1])
    
    def close_range(self):
        if np.sqrt(x.self**2 + y.self**2) + np.sqrt(10.5**2+50**2) <= 6:
            return True
    
    def midrange(self):
        if close_range == False and three_point_range == False:
            return True
    
    def three_point_range(self):
        if self.y <= 19.67 and self.x <= 6.67 or self.x >= 93.33:
            return True
        elif self.y > 19.67 and np.sqrt(self.y**2 + self.x**2) + np.sqrt(10.5**2+50**2) > 44.3 + np.sqrt(10.5**2+50**2):
            return True
        
    #define defensive rebound, will be a reward for the defensive team
    def d_rebound():
        return False
        
    #define shot, a made shot will be a reward for the offensive team
    def shot(self):
        
        #Can only shoot if the player has the ball
        if _has_ball():
            
            #Close range shot
            if close_range():
                shot = action_success(player.shooting_close)
                if shot == 1:
                    return True
                else:
                    miss = action_success(0.3)
                    if miss == 1:
                        rebounder = random.choice([agent_i in n_agents_team_A])
                        rebounder._has_ball = True
                        return rebounder._has_ball()
                    else:
                        d_rebound = True
                        return d_rebound()
                                                
            #Midrange shot
            if midrange():
                shot = action_success(player.shooting_midrange)
                if shot == 1:
                    return True
                else:
                    miss = action_success(0.3)
                    if miss == 1:
                        rebounder = random.choice([agent_i in n_agents_team_A])
                        rebounder._has_ball = True
                        return rebounder._has_ball()
                    else:
                        d_rebound = True
                        return d_rebound()
                
                    
            #3 point shot
            if three_point_range():
                if action_success(player.shooting3pts):
                    return True
                else:
                    miss = action_success(0.3)
                    if miss == 1:
                        rebounder = random.choice([agent_i in n_agents_team_A])
                        rebounder._has_ball = True
                        return rebounder._has_ball()
                    else:
                        d_rebound = True
                        return d_rebound()
            
    
    def ball_pass_to_0(self):
        for agent_i in n_agents_team_A:
            if agent_i._has_ball():
                agent_0._has_ball = True
                agent_i._has_ball = False
                
    def ball_pass_to_1(self):
        for agent_i in n_agents_team_A:
            if agent_i._has_ball():
                agent_1._has_ball = True
                agent_i._has_ball = False

    def ball_pass_to_2(self):
        for agent_i in n_agents_team_A:
            if agent_i._has_ball():
                agent_2._has_ball = True
                agent_i._has_ball = False
    
    def steal(self):
        for agent_i in n_agents_team_B:
            for agent_j in n_agents_team_A:
                if agent_j._has_ball and np.sqrt(agent_i.self.x**2 + agent_i.self.y**2) < 5:
                    return action_success(0.02)
    
    def block(self):
        for agent_i in n_agents_team_B:
            for agent_j in n_agents_team_A:
                if agent_j.shot() and np.sqrt(agent_i.self.x**2-agent_j.self.x**2 + agent_i.self.y**2-agent_j.self.y**2) < 4:
                    return action_success(0.02)
    
###############
#Define Render#   
###############

    def render(self, mode='human'):
        img = copy.copy(self._base_img)
        for agent_i in range(self.n_agents):
            for row in range(self.agent_pos[agent_i][0] - 2, self.agent_pos[agent_i][0] + 3):
                fill_cell(img, (row, self.agent_pos[agent_i][1]), cell_size=CELL_SIZE, fill=AGENT_COLORS[agent_i])

        ball_cells = self.__ball_cells
        fill_cell(img, ball_cells[0], cell_size=CELL_SIZE, fill=BALL_HEAD_COLOR)
        fill_cell(img, ball_cells[1], cell_size=CELL_SIZE, fill=BALL_TAIL_COLOR)
        fill_cell(img, ball_cells[2], cell_size=CELL_SIZE, fill=BALL_TAIL_COLOR)

        img = draw_border(img, border_width=2, fill='gray')

        img = np.asarray(img)
        if mode == 'rgb_array':
            return img
        elif mode == 'human':
            from gym.envs.classic_control import rendering
            if self.viewer is None:
                self.viewer = rendering.SimpleImageViewer()
            self.viewer.imshow(img)
            return self.viewer.isopen

    def __update_agent_pos(self, agent_i, move):

        curr_pos = copy.copy(self.agent_pos[agent_i])
        if move == 0:  # noop
            next_pos = None
        elif move == 1:  # up
            next_pos = [curr_pos[0] - 1, curr_pos[1]]
        elif move == 2:  # upright
            next_pos = [curr_pos[0] - 1, curr_pos[1] + 1]
        elif move == 3:  # right
            next_pos = [curr_pos[0], curr_pos[1] + 1]
        elif move == 4:  # downright
            next_pos = [curr_pos[0] + 1, curr_pos[1] + 1]
        elif move == 5:  # down
            next_pos = [curr_pos[0] + 1, curr_pos[1]]
        elif move == 6:  # downleft
            next_pos = [curr_pos[0] + 1, curr_pos[1] - 1]
        elif move == 7:  # left
            next_pos = [curr_pos[0], curr_pos[1] - 1]
        elif move == 8:  # upleft
            next_pos = [curr_pos[0] - 1, curr_pos[1] - 1]
        else:
            raise Exception('Action Not found!')

        if next_pos is not None and PADDLE_SIZE <= next_pos[0] <= (self._grid_shape[0] - PADDLE_SIZE - 1):
            self.agent_prev_pos[agent_i] = self.agent_pos[agent_i]
            self.agent_pos[agent_i] = next_pos
            self.__update_agent_view(agent_i)
#############
#Define Seed#   
#############

    def seed(self, n=None):
        self.np_random, seed = seeding.np_random(n)
        return [seed]

#############
#Define Step#   
#############
    
    def step(self, action_n):
        assert len(action_n) == self.n_agents
        self._step_count += 1
        rewards = [self._step_cost for _ in range(self.n_agents)]

        # if ball is beyond paddle, initiate a new round
        if self.ball_pos[1] < 1:
            rewards = [0, self.reward]
            self.__rounds += 1
        elif self.ball_pos[1] >= (self._grid_shape[1] - 1):
            rewards = [self.reward, 0]
            self.__rounds += 1

        if self.__rounds == self._max_rounds:
            self._agent_dones = [True for _ in range(self.n_agents)]
        else:
            for agent_i in range(self.n_agents):
                self.__update_agent_pos(agent_i, action_n[agent_i])

            if (self.ball_pos[1] < 1) or (self.ball_pos[1] >= self._grid_shape[1] - 1):
                self.__init_ball_pos()
            else:
                self.__update_ball_pos()

        for i in range(self.n_agents):
            self._total_episode_reward[i] += rewards[i]

        return self.get_agent_obs(), rewards, self._agent_dones, {'rounds': self.__rounds}

# Define constants for clearer code

CELL_SIZE = 5

#Goal Location
GOAL = [10.5, 50]

ACTION_MEANING = {
    0 : 'NOOP',
    1 : 'UP',
    2 : 'UPRIGHT',
    3 : 'RIGHT',
    4 : 'DOWNRIGHT',
    5 : 'DOWN',
    6 : 'DOWNLEFT',
    7 : 'LEFT',
    8 : 'UPLEFT',
    9 : 'PASS_0',
    10 : 'PASS_1',
    11 : 'PASS_2',
    12 : 'SHOOT',
    13 : 'STEAL',
    14 : 'BLOCK',
}

AGENT_TEAMS = {
    0: 'A',
    1: 'A',
    2: 'A',
    3: 'B',
    4: 'B',
    5: 'B'
    
}

AGENT_COLORS = {
    0: 'red',
    1: 'red',
    2: 'red',
    3: 'blue',
    4: 'blue',
    5: 'blue'
}
WALL_COLOR = 'black'
BALL_HEAD_COLOR = 'orange'
BALL_TAIL_COLOR = 'yellow'

# each pre-id should be unique and single char
PRE_IDS = {
    'agent': 'A',
    'goal' : 'G',
    'empty': 'O'
}


In [33]:
from stable_baselines.common.env_checker import check_env
import numpy as np
import gym
from gym import spaces


In [63]:
env = BasketballEnv()
# It will check your custom environment and output additional warnings if needed
check_env(env)



NameError: name 'PADDLE_SIZE' is not defined

In [None]:
        
        
        
    def _get_obs(self, obs):
        """
        Concatenate the time feature to the current observation.
        :param obs: (np.ndarray)
        :return: (np.ndarray)
        """
        # Remaining time is more general
        time_feature = 1 - (self._current_step / self._max_steps)
        if self._test_mode:
            time_feature = 1.0
        # Optionnaly: concatenate [time_feature, time_feature ** 2]
    return np.concatenate((obs, [time_feature]))

    def reset(self):
    """
    Important: the observation must be a numpy array
    :return: (np.array) 
    """
    # Initialize the agent at the right of the grid
        self.agent_pos = self.grid_size - 1
   
    # here we convert to float32 to make it more general (in case we want to use continuous actions)
    return np.array([self.agent_pos]).astype(np.float32)

    def step(self, action):
        if action == self.LEFT:
            self.agent_pos -= 1
        elif action == self.RIGHT:
            self.agent_pos += 1
        else:
            raise ValueError("Received invalid action={} which is not part of the action space".format(action))

    # Account for the boundaries of the grid
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

    # Are we at the left of the grid?
    done = bool(self.agent_pos == 0)

    # Null reward everywhere except when reaching the goal (left of the grid)
    reward = 1 if self.agent_pos == 0 else 0

    # Optionally we can pass additional info, we are not using that for now
    info = {}

    return np.array([self.agent_pos]).astype(np.float32), reward, done, info

    def render(self, mode='console'):
        if mode != 'console':
            raise NotImplementedError()
            # agent is represented as a cross, rest as a dot
            print("." * self.agent_pos, end="")
            print("x", end="")
            print("." * (self.grid_size - self.agent_pos))

    def close(self):
        pass
    


In [None]:
#Lots of help from Dustin
class Environment:
    SIZE = [50, 94]
    GOAL_A = [10.5, 50]
    BACKBOARD_A = [25, 4]
    BACKBOARD_B = [25, 90]
    GOAL_B = [25, 88.75]
    3PT_LINE = [Coods] 
    SCORE = 0
    
    def shot(self):
        if self.x ** 2 + self.y**2 > AMOUNT:
            
    
    def __init__(self):
        self.time_left = countdown(300)

        self.x = 44
        self.y = 25
        
    #Countdown timer as game clock
    #https://www.geeksforgeeks.org/how-to-create-a-countdown-timer-using-python/
    def countdown(t=300):     
        while t: 
            mins, secs = divmod(t, 60) 
            timer = '{:02d}:{:02d}'.format(mins, secs)  
            time.sleep(1) 
            t -= 1
            return t 

    def num_states(self):
        return t

    def num_actions(self):
        return 10

    def get_observation(self):
        return [self.x, self.y]

    def get_state_num(self):
        return self.x*self.SIZE + self.y

    def get_pos_from_state_num(self, state_num):
        return (state_num // self.SIZE, state_num % self.SIZE)

    def has_ball(self):
        if self.x == ball.x and self.y == ball.y:
            return True
        return False
    
    def has_dribble:
        
    def on_offense(self):
        
        
        
    def get_actions(self):
        if has_ball == False:
            return ["up", "up-right", "right", "down-right", "down", "down-left", "left", "up-left", "jump", "screen"]
        if has_ball == True and has_dribble == True:
            return ['pass', 'shoot', 'up', 'down', 'left', 'right']

    def is_done(self):
        return self.time_left == 0

    def at_goal(self):
        return self.x == self.GOAL[0] and self.y == self.GOAL[1]

    def is_clear(self, x, y):
        for w in self.WALLS:
            if x == w[0] and y == w[1]:
            return False
        return True

    def action(self, action):
        if self.is_done():
            raise Exception("Episode is already over")
        self.steps_left -= 1
        if action == "up" and self.y > 0:
            if self.is_clear(self.x, self.y-1):
            self.y -= 1
        elif action == "down" and self.y < self.SIZE-1:
            if self.is_clear(self.x, self.y+1):
            self.y += 1
        elif action == "left" and self.x > 0:
            if self.is_clear(self.x-1, self.y):
            self.x -= 1
        elif action == "right" and self.x < self.SIZE-1:
            if self.is_clear(self.x+1, self.y):
            self.x += 1

        if self.x == self.GOAL[0] and self.y == self.GOAL[1]:
            return 1.0
        return 0.0