# Imports

In [1]:
#export
import os
import sys
from pathlib import Path
p = '/home/sokolov/work/cycler/dHPO/exp/'
sys.path.append(p)
#sys.path.append(os.path.join(os.getcwd(),'exp'))

import time
import json
import numpy as np
from itertools import cycle
from functools import partial
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


from nb_runner import cycle_exp, cycle_mutate, \
                      cycle_crossover, cycle_combine, cycle_all,\
                      bo_exp, bo_all, bo_crossover
from config import cfg

# Code

### dag base

In [10]:

d

datetime.date(2020, 5, 19)

In [9]:
#export
# Today's calendar date; used to anchor the DAG's start_date one day back.
d = datetime.now().date()
default_args = {
    'owner': cfg.OWNER,
    'depends_on_past': False,
    # Midnight of the previous calendar day. The subtraction is done with a
    # timedelta because `datetime(d.year, d.month, d.day-1)` raises ValueError
    # on the first day of every month (day would be 0).
    # NOTE(review): this start_date is recomputed on every parse of the file;
    # a fixed date (or days_ago(1)) may be preferable -- confirm intent.
    'start_date': datetime(d.year, d.month, d.day) - timedelta(days=1),#days_ago(1),#datetime(2020, 5, 15),
    'email': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,# overridden per task in base_task_generator below
    'retry_delay': timedelta(minutes=5),# overridden per task in base_task_generator below
}

# Pool used by tasks that do not request a specific one.
default_pool = cfg.DAG.DEF_POOL
# A falsy configured interval is normalised to None.
schedule_interval = cfg.DAG.SCHED_INTERVAL or None #@daily
# Human-readable description followed by a JSON dump of the configuration
# (assumes `cfg` is JSON-serialisable, e.g. dict-like -- TODO confirm).
description = cfg.DAG.DESC + '\n' + json.dumps(cfg, indent=4)

dag = DAG(  cfg.DAG.NAME,
                default_args=default_args,
                #max_active_runs=1,
                catchup=False,# for disabling backfill!
                description=description,
                schedule_interval=schedule_interval)

def base_task_generator(name, func, dag, pool=default_pool, op_kwargs=None):
    """Create a PythonOperator for `func` and attach it to `dag`.

    Args:
        name: task_id of the new task.
        func: callable the operator runs; it is given the Airflow context
            because provide_context=True is set.
        dag: DAG the task is attached to.
        pool: pool name for the task (defaults to the module's default pool).
        op_kwargs: extra keyword arguments forwarded to `func`.

    Returns:
        The created PythonOperator. Its retries and retry delay come from
        cfg.DAG.RETRIES and cfg.DAG.RETRY_DELAY (in minutes).
    """
    operator_settings = dict(
        task_id=name,
        python_callable=func,
        op_kwargs=op_kwargs,
        provide_context=True,
        pool=pool,
        retries=cfg.DAG.RETRIES,
        retry_delay=timedelta(minutes=cfg.DAG.RETRY_DELAY),
        dag=dag,
    )
    return PythonOperator(**operator_settings)
        
# Task factory pre-bound to this module's `dag`: callers supply the task name
# and callable, and optionally `pool` / `op_kwargs`.
create_task = partial(base_task_generator, dag=dag)

def pull_results(**context):
    """Collect the XCom value of every task in the running DAG.

    Args:
        **context: Airflow task context. Reads ``context['dag'].task_ids``
            and calls ``context['ti'].xcom_pull`` once per task id.

    Returns:
        dict mapping each task_id to whatever ``xcom_pull`` returned for it,
        or None when called without any context.
    """
    # `**context` always binds a dict (never None), so "no context" has to be
    # detected by emptiness rather than by an identity check against None.
    if not context:
        return None
    task_instance = context['ti']
    return {task_id: task_instance.xcom_pull(task_ids=task_id)
            for task_id in context['dag'].task_ids}


