In [49]:
# !python -m pip install mlagents==0.28.0
# !python -m pip install gym
# !cd "/Users/aditya/Documents/GitHub/game_creation_research/ml-agents/gym-unity" && pip3 install -e .

import copy
import json
import numpy as np
from mlagents_envs.environment import UnityEnvironment
from collections import namedtuple
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
from gym_unity.envs import UnityToGymWrapper

env = None


In [57]:
from typing import List

# Lightweight (x, y) pair used for positions and velocities.
Vector2 = namedtuple('Vector2', 'x y')

# Placeable objects. Obs reads their (x, y) positions from the raw observation
# in this order, and Action maps its object channel onto these indices.
objectOrder = ["bucket", "corner", "crate", "gear", "triangle"]
# Names looked up by the collider index carried in the raw observation (see Obs).
colliderOrder = ["bottomWall", "bucket", "corner", "crate", "gear",
                 "leftWall", "pedestal", "rightWall", "topWall", "triangle"]


class Obs():
    def __init__(self, raw_obs):
        """
        Wrap a raw Unity vector observation and expose its entries by name.

        Layout read from ``raw_obs``: indices 0-9 are the (x, y) pairs of the
        first five ``objectOrder`` entries, 10-11 the ball position, 12-13 the
        ball velocity, 14 the collider value and 15 the reset flag.
        """
        self.raw_obs = raw_obs
        self.objPos = {}
        for slot in range(5):
            self.objPos[objectOrder[slot]] = Vector2(raw_obs[2 * slot], raw_obs[2 * slot + 1])
        self.ballPos = Vector2(raw_obs[10], raw_obs[11])
        self.ballVel = Vector2(raw_obs[12], raw_obs[13])
        self.rawColliderVal = raw_obs[14]
        self.colliderIdx = int(self.rawColliderVal)
        # A negative collider index is treated as "no collision".
        self.collidedWith = colliderOrder[self.colliderIdx] if self.colliderIdx >= 0 else None
        self.reset = bool(raw_obs[15])

    def __str__(self) -> str:
        """
        Human-readable, one-field-per-line rendering of the observation.
        """
        lines = [f"{name}: {position}" for name, position in self.objPos.items()]
        lines.append(f"Ball Position: {self.ballPos}")
        lines.append(f"Ball Velocity: {self.ballVel}")
        lines.append(f"Collided With: {self.collidedWith}")
        lines.append(f"In Reset?: {self.reset}")
        return "".join(line + "\n" for line in lines)

    def __repr__(self):
        """Compact form: the string of the raw observation."""
        return str(self.toArray())

    def toArray(self):
        """Return the raw observation exactly as it was passed in."""
        return self.raw_obs


class Action():
    """
    Named view over a six-element raw action:
    ``[mouseX, mouseY, objX, objY, objVal, reset]``.

    ``objVal`` is a continuous value in [-1, 1] that is mapped onto a discrete
    slot: -1 means "no object" (abstain), 0..4 index into ``objectOrder``.
    """

    # Scale between the continuous object channel and the discrete slot number
    # (slot 0 = abstain, slots 1..5 = objectOrder entries).
    OBJ_VAL_SCALE = 5.49

    def __init__(self, raw_action=None, force = False):
        """
        raw_action: six-element sequence; defaults to a fresh all-zero action.
        force: when True, ActionTransformer.transform passes the action through
               without applying any ban.
        """
        # Build the default per call so instances never share one list.
        if raw_action is None:
            raw_action = [0, 0, 0, 0, 0, 0]

        self.raw_action = raw_action
        self.mouseX = raw_action[0]
        self.mouseY = raw_action[1]
        self.objX = raw_action[2]
        self.objY = raw_action[3]
        self.rawObjVal = raw_action[4]
        self.objIdx = self.mapActionValToDiscreteIdx(self.rawObjVal)
        if self.objIdx >= 0:
            self.objName = objectOrder[self.objIdx]
        else:
            self.objName = None
        self.reset = bool(raw_action[5])
        self.force = force

    def __str__(self):
        return str([self.mouseX, self.mouseY, self.objX, self.objY, self.objName, self.reset])

    def __repr__(self):
        return str(self.toArray())

    def isEmpty(self):
        """
        True when the action does nothing: all four coordinates are zero, no
        object is selected and the reset flag is off.

        Checked field by field rather than by summing, because the abstain
        index is -1 and opposite-signed fields would otherwise cancel out.
        """
        has_motion = any((self.mouseX, self.mouseY, self.objX, self.objY))
        return (not has_motion) and self.objIdx < 0 and not self.reset

    def toArray(self, raw=True):
        """
        Return the action as a list; with ``raw=True`` the object channel is the
        continuous value sent to the environment, otherwise the discrete index.
        """
        if raw:
            return [self.mouseX, self.mouseY, self.objX, self.objY, self.rawObjVal, int(self.reset)]
        else:
            return [self.mouseX, self.mouseY, self.objX, self.objY, self.objIdx, int(self.reset)]

    def setObject(self, name):
        """
        Select the object called ``name`` (must be in ``objectOrder``).
        Raises ValueError (from ``list.index``) for an unknown name.
        """
        self.rawObjVal = self.objectTagToActionVal(name)
        # Derive the discrete index the same way __init__ does, so objIdx is an
        # integer slot and not the continuous action value.
        self.objIdx = self.mapActionValToDiscreteIdx(self.rawObjVal)
        self.objName = name

    @staticmethod
    def mapActionValToDiscreteIdx(value):
        """
        Map a continuous value in [-1, 1] to a discrete index; the sign is
        ignored. Returns -1 for abstain, otherwise an index into ``objectOrder``.
        """
        assert -1 <= value <= 1, "object action value must lie in [-1, 1]"
        value = abs(value)
        value *= Action.OBJ_VAL_SCALE
        value = round(value)
        value = value - 1
        return value

    @staticmethod
    def objectTagToActionVal(object):
        """Inverse of mapActionValToDiscreteIdx for a named object."""
        idx = objectOrder.index(object)
        # 0 is abstain
        idx = idx + 1
        return idx/Action.OBJ_VAL_SCALE



