### Asynchronous BTgym environment setup.
****
- This example shows a basic setup for asynchronous execution of multiple BTgym environment instances.


- The main idea is to use a separate BTgymDataFeedServer process to draw random episode data samples from one main dataset and pass them to every running environment.


- This saves a lot of memory, since every environment instance holds only a small piece of data (a single episode). In addition, sampling and dataset management run in their own process.
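
The pattern described above can be illustrated with a toy sketch. This is not BTgym's implementation: it uses threads and in-process queues as a stand-in for the separate server process, and the names `data_feed_server` and `request_episode` are hypothetical. It only shows the idea that one owner holds the full dataset while each consumer receives a single episode-sized slice on request.

```python
import queue
import random
import threading


def data_feed_server(dataset, episode_len, requests, seed=0):
    """Toy stand-in for a data feed server (hypothetical, not BTgym code).

    Holds the full dataset and answers each request with one random
    contiguous slice of `episode_len` records. A `None` request stops it.
    """
    rng = random.Random(seed)
    while True:
        reply_queue = requests.get()
        if reply_queue is None:  # stop signal
            break
        start = rng.randrange(len(dataset) - episode_len + 1)
        reply_queue.put(dataset[start:start + episode_len])


def request_episode(requests):
    """Ask the server for one episode and block until it arrives."""
    reply_queue = queue.Queue(maxsize=1)
    requests.put(reply_queue)
    return reply_queue.get()


# The full dataset lives only on the server side.
full_dataset = list(range(10000))
requests = queue.Queue()
server = threading.Thread(
    target=data_feed_server, args=(full_dataset, 100, requests)
)
server.start()

# Each "environment" ends up holding just one episode-sized slice.
episodes = [request_episode(requests) for _ in range(4)]

requests.put(None)  # shut the server down
server.join()
```

Each consumer here holds 100 records instead of 10000, which is the memory saving the list above refers to.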

In [1]:
import sys
sys.path.insert(0,'..')

import time
import multiprocessing

from btgym import BTgymEnv, BTgymDataset

In [2]:
class Worker(multiprocessing.Process):
    env = None
    
    def __init__(self, env_class, worker_id, num_episodes, env_config):
        super(Worker, self).__init__()
        self.env_class = env_class
        self.num_episodes = num_episodes
        self.env_config = env_config
        self.worker_id = worker_id
        
    def run(self):
        print('worker_{}: making environment...'.format(self.worker_id))
        # Use the config passed to this worker, not a global variable:
        self.env = self.env_class(**self.env_config)
        
        for episode in range(self.num_episodes):
            print('worker_{}: episode {} started.'.format(self.worker_id, episode))
            obs = self.env.reset()
            done = False
            while not done:
                # Just repeat `hold` action:
                obs, reward, done, info = self.env.step(0)
            # Get and report statistic:
            stat = self.env.get_stat() 
            print(
                'worker_{}: episode {} finished, {} steps made within {} seconds.'.
                  format(self.worker_id, stat['episode'], stat['length'], stat['runtime'])
            )   
        self.env.close()
        print('worker_{}: environment closed.'.format(self.worker_id))

In [4]:
# Provide data:
MyDataset = BTgymDataset(
    filename='../examples/data/DAT_ASCII_EURUSD_M1_2016.csv',
    start_weekdays=[0, 1, 2, 3, 4],
    episode_len_days=1, 
    episode_len_hours=23,
    episode_len_minutes=0,
    start_00=False,
    time_gap_hours=2,
)

# Setup:
num_workers = 8   # set it according to available CPU cores.
num_episodes = 2  # individual workplan.
base_port = 5000  # worker environment <--> BTgymServer communication, individual for every worker.
data_port = 4999  # BTgymServer <--> BTgymDataFeedServer communication, same for all workers.

workers = []

# Make the data-master environment. In this example it only serves as a coordinator
# to start/stop the data server, but it can be used as a full-time worker as well;
# just keep it running until the others exit.
data_master = BTgymEnv(
    dataset=MyDataset,  # this is the only environment here that requires a dataset.
    port=5050,
    data_port=data_port,
    data_master=True,
    connect_timeout=10,  # set server connection timeout to 10 seconds (default is 60).
    verbose=0,
)

# Make and launch workers in separate processes:
for i in range(num_workers):
    # Worker environment configuration:
    env_config=dict(
        port=base_port + i,
        data_port=data_port,
        data_master=False,  # This option forces the environment to look for a datafeed provider server
                            # rather than use its own dataset. Thus, no dataset is specified for this env.
        connect_timeout=10,
        verbose=0,
    )
    
    # Make:
    worker = Worker(
        env_class=BTgymEnv,
        worker_id=i,
        num_episodes=num_episodes,
        env_config=env_config,
    )
    
    # Launch:
    worker.daemon = False
    worker.start()
    workers.append(worker)
    # Artificial async, no need for this in real training:
    time.sleep(0.1)
    
# Wait for everyone to finish:
for worker in workers:
    worker.join()
    print('...worker_{} has joined.'.format(worker.worker_id))
    
# Shut down the data server:
data_master.close()

print('data_master: environment closed.')
     

worker_0: making environment...
worker_1: making environment...
worker_2: making environment...
worker_3: making environment...
worker_4: making environment...
worker_5: making environment...
worker_6: making environment...
worker_7: making environment...
worker_0: episode 0 started.
worker_1: episode 0 started.
worker_2: episode 0 started.
worker_3: episode 0 started.
worker_4: episode 0 started.
worker_5: episode 0 started.
worker_6: episode 0 started.
worker_7: episode 0 started.
worker_1: episode 0 finished, 2819 steps made within 0:00:33.205089 seconds.
worker_1: episode 1 started.
worker_0: episode 0 finished, 2819 steps made within 0:00:33.369216 seconds.
worker_0: episode 1 started.
worker_2: episode 0 finished, 2819 steps made within 0:00:33.351503 seconds.
worker_2: episode 1 started.
worker_4: episode 0 finished, 2819 steps made within 0:00:33.374071 seconds.
worker_4: episode 1 started.
worker_5: episode 0 finished, 2819 steps made within 0:00:33.328580 seconds.
worker_5: e
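
The setup cell above assigns ports by hand: workers get `base_port + i`, while the data master uses port 5050 and the data server uses `data_port`. With enough workers these ranges would overlap (for example, worker 50 would land on 5050). A small helper, sketched here under the assumption that all of these ports must be distinct, can build the worker configs and catch such collisions before any process is started. The name `make_worker_configs` and the `master_port` argument are hypothetical; the dictionary keys are the same ones passed to `BTgymEnv` in the cell above.

```python
def make_worker_configs(num_workers, base_port, data_port, master_port):
    """Build one env_config dict per worker and verify port uniqueness.

    Hypothetical helper: it mirrors the keyword arguments used in the
    example above and assumes every port must be distinct.
    """
    configs = [
        dict(
            port=base_port + i,   # individual for every worker
            data_port=data_port,  # shared by all workers
            data_master=False,    # workers fetch data from the data server
            connect_timeout=10,
            verbose=0,
        )
        for i in range(num_workers)
    ]
    used_ports = [cfg['port'] for cfg in configs] + [data_port, master_port]
    if len(set(used_ports)) != len(used_ports):
        raise ValueError(
            'Port collision: worker ports {}..{} overlap with data_port={} '
            'or master_port={}.'.format(
                base_port, base_port + num_workers - 1, data_port, master_port
            )
        )
    return configs


# Same numbers as in the example above:
worker_configs = make_worker_configs(
    num_workers=8, base_port=5000, data_port=4999, master_port=5050
)
```

Each returned dict can then be passed as `env_config` to a `Worker`, exactly as in the launch loop above.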

### Under the hood:
*****
...for those who are interested in the internal communications going on:

- just set **`verbose=2`**\*:


\* DEBUG level; do not use it in real runs, since it floods the output with log messages.

In [5]:
MyDataset = BTgymDataset(
    filename='../examples/data/DAT_ASCII_EURUSD_M1_2016.csv',
    start_weekdays=[0, 1, 2, 3, 4],
    episode_len_days=1, 
    episode_len_hours=23,
    episode_len_minutes=0,
    start_00=False,
    time_gap_hours=2,
)

env1 = BTgymEnv(
    dataset=MyDataset,
    port=5050,
    data_port=4999,
    data_master=True,
    connect_timeout=5,
    verbose=2,
)

env2 = BTgymEnv(
    port=5052,
    data_port=4999,
    data_master=False,
    connect_timeout=5,
    verbose=2,
)

_1 = env1.reset()
_2 = env2.reset()

env2.close()
env1.close()

[2017-08-07 20:25:33,896] Custom Dataset class used.
[2017-08-07 20:25:34,041] DataServer PID: 3217
[2017-08-07 20:25:34,839] Loaded 372678 records from <../examples/data/DAT_ASCII_EURUSD_M1_2016.csv>.
[2017-08-07 20:25:34,932] Data summary:
                open           high            low          close    volume
count  372678.000000  372678.000000  372678.000000  372678.000000  372678.0
mean        1.107109       1.107198       1.107019       1.107108       0.0
std         0.024843       0.024840       0.024847       0.024844       0.0
min         1.035250       1.035470       1.035220       1.035220       0.0
25%         1.092140       1.092230       1.092040       1.092140       0.0
50%         1.113530       1.113610       1.113450       1.113530       0.0
75%         1.124710       1.124780       1.124630       1.124710       0.0
max         1.161440       1.161600       1.160770       1.161450       0.0
[2017-08-07 20:25:34,934] Maximum episode time duration set to: 1 day, 23:

[2017-08-07 20:25:36,252] BTgymServer PID: 3227
[2017-08-07 20:25:36,256] BtgymServer: pinging data_server at: tcp://127.0.0.1:4999 ...
[2017-08-07 20:25:36,258] DataServer received <{'ctrl': 'ping!'}>
[2017-08-07 20:25:36,260] DataServer sent: {'ctrl': 'send control keys:  <_get_data>, <_get_info>, <_stop>.'}
[2017-08-07 20:25:36,261] Maximum episode time duration set to: 1 day, 23:00:00.
[2017-08-07 20:25:36,262] Respective number of steps: 2820.
[2017-08-07 20:25:36,263] Maximum allowed data time gap set to: 2:00:00.

[2017-08-07 20:25:36,266] Episode start: 2016-06-01 13:03:00, weekday: 2.
[2017-08-07 20:25:36,269] Episode duration: 1 day, 23:03:00.
[2017-08-07 20:25:36,269] BTgymServer: Data_server seems ready with response: <{'ctrl': 'send control keys:  <_get_data>, <_get_info>, <_stop>.'}>
[2017-08-07 20:25:36,271] Total episode time gap: 0:03:00.
[2017-08-07 20:25:36,273] Sample accepted.
[2017-08-07 20:25:36,274] Episode filename: <_episode_dataset_2016-06-01 13:03:00>.
[2017