def upstream_results(**context):
    """Yield the XCom result of each task directly upstream of the current one.

    Args:
        **context: Airflow task context. Reads ``context['task']`` for its
            ``upstream_task_ids`` and calls ``context['ti'].xcom_pull`` once
            per upstream task id.

    Yields:
        The XCom value of each upstream task, in the iteration order of
        ``upstream_task_ids``. When the task has no upstream tasks, a single
        placeholder ``{'configs': None, 'docker_results': None}`` is yielded
        so consumers can always take a first element.
    """
    upstream_task_ids = context['task'].upstream_task_ids
    # Pull only the upstream tasks' XComs. Going through a "pull every task in
    # the DAG" helper for each upstream id would issue
    # (#upstream * #dag_tasks) XCom queries for the same values.
    rs = [context['ti'].xcom_pull(task_ids=task_id) for task_id in upstream_task_ids]
    if not rs:
        rs = [{'configs': None, 'docker_results': None}]
    for result in rs:
        yield result

def parse_pool(pool_str):
    """Extract the digits from the last '_'-separated part of a pool name.

    Each digit character found in that last part is kept as a separate item
    and the items are joined with commas, e.g. 'user_pool_gpu0' -> '0' and
    'user_pool_gpu01' -> '0,1'. Returns '' when the part contains no digits.
    """
    suffix = pool_str.rsplit('_', 1)[-1]
    digits = filter(str.isdigit, suffix)
    return ','.join(digits)

def chunker_list(seq, size):
    """Split `seq` into `size` interleaved slices.

    Note that `size` is the NUMBER of chunks, not the length of each chunk:
    chunk k holds elements k, k+size, k+2*size, ... of `seq`.

    Returns:
        A generator producing the `size` slices in order.
    """
    offsets = range(size)
    return (seq[start::size] for start in offsets)

### dag extra

In [10]:
#export
def dw_cycle_param(func, **context):
    """Run `func` with the configs of the first upstream result and this task's GPUs.

    Args:
        func: callable accepting ``aux_cfg_files`` and ``gpus`` keyword args.
        **context: Airflow task context; ``context['ti'].pool`` is parsed
            into the GPU id string.

    Returns:
        Whatever `func` returns.
    """
    # Only the first upstream result is consumed; any others are ignored.
    upstream = next(upstream_results(**context))
    return func(aux_cfg_files=upstream['configs'],
                gpus=parse_pool(context['ti'].pool))

def dw_bo_param(func, **context):
    """Run `func` with the observed points gathered from all DAG tasks.

    Every truthy XCom result in the DAG contributes one observation
    ``{'points': result['state'], 'target': result['docker_results']['metric']}``.

    Args:
        func: callable accepting ``aux_cfg_files``, ``gpus``, ``hp_points``
            and ``idx`` keyword args.
        **context: Airflow task context.

    Returns:
        Whatever `func` returns.
    """
    finished = (r for r in pull_results(**context).values() if r)
    observed_points = [
        {'points': r['state'], 'target': r['docker_results']['metric']}
        for r in finished
    ]

    first_upstream = next(upstream_results(**context))
    aux_cfg_files = first_upstream['configs']
    gpus = parse_pool(context['ti'].pool)
    # Position of this task's GPU in the configured id list. int() only
    # accepts a single id, so this will not work for a multi-GPU pool string.
    gpu_position = cfg.GPUS.IDS.index(int(gpus))
    return func(aux_cfg_files=aux_cfg_files, gpus=gpus,
                hp_points=observed_points, idx=gpu_position)

