# Test Service

Intended to test the service.py evaluator.
Runs the service.py and a simple client.



# Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

  from IPython.core.display import display, HTML


In [3]:
import sys
import os
import redis
import subprocess as sp
import shlex
import time
import importlib_resources as ir
import socket
from contextlib import closing
import uuid
import pickle

In [4]:
import pandas as pd
import numpy as np

In [5]:
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.envs.persistence import RailEnvPersister
from flatland.evaluators.client import FlatlandRemoteClient
from flatland.evaluators.client import TimeoutException
from flatland.envs.rail_env import RailEnvActions

In [6]:
  def check_socket(host, port):
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        assert sock.connect_ex((host, port)) == 0, f"Port {host} {port} is not open"
        
        print(f"Port {host} {port} is open")
        
check_socket("127.0.0.1", 6379)

Port 127.0.0.1 6379 is open


### Find the real path of the `env_data` package (should be copied by tox)

In [7]:
with ir.path("env_data.tests", "test_001.pkl") as oPath:
    sPath = oPath
print(type(sPath), sPath)

<class 'pathlib.PosixPath'> /Users/che/workspaces/flatland-rl/env_data/tests/test_001.pkl


In [8]:
sDirRoot = "/" + "/".join(sPath.parts[1:-1] + ("service_test",""))
sDirRoot

'/Users/che/workspaces/flatland-rl/env_data/tests/service_test/'

### Clear any old redis keys

In [9]:
oRedis = redis.Redis()

In [10]:
lKeys = oRedis.keys("flatland*")
lKeys

[b'flatland-rl::7862565f-2cab-43a2-8bee-a444dc38b874::response::9233d209716f4ae78a5dbe124de67e27']

In [11]:
for sKey in lKeys:
    print("Deleting:", sKey)
    oRedis.delete(sKey)

Deleting: b'flatland-rl::7862565f-2cab-43a2-8bee-a444dc38b874::response::9233d209716f4ae78a5dbe124de67e27'


### Remove `/tmp/output.csv`

In [12]:
!rm -f /tmp/output.csv

### kill any old `service.py` process

In [13]:
!ps -ef | grep -i python | grep -i flatland.evaluators.service | awk '{print $2}' | xargs kill

In [14]:
osEnv2 = os.environ.copy()

### Set some short timeouts for testing

In [15]:
osEnv2["FLATLAND_OVERALL_TIMEOUT"]="10"
osEnv2["FLATLAND_PER_STEP_TIMEOUT"] = "2"
osEnv2["FLATLAND_MAX_SUCCESSIVE_TIMEOUTS"] = "2"
 
osEnv2["TEST_ID_FILTER"] = "Test_1"

### Create the python command for `service.py`

In [16]:
FLATLAND_RL_SERVICE_ID = uuid.uuid4()
#sCmd = "python -m flatland.evaluators.service --test_folder ../env_data/tests/service_test --mergeDir ./tmp/merge --actionDir ./tmp/actions --pickle --missingOnly --service_id {FLATLAND_RL_SERVICE_ID}"
#sCmd = "python -m flatland.evaluators.service --test_folder ../env_data/tests/service_test --pickle --service_id {FLATLAND_RL_SERVICE_ID}" # --verbose"
sCmd = f"python -m flatland.evaluators.service --test_folder {sDirRoot} --pickle --service_id {FLATLAND_RL_SERVICE_ID}" # --verbose"
lsCmd = shlex.split(sCmd)
print(sCmd)
print(lsCmd)

python -m flatland.evaluators.service --test_folder /Users/che/workspaces/flatland-rl/env_data/tests/service_test/ --pickle --service_id 3e480a6f-33d8-44ae-a9cd-78ea3d11781a
['python', '-m', 'flatland.evaluators.service', '--test_folder', '/Users/che/workspaces/flatland-rl/env_data/tests/service_test/', '--pickle', '--service_id', '3e480a6f-33d8-44ae-a9cd-78ea3d11781a']


### Run the command with Popen (output goes to jupyter stdout not notebook)

In [17]:
with open("/tmp/stdout.txt","w") as out, open("/tmp/stderr.txt","w") as err:
    oPipe = sp.Popen(lsCmd, env=osEnv2, stdout=out,stderr=err)

In [18]:
oPipe.poll()

In [19]:
oFRC = FlatlandRemoteClient(test_env_folder=sDirRoot, verbose=False, use_pickle=True, flatland_rl_service_id=FLATLAND_RL_SERVICE_ID)

In [20]:
env, env_dict = RailEnvPersister.load_new(f"{sDirRoot}/Test_0/Level_0.pkl")

In [21]:
def forward_only_controller(obs, _env):
    dAct = {}
    for iAg in range(len(_env.agents)):
        dAct[iAg] = RailEnvActions.MOVE_FORWARD
    return dAct

def random_controller(obs, _env):
    dAct = {}
    for iAg in range(len(_env.agents)):
        dAct[iAg] = np.random.randint(0, 5)
    return dAct

In [22]:
oObsB = DummyObservationBuilder()

In [23]:
oObsB.get()

True