class ActionTransformer():
    """
    Filter that replaces disallowed actions with an empty ``Action()``.

    An action is disallowed when it selects a banned object or when one of its
    four coordinates lies inside the matching banned closed interval. The
    default intervals (99, 999) lie outside the [-1, 1] values used in this
    notebook, so by default nothing is banned.
    """

    def __init__(self, ban_object=None, ban_mouse_position_x=(99, 999), ban_mouse_position_y=(99, 999),
                 ban_object_position_x=(99, 999), ban_object_position_y=(99, 999)):
        """
        ban_object: collection of object names that may not be selected
                    (defaults to a fresh empty list per instance).
        ban_*_position_*: (low, high) closed intervals, each with low < high.
        """
        # A fresh list per instance; a shared default would leak bans between
        # transformers whenever one of them is mutated.
        self.ban_object = [] if ban_object is None else ban_object
        assert ban_mouse_position_x[1] > ban_mouse_position_x[0], "ban_mouse_position_x must be (low, high)"
        self.ban_mouse_position_x = ban_mouse_position_x
        assert ban_mouse_position_y[1] > ban_mouse_position_y[0], "ban_mouse_position_y must be (low, high)"
        self.ban_mouse_position_y = ban_mouse_position_y
        assert ban_object_position_x[1] > ban_object_position_x[0], "ban_object_position_x must be (low, high)"
        self.ban_object_position_x = ban_object_position_x
        assert ban_object_position_y[1] > ban_object_position_y[0], "ban_object_position_y must be (low, high)"
        self.ban_object_position_y = ban_object_position_y

    def transform(self, action: "Action"):
        """
        Return ``action`` unchanged if it is forced or allowed, otherwise a new
        empty ``Action()``.
        """
        if action.force:
            return action
        if action.objName in self.ban_object:
            return Action()
        # Pair each banned interval with the coordinate it constrains.
        checks = (
            (self.ban_mouse_position_x, action.mouseX),
            (self.ban_mouse_position_y, action.mouseY),
            (self.ban_object_position_x, action.objX),
            (self.ban_object_position_y, action.objY),
        )
        for banned_range, value in checks:
            if banned_range[0] <= value <= banned_range[1]:
                return Action()
        return action

    def __repr__(self):
        """JSON dump of the instance attributes (tuples are rendered as lists)."""
        return json.dumps(vars(self))

    def __str__(self):
        return self.__repr__()