def dw_pooling(num=1, **context):
    """Select the `num` upstream results with the highest metric.

    Args:
        num: how many of the best results to keep.
        **context: Airflow task context, forwarded to ``upstream_results``.
            Each upstream result must provide ``['docker_results']['metric']``
            and ``['configs']``.

    Returns:
        A list of ``{'configs': ...}`` dicts, best metric first, with at most
        `num` entries.
    """
    key = 'metric'
    # Keep (metric, configs) pairs in a list rather than a dict keyed by the
    # metric: with a dict, two runs that reach the same metric overwrite each
    # other and fewer than `num` results could come back.
    scored = [(r['docker_results'][key], r['configs'])
              for r in upstream_results(**context)]
    # Sort on the metric only; the sort is stable, so ties keep upstream order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    best = scored[:num]
    best_docker_results = [metric for metric, _ in best]
    best_configs = [configs for _, configs in best]
    print(f'\n\tPooling: best result : {best_docker_results}, {best_configs}\n')
    return [{'configs':best_config} for best_config in best_configs]

def dw_dist(idx=None, **context):
    """Return element `idx` of the first upstream result.

    Args:
        idx: index into the first upstream result.
        **context: Airflow task context, forwarded to ``upstream_results``.
    """
    all_upstream = list(upstream_results(**context))
    first_result = all_upstream[0]
    return first_result[idx]

def distribute(num):
    """Create `num` tasks named 'distribute_<i>', each running dw_dist with idx=i.

    Returns:
        The created tasks as a list, ordered by index.
    """
    return [
        create_task(f'distribute_{position}', dw_dist, op_kwargs={'idx':position})
        for position in range(num)
    ]
        
def block_optimize(n, name, func, dw_param):
    """Create `n` tasks that run `func` through the `dw_param` wrapper.

    Task i is named '<name>_<i>' and is assigned, round-robin over
    cfg.GPUS.IDS, to the pool cfg.DAG.POOL_PREFIX + <gpu id>.

    Args:
        n: number of tasks to create.
        name: prefix of the task ids.
        func: experiment callable; it is pre-bound with seq_id=i and
            name=<task id> and handed to the wrapper as its 'func' argument.
        dw_param: wrapper used as the operator's python callable.

    Returns:
        The created tasks as a list, in creation order.
    """
    gpu_ids = cycle(cfg.GPUS.IDS)
    created = []

    for seq_id in range(n):
        gpu_id = next(gpu_ids)
        task_id = f'{name}_{seq_id}'
        pool_name = cfg.DAG.POOL_PREFIX + str(gpu_id)

        bound_func = partial(func, seq_id=seq_id, name=task_id)
        created.append(
            create_task(task_id, dw_param, pool=pool_name, op_kwargs={'func':bound_func})
        )
    return created

### random search

In [49]:
# Random-search pipeline wiring (this cell carries no #export marker).
# Stages: exploration tasks -> pooling of the best results -> one distribute
# task per kept result -> mutation tasks.
tasks = {'mut':[], 'exp':[], 'cross':[]}

#opt_block(50, 'cycle_all', cycle_all, dw_cycle_param)
# Three exploration tasks, spread round-robin over the configured GPU pools.
tasks['exp'] = block_optimize(3, 'cycle_e', cycle_exp, dw_cycle_param)

# pooling_task1 = create_task(f'pooling1', dw_pooling_one) 
#partial(dw_pooling, num=1)
# Number of best results kept by pooling, and so the number of distribute tasks.
# NOTE(review): the task id says 'two_best' but dist_num is 1 -- confirm naming.
dist_num = 1
pooling_task1 = create_task(f'pooling_two_best', partial(dw_pooling, num=dist_num))
tasks['exp'] >> pooling_task1

# Each distribute task forwards one element of the pooling result downstream.
dist_tasks = distribute(dist_num)
pooling_task1 >> dist_tasks

mut_tasks = block_optimize(3, 'cycle_m', cycle_mutate, dw_cycle_param)
# Split the mutation tasks into dist_num interleaved groups and hang each
# group under its own distribute task.
for dt, mt in zip(dist_tasks, chunker_list(mut_tasks, dist_num)):
    dt >> mt

