In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np

from mlagents_envs.environment import UnityEnvironment

In [2]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    # Also report which GPU was picked up.
    print(device, torch.cuda.get_device_name(0))
else:
    print(device)

cuda GeForce GTX 1660 SUPER


### Connect to Unity to examine behavior, state and action design 

In [3]:
# No executable path is given (file_name=None); presumably this attaches to a
# Unity Editor session that is in Play mode — confirm against the ML-Agents docs.
env = UnityEnvironment(file_name= None, base_port=5004)

In [4]:
# Reset the environment, then list every behavior it exposes together with its
# spec (observation shapes and action spec). On this first connection only a
# single behavior name is reported.
env.reset()
behaviorNames = list(env.behavior_specs.keys())
print(behaviorNames)
for behaviorName in behaviorNames:
    behavior_spec = env.behavior_specs[behaviorName]
    print(behaviorName, behavior_spec)

['Pyramids?team=0']
Pyramids?team=0 BehaviorSpec(observation_shapes=[(56,), (56,), (56,), (4,)], action_spec=ActionSpec(continuous_size=0, discrete_branches=(5,)))


In [5]:
# Shut this connection down; a later cell opens a new UnityEnvironment on the same base_port.
env.close()

### Define and generate NN 

In [6]:
# Sizes taken from the behavior spec printed above: three 56-wide observation
# arrays plus one 4-wide array, and a single discrete action branch.
N_STATES = 3 * 56 + 4    # 172 inputs once the observations are concatenated
N_ACTIONS = 1            # one action branch (the branch itself has 5 values)
N_AGENTS = 3             # number of agents in the scene
hidden_units = 512       # width of each hidden layer

In [7]:
def init_weights(m):
    """Initialise an ``nn.Linear`` module in place; leave every other module alone.

    Weights are drawn from a normal distribution (mean 0, std 0.1) and the
    bias is set to the constant 0.1. Intended for use with ``Module.apply``.
    """
    if not isinstance(m, nn.Linear):
        return
    nn.init.normal_(m.weight, mean=0., std=0.1)
    nn.init.constant_(m.bias, 0.1)

In [8]:
class Net(nn.Module):
    """Policy network that maps a state batch to a Normal distribution.

    The mean comes from a three-layer MLP (Linear -> LayerNorm -> Linear ->
    LayerNorm -> Linear, with no separate activation layers). The standard
    deviation is a learned, state-independent parameter stored as its log.
    """

    def __init__(self):
        super().__init__()

        layers = [
            nn.Linear(N_STATES, hidden_units),
            nn.LayerNorm(hidden_units),
            nn.Linear(hidden_units, hidden_units),
            nn.LayerNorm(hidden_units),
            nn.Linear(hidden_units, N_ACTIONS),
        ]
        self.actor = nn.Sequential(*layers)
        # log(std) starts at 0, i.e. std = 1 for every action dimension.
        self.log_std = nn.Parameter(torch.zeros(1, N_ACTIONS))
        # Re-initialise every nn.Linear in the module tree.
        self.apply(init_weights)

    def forward(self, x):
        """Return ``Normal(mu, std)`` where ``mu = actor(x)`` and ``std`` is broadcast to ``mu``'s shape."""
        mean = self.actor(x)
        spread = self.log_std.exp().expand_as(mean)
        return Normal(mean, spread)

In [9]:
net = Net().to(device)

### NN interacts with Unity scene for one step 

In [10]:
# Open a fresh connection for the one-step walkthrough (file_name=None: no
# executable path given — presumably a Unity Editor session; confirm).
env = UnityEnvironment(file_name= None, base_port=5004)

In [11]:
# Reset the scene and keep the first reported behavior name; the cells that
# follow address the agents through it.
env.reset()
behaviorNames = list(env.behavior_specs.keys())
behaviorName = behaviorNames[0]
print(behaviorName)

Pyramids?team=0


In [12]:
# Fetch the current step data for this behavior: agents awaiting a decision and
# (presumably) agents whose episode has just ended.
DecisionSteps, TerminalSteps = env.get_steps(behaviorName)
print(DecisionSteps.agent_id, TerminalSteps.agent_id)
# The behavior spec printed earlier lists four observation arrays; show the shape of each.
for i in range(4):
    print(DecisionSteps.obs[i].shape, end = ", ")

[0 1 2] []
(3, 56), (3, 56), (3, 56), (3, 4), 

In [13]:
# Merge the four observation arrays into a single state tensor by concatenating
# them along the feature axis (dim 1); the shape is printed after each append.
s = torch.FloatTensor(DecisionSteps.obs[0])
for i in range(1, 4):
    s = torch.cat((s, torch.FloatTensor(DecisionSteps.obs[i])), 1)
    print(s.shape)

torch.Size([3, 112])
torch.Size([3, 168])
torch.Size([3, 172])


In [14]:
# Send the merged state batch through the network; forward() returns a Normal
# distribution with one mean/std entry per agent.
dist = net(s.to(device))
print(dist)

Normal(loc: torch.Size([3, 1]), scale: torch.Size([3, 1]))