class PlaceAndShootGym(UnityToGymWrapper):
    """
    Wrapper around an already constructed gym environment.

    One call to ``step`` applies an action and then keeps stepping the wrapped
    environment until the ball's velocity drops to ``velTresh`` or below; the
    reward is computed by ``reward_fn`` over every observation collected on the
    way. All environment calls are delegated to ``gym_env``; the parent
    initializer is not invoked by this class.
    """

    def __init__(self, gym_env, reward_fn, actionTransformer=None, announce_actions = True):
        """
        gym_env: object exposing ``step``, ``reset`` and ``close``.
        reward_fn: callable taking the list of ``Obs`` from one step.
        actionTransformer: filter applied to every action in ``step``;
                           defaults to a fresh, ban-free ``ActionTransformer``.
        announce_actions: print each action before it is sent.
        """
        self.gym_env = gym_env
        self.reward_fn = reward_fn
        # Create the default per instance; a default built in the signature
        # would be one object shared by every environment.
        if actionTransformer is None:
            actionTransformer = ActionTransformer()
        self.actionTransformer = actionTransformer
        # unsure if this is always true
        self.velTresh = 0.001
        self.lastObsVec = None
        self.announce_actions = announce_actions

    def step(self, action, allow_empty = True, quiet = False):
        """
        Apply ``action`` and keep stepping until the ball has stopped moving.

        Returns ``(raw_obs, reward, done, info)`` of the last underlying step,
        with ``reward`` replaced by ``reward_fn`` evaluated over all collected
        observations. Returns ``(None, None, None, None)`` when the transformed
        action is empty and ``allow_empty`` is False.
        """
        if not isinstance(action, Action):
            action = Action(action)

        action = self.actionTransformer.transform(action)
        if action.isEmpty() and not allow_empty:
            return (None, None, None, None)

        obsVec = []

        # first step
        if self.announce_actions and not quiet:
            print(action)
        raw_obs, _reward, done, info = self.gym_env.step(action.toArray())
        obsVec.append(Obs(raw_obs))

        # continued steps: the same action is re-sent until every velocity
        # component is within the threshold. Stop as soon as the episode is
        # done, since a finished gym episode must be reset before stepping again.
        while not done and any(abs(f) > self.velTresh for f in obsVec[-1].ballVel):
            raw_obs, _reward, done, info = self.gym_env.step(action.toArray())
            obsVec.append(Obs(raw_obs))
        reward = self.getRewards(obsVec)

        self.lastObsVec = obsVec
        return (obsVec[-1].toArray(), reward, done, info)

    def setup(self, actionVec, checkWithTransformer=False) -> None:
        """
        Setup steps must be a sequence of actions that end with a reset of the ball

        Each raw action is sent with a single underlying step (no waiting for
        the ball to settle). With ``checkWithTransformer`` the actions are
        filtered through the action transformer first.
        """
        assert len(actionVec) > 0 and actionVec[-1][-1] == 1, \
            "setup sequence must be non-empty and end with a reset action"
        for each_raw_action in actionVec:
            if checkWithTransformer:
                each_action = self.actionTransformer.transform(Action(each_raw_action))
            else:
                each_action = Action(each_raw_action)
            self.gym_env.step(each_action.toArray())

    def getRewards(self, obsVec: List[Obs]) -> float:
        """Evaluate ``reward_fn`` on the observations and coerce to float."""
        return float(self.reward_fn(obsVec))

    def reset(self):
        """Reset the wrapped environment and return whatever it returns."""
        return self.gym_env.reset()

    def close(self):
        """Close the wrapped environment."""
        self.gym_env.close()


In [58]:
# GAME1 = Shoot into bucket through many different gaps for variable rewards
# GAME2 = Shoot into bucket after colliding with crate once
# GAME3 = Shoot and land onto a platform on the top right made of corner and crate
# GAME4 = Shoot and touch as many objects as possible before the ball touches the floor; playable if more than 6 points
# GAME5 = free the ball and balance on gear until it goes into bucket (challenge scenario)


In [59]:
# crate in the middle and bucket on floor for bounce and bucket game
# Each row is a raw action [mouseX, mouseY, objX, objY, objVal, reset];
# the last row only raises the reset flag, as PlaceAndShootGym.setup requires.
GAME_2_SETUP = [[0, 0, 0, 0.8, Action.objectTagToActionVal("crate"), 0],
                [0, 0, -0.85, -0.85, Action.objectTagToActionVal("bucket"), 0],
                [0, 0, 0, 0, 0, 1]]


def endsInBucket(obsVec: List[Obs]) -> bool:
    """
    Custom Reward Fn:
    Reports whether the ball of the final observation lies inside the bucket.

    The test is a box check on the ball's offset from the bucket position,
    using fixed lower and upper bounds per axis.
    """
    X_LOW, X_HIGH = -0.1927506923675537, 0.2523689270019531
    Y_LOW, Y_HIGH = -0.24334418773651123, 0.6142134666442871

    final_obs = obsVec[-1]
    ball_col, ball_row = final_obs.ballPos
    bucket_col, bucket_row = final_obs.objPos["bucket"]
    offset_x = ball_col - bucket_col
    offset_y = ball_row - bucket_row

    inside_x = X_LOW <= offset_x <= X_HIGH
    inside_y = Y_LOW <= offset_y <= Y_HIGH
    return inside_x and inside_y