# pooling_task2 = create_task(f'pooling2', dw_pooling_one) 
# for task in tasks['mut']:
#     pooling_task1 >> task
#     task >> pooling_task2
    
# tasks['cross'] = block_optimize(3, 'cycle_cr', cycle_crossover)
# pooling_task3 = create_task(f'pooling3', dw_cycle_pool) 
# for task in tasks['cross']:
#     pooling_task2 >> task
#     task >> pooling_task3
    
# tasks['comb'] = block_optimize(3, 'cycle_co', cycle_combine)
# for task in tasks['comb']:
#     pooling_task3 >> task

<Task(PythonOperator): pooling_two_best>

### bo

In [None]:
#export
#tasks = {'mut':[], 'exp':[], 'cross':[]}

# Bayesian-optimisation pipeline: 150 'bo_all' tasks with no task upstream of
# them, all feeding a single pooling task.
tasks_bo_all = block_optimize(150, 'bo_all', bo_all, dw_bo_param)
#tasks['exp'] = cycle_block(3, 'cycle_e', cycle_exp)

# dw_pooling is used with its default num=1, i.e. only the best result is kept.
pooling_task1 = create_task(f'pooling1', dw_pooling) 
tasks_bo_all >> pooling_task1

#partial(dw_pooling, num=1)
# dist_num = 2
# pooling_task1 = create_task(f'pooling_two_best', partial(dw_pooling, num=dist_num))
# tasks['exp'] >> pooling_task1

# dist_tasks = distribute(dist_num)
# connect(pooling_task1, dist_tasks)

# mut_tasks = block_optimize(7, 'cycle_m', cycle_mutate)
# for dt, mt in zip(dist_tasks, chunker_list(mut_tasks, dist_num)):
#     connect(dt, mt)

# More tests

In [16]:
#export
from airflow.models import TaskInstance
from datetime import datetime
import json
from pprint import pprint

In [17]:

def test1(**context):
    """Debug callable: pretty-print the received context and return 55."""
    pprint(context)
    fixed_result = 55
    return fixed_result

def test2(**context):
    pprint(context)
    pprint(pull_results2(**context))
    return 999


def pull_results2(**context):
    """Debug variant of the XCom collector: print each task's XCom while collecting.

    Args:
        **context: Airflow task context. Reads ``context['dag'].task_ids``
            and calls ``context['ti'].xcom_pull`` once per task id.

    Returns:
        dict mapping each task_id to its pulled XCom value, or None when
        called without any context.
    """
    # `**context` is always a dict, so an empty dict is the "no context" case.
    if not context:
        return None
    task_instance = context['ti']
    results = {}
    for task_id in context['dag'].task_ids:
        # Pull once and reuse the value for both the printout and the result,
        # so what is printed is exactly what is returned.
        value = task_instance.xcom_pull(task_ids=task_id)
        print(task_id)
        print(value)
        results[task_id] = value
    return results

# Scratch tasks for manual experiments (this cell carries no #export marker).
# They are attached to the same `dag` object as the pipeline tasks, so they
# show up in its task list while this kernel is alive.
task1 = base_task_generator('test1', test1, dag)
task2 = base_task_generator('test2', test2, dag)
#task1 >> task2


In [18]:
# Manual run of test2 from the notebook: build a TaskInstance for "now",
# derive its template context and call the operator's execute() directly.
ti = TaskInstance(task=task2, execution_date=datetime.now())
ctxt = ti.get_template_context()
task2.execute(context=ctxt)

