# Import and define arguments

In [1]:
from common import *

# Command-line options for this run; each one has a default so the parser
# also works when no arguments are supplied.
parser = ArgumentParser(description='Select generated cluster')
parser.add_argument(
    '--scheduler',
    default='server1',
    help='Scheduler server',
)
parser.add_argument(
    '--ini',
    default='server_config.ini',
    help='Server config ini file',
)
parser.add_argument(
    '--result_dir',
    default='result',
    help='Directory path for saved result',
)

_StoreAction(option_strings=['--result_dir'], dest='result_dir', nargs=None, const=None, default='result', type=None, choices=None, help='Directory path for saved result', metavar=None)

## 1. Parse arguments

In [2]:
# Notebook (.ipynb) version: parse an empty argument list so every option
# takes its default. When running as a .py script, call
# `parser.parse_args()` with no arguments instead.
args = parser.parse_args([])

## 2. Load server config

In [3]:
# Load the server configuration from the ini file given by --ini.
# NOTE(review): ini2dict comes from `common`; presumably it returns a mapping
# keyed by server/section name (it is indexed with args.scheduler below) — confirm.
configs = ini2dict(args.ini)

## 3. Get client

In [4]:
# Config entry of the scheduler selected by --scheduler (reused by later cells).
config_scheduler = configs[args.scheduler]

# Connect to the scheduler at "<host>:<scheduler_port>" and show the client summary.
scheduler_address = "{}:{}".format(config_scheduler['host'], config_scheduler['scheduler_port'])
client = Client(scheduler_address)
print(client)

<Client: 'tcp://172.17.0.2:8786' processes=66 threads=66, memory=1.29 TiB>


## 4. Run tasks

### 4.1 Define task

In [5]:
def task(param):
    """Run one unit of work and copy its result file to the scheduler host.

    Parameters
    ----------
    param : tuple
        ``(id, transfer_info)``. ``transfer_info`` is a mapping with
        ``'result_dir_path'`` (directory used both locally and as the
        destination directory on the scheduler host) and
        ``'config_scheduler'`` (mapping providing ``'ssh_port'``,
        ``'username'`` and ``'host'``).

    Returns
    -------
    None
        The result is delivered as a file, not as a return value.

    Raises
    ------
    subprocess.CalledProcessError
        If ``scp`` exits with a non-zero status. The local result file is
        kept in that case so the result is not lost.
    """
    # Local import: keeps the function self-contained regardless of which
    # names `common` exports.
    import subprocess

    task_id, transfer_info = param  # `task_id` avoids shadowing the builtin `id`

    ## 1. Do something
    sleep(1)
    result = dict(id=task_id)

    ## 2. Save result to a file
    result_dir_path = transfer_info['result_dir_path']
    makedirs(result_dir_path, exist_ok=True)
    src_file_path = abspath(join(result_dir_path, f'{task_id}.joblib'))
    # The destination name is prefixed with this machine's node name
    # (uname()[1]) so the origin of each result is visible.
    dst_file_path = abspath(join(result_dir_path, f'[{uname()[1]}]{task_id}.joblib'))
    joblib.dump(result, src_file_path)

    ## 3. Transfer
    config_scheduler = transfer_info['config_scheduler']
    remote_target = f"{config_scheduler['username']}@{config_scheduler['host']}:{dst_file_path}"
    # Argument list instead of a shell string: no local shell parsing of the
    # paths, and check=True turns a failed copy into an exception.
    subprocess.run(
        ['scp', '-P', str(config_scheduler['ssh_port']), src_file_path, remote_target],
        check=True,
    )
    # Only reached when the copy succeeded, so a failed transfer never
    # deletes the sole copy of the result.
    os.remove(src_file_path)

### 4.2 Set parameters

In [6]:
# Start from an empty result directory: remove any previous one, then recreate it.
result_dir_path = abspath(args.result_dir)
if isdir(result_dir_path):
    rmtree(result_dir_path)
makedirs(result_dir_path)

# One (id, transfer_info) pair per task; all tasks share the same transfer_info.
ids = range(10)
transfer_info = {
    'result_dir_path': result_dir_path,
    'config_scheduler': config_scheduler,
}
params = [(task_id, transfer_info) for task_id in ids]

### 4.3 Run tasks

In [7]:
# Start of the wall-clock measurement; the result cell reads `s`.
s = time()

# `task` has side effects (it writes and transfers files), so submit it with
# pure=False. With the default pure=True the task keys are derived from the
# function and its arguments, and re-running this cell while the previous
# futures are still alive would reuse the finished tasks instead of running
# them again — leaving the freshly emptied result directory empty.
futures = client.map(task, params, pure=False)
for future in futures:
    print(future)

<Future: pending, key: task-f0dd7cceba8c5cea74bf26197307d7c5>
<Future: pending, key: task-fba1af452928db616a5b936f6fd32eac>
<Future: pending, key: task-f59b3e4a307ba6ba81b514376f083828>
<Future: pending, key: task-f72023e59387188d70af43201726622c>
<Future: pending, key: task-a6aeb2153ad91d04dd8c346dbe689d2b>
<Future: pending, key: task-2a7a54e6d7debb7ea706bbd19f0655c8>
<Future: pending, key: task-a9caad3f52d48860611ca0a54256b8d6>
<Future: pending, key: task-e423335896213225c195efb36b117780>
<Future: pending, key: task-ac19ea5fe1ce00b6972080a53cbd208d>
<Future: pending, key: task-3799ff619d1cf1ecbf92056fdc8d9657>


## 5. Load and print results

In [8]:
list(as_completed(futures))  # wait until all tasks are completed

# Waiting this way does not raise when a task failed, so report failures
# explicitly instead of silently showing fewer results.
failed = [future for future in futures if future.status == 'error']
if failed:
    print(f"* {len(failed)} task(s) failed: {[future.key for future in failed]}")

# List the directory once so each file name is paired with the object that
# was loaded from that same file.
names = listdir(result_dir_path)
results = [joblib.load(join(result_dir_path, name)) for name in names]
print(f"* Elapsed time: {time() - s:.2f}s")
print("Results")
for name, result in zip(names, results):
    print(f"{name}: {result}")

* Elapsed time: 1.36s
Results
[T4_1]2.joblib: {'id': 2}
[2080Ti]8.joblib: {'id': 8}
[T4_1]6.joblib: {'id': 6}
[T4_1]1.joblib: {'id': 1}
[T4_1]0.joblib: {'id': 0}
[T4_1]3.joblib: {'id': 3}
[2080Ti]9.joblib: {'id': 9}
[T4_1]4.joblib: {'id': 4}
[T4_1]5.joblib: {'id': 5}
[2080Ti]7.joblib: {'id': 7}