def GAME_2_REWARD(obsVec: List[Obs]) -> bool:
    """
    Custom Reward Fn:
    True when some observation reports a collision with the crate and the
    ball finishes in the bucket.
    """
    touched_crate = any(frame.collidedWith == "crate" for frame in obsVec)
    return touched_crate and endsInBucket(obsVec)


# Transformer that rejects any action selecting one of the five placeable objects.
NO_OBJECT_INTERACTION = ActionTransformer(
    ban_object=["crate", "bucket", "corner", "gear", "triangle"])

# no setting up on bucket!
# Deep copy so NO_OBJECT_INTERACTION itself is left untouched, then also
# reject actions whose mouseX lies in [-1, -0.37].
GAME_2_TRANSFORMER = copy.deepcopy(NO_OBJECT_INTERACTION)
GAME_2_TRANSFORMER.ban_mouse_position_x = (-1, -0.37)

In [60]:
GAME_2_TRANSFORMER

{"ban_object": ["crate", "bucket", "corner", "gear", "triangle"], "ban_mouse_position_x": [-1, -0.37], "ban_mouse_position_y": [99, 999], "ban_object_position_x": [99, 999], "ban_object_position_y": [99, 999]}

In [65]:
# Paths to prebuilt Unity players, relative to this notebook. They are only
# referenced by the disabled launch code below.
SERVER_BUILD = "../Builds/MLAgent_View_21April22_server.app"
GRAPHICAL_BUILD = "../Builds/Experimenter_View_25April22.app"
GYM_BUILD = "../Builds/Gym_View_25April22.app"

# Disabled alternative: close any previous env and launch GYM_BUILD directly.
# if env:
#     env.close()
    
# channel = EngineConfigurationChannel()
# channel.set_configuration_parameters(time_scale = 1, quality_level=0)
# unity_env = UnityEnvironment(file_name=GYM_BUILD, seed=1, side_channels=[channel], worker_id=1)

# No build path is given; judging by this cell's [INFO] output the call then
# waits for a Unity Editor session to connect.
unity_env = UnityEnvironment()

# Start interacting with the environment.
unity_env.reset()
gym_env = UnityToGymWrapper(unity_env, allow_multiple_obs=False)
# GAME_2_TRANSFORMER is currently disabled, so the default transformer is used.
env = PlaceAndShootGym(gym_env, reward_fn=GAME_2_REWARD,
                    #    actionTransformer=GAME_2_TRANSFORMER,
                       announce_actions=True)


[INFO] Listening on port 5004. Start training by pressing the Play button in the Unity Editor.
[INFO] Connected to Unity environment with package version 2.2.1-exp.1 and communication version 1.5.0
[INFO] Connected new brain: PlaceAndShoot?team=0


In [66]:
# Send the GAME_2_SETUP actions (place crate, place bucket, then reset).
env.setup(GAME_2_SETUP)

In [8]:
# reset pedestal

# All-zero action with only the reset flag raised; env.step returns the raw
# observation first, which is wrapped in Obs for pretty printing.
a = Action()
a.reset = True
print(Obs(env.step(a)[0]))

[0, 0, 0, 0, None, True]
bucket: Vector2(x=4.5, y=4.5)
corner: Vector2(x=6.75, y=-3.5)
crate: Vector2(x=4.5, y=4.5)
gear: Vector2(x=6.75, y=-0.65)
triangle: Vector2(x=6.75, y=-2.0)
Ball Position: Vector2(x=0.0, y=-3.9)
Ball Velocity: Vector2(x=0.0, y=-0.0)
Collided With: None
In Reset?: True



In [9]:
# Smoke test: 0.1 on every channel; 0.1 on the object channel maps to index 0
# ("bucket"). The action is printed here and again by env.step.
test_action = Action([0.1, 0.1, 0.1, 0.1, 0.1, 0])
print(test_action)
env.step(test_action)

[0.1, 0.1, 0.1, 0.1, 'bucket', False]
[0.1, 0.1, 0.1, 0.1, 'bucket', False]


UnityCommunicatorStoppedException: Communicator has exited.

In [69]:
# see exactly what just happened:
# print every observation collected during the most recent env.step call