{'END_DATE': '2020-05-18',
 'conf': <airflow.configuration.AirflowConfigParser object at 0x7f37e501dbe0>,
 'dag': <DAG: crsch_hyp_search1>,
 'dag_run': None,
 'ds': '2020-05-18',
 'ds_nodash': '20200518',
 'end_date': '2020-05-18',
 'execution_date': <Pendulum [2020-05-18T12:45:55.877627+00:00]>,
 'inlets': [],
 'latest_date': '2020-05-18',
 'macros': <module 'airflow.macros' from '/home/sokolov/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
 'next_ds': '2020-05-19',
 'next_ds_nodash': '20200519',
 'next_execution_date': <Pendulum [2020-05-19T00:00:00+00:00]>,
 'outlets': [],
 'params': {},
 'prev_ds': '2020-05-18',
 'prev_ds_nodash': '20200518',
 'prev_execution_date': <Pendulum [2020-05-18T00:00:00+00:00]>,
 'prev_execution_date_success': <Proxy at 0x7f38290a5a08 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f38290ef840>>,
 'prev_start_date_success': <Proxy at 0x7f38290a5a48 with factory <function TaskInstance.get_template_context.

999

In [68]:
d.task_ids

['test1', 'test2', 'cycle_e_0', 'cycle_e_1', 'cycle_e_2', 'pooling_two_best']

In [23]:
# NOTE(review): this rebinds `d`, which the DAG-base cell uses for today's
# date, to the DAG object -- re-running cells out of order will see the wrong type.
d = ctxt['dag']
t = d.task_dict['test2']
t.xcom_pull(ctxt)

In [37]:
task2.xcom_push(ctxt, 'test',13)

In [41]:
task1.xcom_pull(ctxt)

In [17]:
ti.pool, ti.current_state()

('sokolov_pool_gpu0', None)

In [14]:
# Manual run of test1, then show the pool and current state of the instance.
ti = TaskInstance(task=task1, execution_date=datetime.now())
task1.execute(context=ti.get_template_context())
ti.pool, ti.current_state()

{'END_DATE': '2020-05-18',
 'conf': <airflow.configuration.AirflowConfigParser object at 0x7f7d3675cd68>,
 'dag': <DAG: crsch_hyp_search1>,
 'dag_run': None,
 'ds': '2020-05-18',
 'ds_nodash': '20200518',
 'end_date': '2020-05-18',
 'execution_date': <Pendulum [2020-05-18T09:26:52.863269+00:00]>,
 'inlets': [],
 'latest_date': '2020-05-18',
 'macros': <module 'airflow.macros' from '/home/sokolov/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
 'next_ds': '2020-05-19',
 'next_ds_nodash': '20200519',
 'next_execution_date': <Pendulum [2020-05-19T00:00:00+00:00]>,
 'outlets': [],
 'params': {},
 'prev_ds': '2020-05-18',
 'prev_ds_nodash': '20200518',
 'prev_execution_date': <Pendulum [2020-05-18T00:00:00+00:00]>,
 'prev_execution_date_success': <Proxy at 0x7f7d106ed688 wrapping datetime.datetime(2020, 5, 18, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>) at 0x7f7d107b4ed0 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f7d2c38b0d0

('sokolov_pool_gpu0', None)

In [13]:
ti.pool, ti.current_state()

('sokolov_pool_gpu0', None)

In [None]:
def task_state(args):
    """Print the current state of one task instance.

    Args:
        args: object providing ``task_id`` and ``execution_date``; it is also
            passed to ``get_dag`` to resolve the DAG.

    NOTE(review): ``get_dag`` is not defined or imported in the visible
    notebook -- confirm where it is expected to come from.
    """
    task = get_dag(args).get_task(task_id=args.task_id)
    state = TaskInstance(task, args.execution_date).current_state()
    print(state)

# Export

In [24]:
!python3 extra/n2s.py dag.ipynb

Converted dag.ipynb to exp/nb_dag.py


In [25]:
!airflow list_dags

[2020-05-19 22:55:41,893] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-05-19 22:55:41,893] {dagbag.py:396} INFO - Filling up the DagBag from /home/sokolov/airflow/dags
[2020-05-19 22:55:41,894] {dagbag.py:396} INFO - Filling up the DagBag from /home/sokolov/work/cycler/dHPO/exp


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
dhpo2