In [15]:
# Draw one continuous value per agent from the policy distribution.
actions = dist.sample()
print(actions)

tensor([[-2.3393],
        [-0.5947],
        [-5.3377]], device='cuda:0')


#### Convert sampled action values to action indices <br />
action = act[0]  => 0, 1(forward), 2(backward), 3(up), 4(down)<br />

In [16]:
# Evaluate the cumulative distribution function (CDF) of the policy
# distribution at each sampled value; the conversion cell below buckets it.
actionCFD = dist.cdf(actions)
print(actionCFD)

tensor([[0.4723],
        [0.9226],
        [0.1825]], device='cuda:0', grad_fn=<MulBackward0>)


In [17]:
# Convert each CDF value into one of five buckets (0..4) split at 0.2/0.4/0.6/0.8.
bucketEdges = (0.2, 0.4, 0.6, 0.8)
actionIdx = []
for agentID in range(N_AGENTS):
    agentActionIdx = []
    for idx, value in enumerate(actions[agentID]):
        cfd = float(actionCFD[agentID][idx])
        # The bucket number equals how many edges the CDF value is not below.
        agentActionIdx.append(sum(not (cfd < edge) for edge in bucketEdges))
    actionIdx.append(agentActionIdx)
print(actionIdx)

[[2], [4], [0]]


In [18]:
# Turn the nested list into a NumPy array with one row per agent and one column per action branch.
actionIdx = np.array(actionIdx)
print(actionIdx.shape)

(3, 1)


In [19]:
# Send the action indices to Unity for every agent of this behavior.
# NOTE(review): no env.step() follows before the environment is closed, so the
# actions are presumably never executed in this one-step walkthrough — confirm.
env.set_actions(behaviorName, actionIdx)

In [20]:
# End the one-step walkthrough; a later cell reconnects on the same base_port.
env.close()

In [21]:
def GenerateActionIndex(DecisionAgentNo, actions, action_dist=None):
    """Map sampled continuous values to discrete action indices in 0..4.

    Each sampled value is passed through the CDF of the distribution it was
    drawn from, giving a number in [0, 1]. That interval is cut into five
    equal bins (edges 0.2, 0.4, 0.6, 0.8) and the bin number is the index.

    Args:
        DecisionAgentNo: number of agents that requested a decision this
            step; only the first ``DecisionAgentNo`` rows are converted.
        actions: tensor of sampled values, one row per agent and one column
            per action branch.
        action_dist: the distribution ``actions`` was sampled from (any
            object with a ``cdf`` method). When omitted, the notebook-level
            ``dist`` variable is used so two-argument calls keep working.

    Returns:
        np.ndarray of integer indices with shape
        ``(DecisionAgentNo, n_branches)``.

    Raises:
        ValueError: if ``actions`` has fewer than ``DecisionAgentNo`` rows.
    """
    if action_dist is None:
        # Backward-compatible fallback: the distribution produced by the most
        # recent ``dist = net(...)`` cell.
        action_dist = dist
    if DecisionAgentNo > len(actions):
        raise ValueError(
            "actions has %d rows but %d agents requested a decision"
            % (len(actions), DecisionAgentNo)
        )

    # Work in float64 on the CPU so the comparisons against the bin edges match
    # plain Python float comparisons.
    cdf = action_dist.cdf(actions).detach().cpu().numpy().astype(np.float64)
    edges = np.array([0.2, 0.4, 0.6, 0.8])
    # side="right": a value exactly on an edge falls into the upper bin,
    # i.e. cdf < 0.2 -> 0, 0.2 <= cdf < 0.4 -> 1, ..., cdf >= 0.8 -> 4.
    return np.searchsorted(edges, cdf[:DecisionAgentNo], side="right")

### Interact with Unity for multiple steps

In [22]:
# Reconnect for the multi-step run (file_name=None: no executable path given —
# presumably a Unity Editor session; confirm).
env = UnityEnvironment(file_name= None, base_port=5004)

In [23]:
# Reset the scene and look up the behavior name again on this new connection.
env.reset()
behaviorNames = list(env.behavior_specs.keys())
behaviorName = behaviorNames[0]

In [26]:
# Drive the agents with the (untrained) policy for 500 iterations.
env.reset()
for frame in range(500):
    DecisionSteps, TerminalSteps = env.get_steps(behaviorName)
    if len(DecisionSteps) == 0:
        # Nobody asked for a decision: report it and restart the environment.
        print("Step", frame, ": no decision steps, reset!")
        env.reset()
        continue

    # Concatenate the four observation arrays along the feature axis.
    s = torch.cat([torch.FloatTensor(DecisionSteps.obs[i]) for i in range(4)], 1)
    # Keep the name `dist`: GenerateActionIndex reads it as a notebook global.
    dist = net(s.to(device))
    action = dist.sample()
    actionIdx = GenerateActionIndex(len(DecisionSteps), action)
    env.set_actions(behaviorName, actionIdx)
    env.step()

In [27]:
# Close the environment connection now that the multi-step run is finished.
env.close()