for each_obs in env.lastObsVec:
    print(each_obs)

bucket: Vector2(x=-4.0499997, y=-4.0499997)
corner: Vector2(x=6.75, y=-3.5)
crate: Vector2(x=1.35, y=0.0)
gear: Vector2(x=6.75, y=-0.65)
triangle: Vector2(x=6.75, y=-2.0)
Ball Position: Vector2(x=0.43389353, y=1.8268149)
Ball Velocity: Vector2(x=-1.4331077, y=11.456368)
Collided With: crate
In Reset?: False

bucket: Vector2(x=-4.0499997, y=-4.0499997)
corner: Vector2(x=6.75, y=-3.5)
crate: Vector2(x=1.35, y=0.0)
gear: Vector2(x=6.75, y=-0.65)
triangle: Vector2(x=6.75, y=-2.0)
Ball Position: Vector2(x=-0.35009938, y=3.7482615)
Ball Velocity: Vector2(x=-2.9889724, y=-5.6261206)
Collided With: topWall
In Reset?: False

bucket: Vector2(x=-4.0499997, y=-4.0499997)
corner: Vector2(x=6.75, y=-3.5)
crate: Vector2(x=1.35, y=0.0)
gear: Vector2(x=6.75, y=-0.65)
triangle: Vector2(x=6.75, y=-2.0)
Ball Position: Vector2(x=-1.5393689, y=0.72469294)
Ball Velocity: Vector2(x=-2.8718896, y=-9.248529)
Collided With: None
In Reset?: False

bucket: Vector2(x=-4.0499997, y=-4.0499997)
corner: Vector2(x=6.75

In [68]:
# Build an action field by field: mouse at (-0.1, -0.1), crate at (0.3, 0).
action = Action()
action.mouseX = -0.1
action.mouseY = -0.1
action.objX = 0.3
action.objY = 0
# setObject fills in both the object name and its action value
action.setObject("crate")
# action.reset = True
env.step(action)

[-0.1, -0.1, 0.3, 0, 'crate', False]


(array([-4.0499997e+00, -4.0499997e+00,  6.7500000e+00, -3.5000000e+00,
         1.3500000e+00,  0.0000000e+00,  6.7500000e+00, -6.4999998e-01,
         6.7500000e+00, -2.0000000e+00, -4.1138101e+00, -4.2552295e+00,
        -1.9895008e-10,  9.1688124e-10,  1.0000000e+00,  0.0000000e+00],
       dtype=float32),
 1.0,
 False,
 {'step': <mlagents_envs.base_env.DecisionSteps at 0x7fedd9f72df0>})

In [71]:
# Did the ball finish inside the bucket on the most recent step?
endsInBucket(env.lastObsVec)

True

In [None]:
# search over movements

def isPlayable():
    """
    Coarse grid search for a pedestal placement and shot that earn a reward.

    For every combination of shoot mouse x/y and pedestal mouse x taken from
    ``np.arange(-1.0, 1.0, 0.5)`` (i.e. -1.0, -0.5, 0.0, 0.5) the ball is reset,
    the pedestal is placed and the ball is shot using the global ``env``.

    Returns True as soon as one combination yields a positive reward, and
    False when the whole grid has been tried without success.
    """
    # The grid is identical for all three axes, so build it once.
    grid = np.arange(-1.0, 1.0, 0.5)
    for shoot_mouse_x in grid:
        for shoot_mouse_y in grid:
            for place_mouse_x in grid:
                # reshoot
                env.step([0, 0, 0, 0, 0, 1], quiet=True)

                # place pedestal
                place_action = Action([place_mouse_x, 0, 0, 0, 0, 0])
                print("Place:")
                raw_obs, reward, _done, _info = env.step(place_action, allow_empty=True)

                # shoot ball
                shoot_action = Action([shoot_mouse_x, shoot_mouse_y, 0, 0, 0, 0], force=True)
                print("Shoot:")
                # force True because shooting should be allowed to utilize those banned float values
                raw_obs, reward, _done, _info = env.step(shoot_action, allow_empty=False)

                # reward is None when env.step rejected an empty action
                if reward is not None and reward > 0:
                    print("Game is playable!")
                    return True
    return False

# Run the search; this steps the live environment many times and can be slow.
isPlayable()


Place:
[0, 0, 0, 0, None, False]
Shoot:
[-1.0, -1.0, 0, 0, None, False]


KeyboardInterrupt: 

In [64]:
# Shut down the wrapped gym environment when finished.
env.close()

In [None]:
# env.reset()