In [24]:
def run_submission(slow_ep=1, delay=2, collect=False, verify=False):
    episode = 0
    obs = True
    while obs:
        print("==============")
        print(f"Episode : {episode} (1)")
        print("==============")
        obs, info = oFRC.env_create(obs_builder_object=oObsB)
        oFRC.env.record_steps = True
        if not obs:
            print("null observation - all envs completed!")
            break
        print("==============")
        print(f"Episode : {episode} (2)")
        print("==============")
        

        print(oFRC.env.dones['__all__'])

        if verify:
            with open(f"episode_{episode}.pkl", "rb") as file_in:
                expected = pickle.loads(file_in.read())
        for step in range(oFRC.env._max_episode_steps):
            if episode < 3:
                action = forward_only_controller(obs, oFRC.env)
            else:
                action = random_controller(obs, oFRC.env)
            
            time_start = time.time()
            
            if (episode == slow_ep) and (oFRC.env._elapsed_steps > 10):
                time.sleep(2)
                
            observation, all_rewards, done, info = oFRC.env_step(action)
            time_diff = time.time() - time_start
            if verify:
                assert expected[step] == oFRC.env.cur_episode[step]
            if collect:
                with open(f"episode_{episode}.pkl", "wb") as file_out:
                    data = pickle.dumps(oFRC.env.cur_episode)
                    file_out.write(data)
            if done['__all__']:
                if verify:
                    assert len(expected[step]) == len(oFRC.env.cur_episode[step])
                print("\nCompleted Episode : ", episode)
                print("Reward : ", sum(list(all_rewards.values())))
                break
            
        episode += 1
        
    print(f"Evaluation Complete - episodes={episode} - send submit message...")
    print(oFRC.submit())
    print("All done.")

In [25]:
try:
    run_submission()
except Exception as timeoutException:
    print("Timed out.")
    print(timeoutException)
    try:
        # give evaluator enough time before submitting!
        time.sleep(2)
        print(f"Evaluation timed out - send submit message...")
        print(oFRC.submit())
        
    except Exception:
        print("All done.")

Episode : 0 (1)
Episode : 0 (2)
False
get_travel_time_on_shortest_path=147
0 -193
1 0
2 0
get_travel_time_on_shortest_path=46
3 -168
get_travel_time_on_shortest_path=80
4 -103
get_travel_time_on_shortest_path=72
5 -108
6 0

Completed Episode :  0
Reward :  -572
Episode : 1 (1)
Episode : 1 (2)
False
Error received:  {'type': 'FLATLAND_RL.ENV_STEP_TIMEOUT'}
Timed out.
FLATLAND_RL.ENV_STEP_TIMEOUT
Evaluation timed out - send submit message...
Error received:  {'type': 'FLATLAND_RL.ENV_STEP_TIMEOUT'}
All done.


In [26]:
!cat /tmp/stdout.txt

Max pre-planning time: 600
Max step time: 2
Max overall time: 10
Max submission startup time: 300
Max consecutive timeouts: 2
{'Test_1'}
['Test_1/Level_0.pkl', 'Test_1/Level_1.pkl']
['Test_1/Level_0.pkl', 'Test_1/Level_1.pkl']
Listening at :  flatland-rl::3e480a6f-33d8-44ae-a9cd-78ea3d11781a::commands
 -- [DEBUG] [env_create] EVAL DONE:  False
 -- [DEBUG] [env_create] SIM COUNT:  1 2
Evaluating Test_1/Level_0.pkl (1/2)
get_travel_time_on_shortest_path=147
0 -193
1 0
2 0
get_travel_time_on_shortest_path=46
3 -168
get_travel_time_on_shortest_path=80
4 -103
get_travel_time_on_shortest_path=72
5 -108
6 0
Percentage for test 1, level 0: 0.42857142857142855
Evaluation finished in 267 timesteps, 0.205 seconds. Percentage agents done: 0.429. Normalized reward: 0.694. Number of malfunctions: 0.
Total normalized reward so far: 0.694
Wrote intermediate output results to : /tmp/output.csv
 -- [DEBUG] [env_create] EVAL DONE:  False
 -- [DEBUG] [env_create] SIM COUNT:  2 2
Evaluating Test_1/Level_1.

In [27]:
!cat /tmp/stderr.txt

### Kill the evaluator process we started earlier

In [28]:
!ps -ef | grep -i python | grep -i flatland.evaluators.service | awk '{print $2}' | xargs kill

In [29]:
df = pd.read_csv("/tmp/output.csv").T
df

Unnamed: 0,0,1,2,3
filename,Test_0/Level_0.pkl,Test_0/Level_1.pkl,Test_1/Level_0.pkl,Test_1/Level_1.pkl
test_id,Test_0,Test_0,Test_1,Test_1
env_id,Level_0,Level_1,Level_0,Level_1
n_agents,7,7,7,7
x_dim,30,30,30,30
y_dim,30,30,30,30
n_cities,2,2,2,2
max_rail_pairs_in_city,2,2,2,2
n_envs_run,10,10,10,10
seed,335971,335972,335971,335972


In [30]:
df.transpose()["reward"].tolist()

[nan, nan, -572.0, 1484.0]

In [31]:
assert np.array_equal(df.transpose()["reward"].tolist(), [np.nan, np.nan, -572.0, 1484.0], equal_nan=True)