<a href="https://colab.research.google.com/github/dasys-lab/comaze-python/blob/gym-env/CoMazeGym_Agent_Template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import requests
import time


class CoMazeGym:
  """Small gym-style client (reset/step) for the CoMaze game server HTTP API.

  Every call blocks: `reset`/`play_existing_game` poll the server until it is
  this player's turn, and `step` polls until the game has started and, after
  moving, until the turn comes back to this player (or the game is over).

  Attributes:
    game: last game-state dict returned by the server (None before joining).
    game_id: uuid string of the joined game (None before joining).
    player_id: uuid string the server assigned to this player.
    action_space: this player's directions plus 'SKIP', set when joining.
  """

  # A ".local" marker file in the working directory selects a local server.
  if os.path.isfile(".local"):
    API_URL = "http://localhost:16216"
    WEBAPP_URL = "http://localhost"
  else:
    API_URL = "http://teamwork.vs.uni-kassel.de:16216"
    WEBAPP_URL = "http://teamwork.vs.uni-kassel.de"
  LIB_VERSION = "1.1.0"

  def __init__(self):
    self.game = None
    self.game_id = None
    self.player_id = None
    self.action_space = None

  def _game_url(self):
    """Return the base URL of the currently joined game."""
    return self.API_URL + "/game/" + self.game_id

  def _fetch_game(self):
    """Fetch and return the current game state from the server."""
    return requests.get(self._game_url()).json()

  def _wait_for_turn(self, notice):
    """Poll once per second, printing `notice`, until it is this player's turn."""
    while self.game['currentPlayer']['uuid'] != self.player_id:
      print(notice)
      time.sleep(1)
      self.game = self._fetch_game()

  def reset(self, options=None):
    """Create a new game on the server, join it and return the initial state.

    Args:
      options: optional dict. "level" (default "1") and "num_of_player_slots"
        (default "2") configure the new game; the remaining keys are handled
        by `play_existing_game`. The dict passed in is not modified.

    Returns:
      The game-state dict once it is this player's turn.
    """
    # Work on a copy: a shared `{}` default or the caller's dict must not be
    # mutated when the new game id is stored below.
    options = dict(options) if options else {}
    level = options.get("level", "1")
    num_of_player_slots = options.get("num_of_player_slots", "2")
    # `params` builds and encodes the query string, so int values work too.
    created = requests.post(
        self.API_URL + "/game/create",
        params={"level": level, "numOfPlayerSlots": num_of_player_slots},
    ).json()
    self.game_id = created["uuid"]
    options["game_id"] = self.game_id
    return self.play_existing_game(options)

  def play_existing_game(self, options=None):
    """Join an existing game and block until it is this player's turn.

    Args:
      options: optional dict. Either "game_id" (36-character uuid string) or
        "look_for_player_name" (the game is looked up by that player's name
        and takes precedence over "game_id") is required. "player_name"
        (default "Python") is the name to join under. The dict passed in is
        not modified.

    Returns:
      The game-state dict once it is this player's turn.

    Raises:
      ValueError: if no usable game id is available.
    """
    options = dict(options) if options else {}
    if "look_for_player_name" in options:
      options["game_id"] = requests.get(
          self.API_URL + "/game/byPlayerName",
          params={"playerName": options["look_for_player_name"]},
      ).json()["uuid"]

    # Game ids are uuid strings, which are 36 characters long.
    if "game_id" not in options or len(options["game_id"]) != 36:
      raise ValueError("You must provide a game id when attending an existing game. Use reset() instead of play_existing_game() if you want to create a new game.")

    player_name = options.get("player_name", "Python")
    self.game_id = options["game_id"]
    print("Joined gameId: " + self.game_id)
    player = requests.post(
        self._game_url() + "/attend",
        params={"playerName": player_name},
    ).json()
    self.player_id = player["uuid"]
    # The server assigns each player a subset of directions; SKIP is always added.
    self.action_space = player['directions'] + ['SKIP']
    print("Playing as playerId: " + self.player_id)
    self.game = self._fetch_game()
    print(f'Action Space is {self.action_space}')

    self._wait_for_turn('Waiting for other player to make first move')
    return self.game

  def step(self, action, message=None):
    """Send one move and return once the turn is back with this player.

    Args:
      action: action name to send, e.g. one of `self.action_space`.
      message: only printed.
        NOTE(review): it is not included in the move request - confirm
        whether the server should receive it.

    Returns:
      Tuple `(game, reward, over, None)`: the latest game-state dict, a reward
      of 1 (won), -1 (lost) or 0 (otherwise), the game's "over" flag, and a
      None placeholder for info.
    """
    # The server state is refreshed until the game has started; only then is
    # the move sent.
    while True:
      self.game = self._fetch_game()
      if self.game["state"]["started"]:
        break
      print("Waiting for players. (Invite someone: " + self.WEBAPP_URL + "/?gameId=" + self.game_id + ")")
      time.sleep(3)

    print("Moving " + action)
    print(f'Sending message {message}')
    print('---')
    self.game = requests.post(
        self._game_url() + "/move",
        params={"playerId": self.player_id, "action": action},
    ).json()

    if self.game["state"]["won"]:
      print("Game won!")
      reward = 1
    elif self.game["state"]["lost"]:
      print("Game lost (" + self.game["state"]["lostMessage"] + ").")
      reward = -1
    else:
      reward = 0

    if not self.game["state"]["over"]:
      # wait for other player to make a move before sending back obs
      self._wait_for_turn('Waiting for other player to make a move')

    return self.game, reward, self.game["state"]["over"], None
    

In [None]:
# Create the environment and start a new game. reset() blocks until it is this
# player's turn; as the cell's last expression, the game state it returns is
# displayed below.
env = CoMazeGym()
env.reset()

Joined gameId: 93ac1aa9-2b7d-4798-be94-5a46dfb69970
Playing as playerId: 540c52c4-5a20-463a-a78b-47f63399a037
Action Space is ['RIGHT', 'DOWN', 'SKIP']


{'agentPosition': {'x': 3, 'y': 3},
 'config': {'agentStartPosition': {'x': 3, 'y': 3},
  'arenaSize': {'x': 7, 'y': 7},
  'bonusTimes': [],
  'goals': [{'color': 'YELLOW', 'position': {'x': 5, 'y': 1}},
   {'color': 'GREEN', 'position': {'x': 1, 'y': 5}},
   {'color': 'RED', 'position': {'x': 1, 'y': 1}},
   {'color': 'BLUE', 'position': {'x': 5, 'y': 5}}],
  'hasSecretGoalRules': False,
  'initialMaxMoves': None,
  'symbolMessages': ['E', 'T', 'Y', 'U', 'O', 'I', 'Q', 'R', 'W', 'P'],
  'walls': []},
 'currentPlayer': {'actions': ['DOWN', 'RIGHT', 'SKIP'],
  'directions': ['RIGHT', 'DOWN'],
  'lastAction': None,
  'lastSymbolMessage': None,
  'name': 'Python',
  'uuid': '540c52c4-5a20-463a-a78b-47f63399a037'},
 'maxMoves': None,
 'mayStillMove': True,
 'movesLeft': None,
 'name': None,
 'numOfPlayerSlots': 2,
 'players': [{'actions': ['DOWN', 'RIGHT', 'SKIP'],
   'directions': ['RIGHT', 'DOWN'],
   'lastAction': None,
   'lastSymbolMessage': None,
   'name': 'Python',
   'uuid': '540c

In [None]:
# Random Agent
# Plays one full game, choosing an action at random from the environment's
# action space on every turn until the game is over.
import random 

obs = env.reset()
game_over = False
while True:
  obs, reward, game_over, info = env.step(random.choice(env.action_space))
  if game_over:
    break

In [None]:
# Nearest Goal Agent
# Each turn: find the unreached goal with the smallest Manhattan distance to
# the agent, then take one of this player's own directions that moves towards
# it (x is checked before y). If none of this player's directions helps, SKIP.
obs = env.reset()
game_over = False
action_space = env.action_space

while not game_over:
  goals_pos = [goal['position'] for goal in obs['unreachedGoals']]
  agent_pos = obs['agentPosition']

  if not goals_pos:
    # Nothing left to aim for; min() below would raise on an empty list.
    action = 'SKIP'
  else:
    # (dx, dy) from the agent to each unreached goal.
    goal_diffs = [(goal['x'] - agent_pos['x'], goal['y'] - agent_pos['y'])
                  for goal in goals_pos]
    goal_dists = [abs(diff[0])+abs(diff[1]) for diff in goal_diffs]
    nearest_goal = goal_dists.index(min(goal_dists)) 

    print(f'Nearest goal is {obs["unreachedGoals"][nearest_goal]}')
    print(f'Nearest goal diff {goal_diffs[nearest_goal]}')

    move_x, move_y = goal_diffs[nearest_goal]

    # Negative dx maps to LEFT and negative dy to UP.
    if 'LEFT' in action_space and move_x < 0:
      action = 'LEFT'
    elif 'RIGHT' in action_space and move_x > 0:
      action = 'RIGHT'
    elif 'UP' in action_space and move_y < 0:
      action = 'UP'
    elif 'DOWN' in action_space and move_y > 0:
      action = 'DOWN'
    else:
      action = 'SKIP'

  obs, reward, game_over, info = env.step(action)



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import distributions


In [None]:
# Basic RL agent
# Small linear policy network that takes in the current state together with a
# mask of the available actions and learns which action to take.

# Fixed ordering of all actions: the availability mask is built in this order
# and a sampled action index is mapped back to its name through this list.
ACTION_SPACE = ['LEFT', 'RIGHT', 'UP', 'DOWN', 'SKIP']

class RLAgent(nn.Module):
  """Linear policy network over a flattened arena and an action-availability mask.

  The state and the action mask are embedded by separate linear layers, the two
  embeddings are concatenated, and a final linear layer maps them to one logit
  per action. No non-linearity is applied between the layers.

  Args:
    arena_size: (width, height) of the arena; the state input has
      width * height features.
    num_actions: number of actions, i.e. the size of the action mask and of
      the output.
    state_embedding_dim: output size of the state embedding layer.
  """

  def __init__(self, arena_size, num_actions=5, state_embedding_dim=16):
    super().__init__()
    arena_size_flat = arena_size[0] * arena_size[1]
    self.embed_state = nn.Linear(arena_size_flat, state_embedding_dim)
    # Layer sizes follow num_actions so that values other than 5 work.
    self.embed_action_space = nn.Linear(num_actions, num_actions)
    self.policy = nn.Linear(state_embedding_dim + num_actions, num_actions)

  def forward(self, state, action_space):
    """Return unnormalised action scores.

    Args:
      state: float tensor of shape (batch, width * height).
      action_space: float tensor of shape (batch, num_actions).

    Returns:
      Tensor of shape (batch, num_actions).
    """
    state_emb = self.embed_state(state)
    action_emb = self.embed_action_space(action_space)
    # Concatenate along the feature dimension, keeping the batch dimension.
    state_action_emb = torch.cat((state_emb, action_emb), dim=1)
    return self.policy(state_action_emb)


def get_state_tensor(obs):
  """Encode a game observation as a 2-D float32 grid indexed [x][y].

  Cells are 0 by default, 1 at the agent's position and 2 at every unreached
  goal. Goals are written last, so a goal on the agent's cell yields 2.
  """
  size = obs['config']['arenaSize']
  grid = torch.zeros((size['x'], size['y']), dtype=torch.float32)

  agent_cell = obs['agentPosition']
  grid[agent_cell['x'], agent_cell['y']] = 1    # agent

  for goal in obs['unreachedGoals']:
    goal_cell = goal['position']
    grid[goal_cell['x'], goal_cell['y']] = 2    # unreached goal

  return grid


def calculate_returns(rewards, discount_factor, normalize = True):
    """Compute the discounted return for every step of an episode.

    The return at step t is r_t + discount_factor * (return at step t + 1).

    Args:
        rewards: sequence of per-step rewards in chronological order.
        discount_factor: multiplier applied to the following step's return.
        normalize: if True, standardise the returns to zero mean and unit
            standard deviation. This is skipped when there are fewer than two
            returns or they are all equal, because the standard deviation is
            then NaN or zero and dividing by it would produce NaN/inf values;
            the raw returns are returned in that case.

    Returns:
        1-D floating-point tensor with one return per reward.
    """
    discounted = []
    running = 0.0

    # Accumulate from the last step backwards, then restore chronological
    # order once (avoids inserting at the front of the list on every step).
    for r in reversed(rewards):
        running = r + running * discount_factor
        discounted.append(running)
    discounted.reverse()

    # Explicit float dtype so integer-valued returns do not yield a Long
    # tensor, on which mean()/std() are not defined.
    returns = torch.tensor(discounted, dtype=torch.get_default_dtype())

    if normalize and returns.numel() > 1:
        std = returns.std()
        if std > 0:
            returns = (returns - returns.mean()) / std

    return returns


# REINFORCE-style training loop: play an episode by sampling from the policy,
# then take one gradient step on -(sum of return * log-prob of chosen action).
discount_factor = 0.9
learning_rate = 1e-2
num_episodes = 1

# The arena size is hard-coded and must match obs['config']['arenaSize'] of the
# level being played, since it fixes the network's input size.
arena_size = (7,7)
agent = RLAgent(arena_size)
optimizer = torch.optim.SGD(agent.parameters(), lr=learning_rate)

for ep in range(num_episodes):
  obs = env.reset()

  # reset() joins a new game and reassigns env.action_space, so the
  # availability mask (1 = this player may take the action, in ACTION_SPACE
  # order) has to be rebuilt per episode rather than once up front.
  action_space_list = [1 if x in env.action_space else 0 for x in ACTION_SPACE]
  action_space_tensor = torch.FloatTensor(action_space_list)
  action_space_tensor_batch = action_space_tensor.unsqueeze(0)

  done = False
  log_prob_actions = []
  rewards = []
  episode_reward = 0

  while not done:
    state_tensor = get_state_tensor(obs)
    state_tensor_batch = torch.flatten(state_tensor).unsqueeze(0)
    action_pred = agent(state_tensor_batch, action_space_tensor_batch)
    
    action_prob = F.softmax(action_pred, dim = -1)  
    # Zero out unavailable actions; Categorical renormalises the remainder.
    avail_action_prob = action_prob * action_space_tensor
    dist = distributions.Categorical(avail_action_prob)
    action = dist.sample()
    log_prob_action = dist.log_prob(action)

    obs, reward, done, _ = env.step(ACTION_SPACE[action.item()])

    log_prob_actions.append(log_prob_action)
    rewards.append(reward)

    episode_reward += reward


  log_prob_actions = torch.cat(log_prob_actions)
  # Returns are treated as constants: no gradient flows through them.
  returns = calculate_returns(rewards, discount_factor).detach()
  loss = - (returns * log_prob_actions).sum()

  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

  print(f'Loss {loss} EP reward {episode_reward}')



Joined gameId: f8ce57c7-e2c8-4e32-aa41-d0f65faa5232
Playing as playerId: 1c648cad-be6f-423e-8f44-78d0869194d8
Action Space is ['UP', 'DOWN', 'SKIP']
Waiting for players. (Invite someone: http://teamwork.vs.uni-kassel.de/?gameId=f8ce57c7-e2c8-4e32-aa41-d0f65faa5232)
Waiting for players. (Invite someone: http://teamwork.vs.uni-kassel.de/?gameId=f8ce57c7-e2c8-4e32-aa41-d0f65faa5232)
Waiting for players. (Invite someone: http://teamwork.vs.uni-kassel.de/?gameId=f8ce57c7-e2c8-4e32-aa41-d0f65faa5232)
Moving SKIP
Sending message None
---
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Moving SKIP
Sending message None
---
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Waiting for other player to make a move
Moving DOWN
Sending messag

JSONDecodeError: ignored