Modified token name to fields_(_SB)
Zheng Meyer committed Mar 1, 2020
1 parent 8ba9470 commit 0619556
Showing 3 changed files with 43 additions and 50 deletions.
18 changes: 10 additions & 8 deletions AGLOW/airflow/dags/SKSP_launcher.py
@@ -44,7 +44,7 @@
from AGLOW.airflow.utils.AGLOW_utils import get_field_location_from_srmlist
from AGLOW.airflow.utils.AGLOW_utils import set_field_status_from_task_return
from AGLOW.airflow.utils.AGLOW_utils import modify_parset_from_fields_task
-from AGLOW.airflow.utils.AGLOW_utils import check_folder_for_files_from_task
+from AGLOW.airflow.utils.AGLOW_utils import check_folder_for_files_from_task
from AGLOW.airflow.utils.AGLOW_utils import get_results_from_subdag


@@ -73,12 +73,14 @@
dag = DAG('SKSP_Launcher', default_args=default_args, schedule_interval='@once' , catchup=False)

args_dict_juelich = {
-"cal1_parset":"/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/parsets/Pre-Facet-Calibrator-1.parset",
+"cal1_parset":"/home/zmz/AGLOW/data/parsets/Pre-Facet-Calibrator-v3.parset",
"cal2_parset":"/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/parsets/Pre-Facet-Calibrator-2.parset",
-"targ1_parset":"/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/parsets/Pre-Facet-Target-1.parset",
-'pref_cal1_cfg':'/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/config/steps/pref_cal1_juelich.cfg',
+"targ1_parset":"/home/zmz/AGLOW/data/parsets/CI/Pre-Facet-Target1-v3.parset",
+'pref_cal1_cfg':'/home/zmz/AGLOW/data/config/steps/cal_pref3.json',
'pref_cal2_cfg':'/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/config/steps/pref_cal2.cfg',
-'pref_targ1_cfg':'/home/apmechev/.conda/envs/AGLOW/GRID_LRT/data/config/steps/pref_targ1.cfg'}
+'pref_targ1_cfg':'/home/zmz/AGLOW/data/config/steps/targ1_pref3.json',
+'files_per_job':999,
+'subband_prefix':None }


args_cal = {'attachments':
@@ -88,7 +90,7 @@
'files_per_job':999,
'token_prefix': datetime.strftime(datetime.now(), "%Y-%m-%d"),
'append_task':None, #We are not adding keys to the tokens, so this is None
-'field_prefix': "CI_pref",
+'field_prefix': "pref3_",
'srmfile_task': 'stage_cal',
'subband_prefix':None,
'NCPU' : 4
@@ -100,7 +102,7 @@
'cfg':'/home/zmz/AGLOW/data/config/steps/targ1_pref3.json',
'files_per_job':1,
'token_prefix': datetime.strftime(datetime.now(), "%Y-%m-%d"),
-'field_prefix': "CI_pref",
+'field_prefix': "pref3_",
'append_task':{'name':'cal_results','parent_dag':True},
'srmfile_task': 'stage_targ',
'subband_prefix':None,
@@ -112,7 +114,7 @@
'cfg':'/home/zmz/AGLOW/data/config/steps/targ2_pref3.json',
'files_per_job':10,
'token_prefix': datetime.strftime(datetime.now(), "%Y-%m-%d"),
-'field_prefix': "CI_pref",
+'field_prefix': "pref3_",
'append_task':None,
'srmfile_task': 'targ1_results',
'subband_prefix':'ABN',
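Note on the launcher changes: the per-user apmechev parset and config paths are replaced by the v3/pref3 files under /home/zmz, the field prefix changes from "CI_pref" to "pref3_" in all three per-step dicts, and args_dict_juelich gains two job-level knobs, 'files_per_job' and 'subband_prefix'. The snippet below is illustrative only (not part of the commit); it shows the two new keys and the date-based token_prefix the per-step dicts already use.

from datetime import datetime

# The two keys this commit adds to args_dict_juelich:
extra_launcher_args = {
    'files_per_job': 999,    # read by the subdag as files_per_token=args_dict['files_per_job']
    'subband_prefix': None,  # if forwarded to TokenCreator, None triggers its new "SB" fallback (see LRT_token.py below)
}

# token_prefix in args_cal/args_targ* is simply the current date, e.g. "2020-03-01"
token_prefix = datetime.strftime(datetime.now(), "%Y-%m-%d")
print(token_prefix, extra_launcher_args)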
29 changes: 22 additions & 7 deletions AGLOW/airflow/operators/LRT_token.py
@@ -110,8 +110,8 @@ def __init__(
append_task = None,
fields_task = None,
pc_database=None,
-subband_prefix = "SB",
-subband_suffix = "_",
+subband_prefix = None,
+subband_suffix = None,
token_type = 'test_',
files_per_token = 10,

@@ -122,8 +122,14 @@
self.pc_database = pc_database
self.tok_config = tok_config
self.fields_task = fields_task
-self.subband_prefix = subband_prefix
-self.subband_suffix = subband_suffix
+if subband_prefix:
+    self.subband_prefix = subband_prefix
+else:
+    self.subband_prefix = "SB"
+if subband_suffix:
+    self.subband_suffix = subband_suffix
+else:
+    self.subband_suffix = "_"
self.staging_task = staging_task
self.append_task = append_task
self.files_per_token = files_per_token
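The constructor now defaults subband_prefix and subband_suffix to None in the signature and resolves them in the body, so a caller that passes None (as the launcher now does for subband_prefix) still ends up with "SB" and "_". A compact, equivalent sketch of that fallback follows; it is not the committed code, and note that `or` also treats an empty string as unset, matching the truthiness test used in the commit.

class _SubbandDefaults(object):
    """Sketch of the new fallback behaviour in TokenCreator.__init__ (illustrative only)."""
    def __init__(self, subband_prefix=None, subband_suffix=None):
        self.subband_prefix = subband_prefix or "SB"
        self.subband_suffix = subband_suffix or "_"

assert _SubbandDefaults().subband_prefix == "SB"
assert _SubbandDefaults(subband_prefix="ABN").subband_prefix == "ABN"  # args_targ2 in the launcher uses 'ABN'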
@@ -171,15 +177,24 @@ def execute(self, context):
d[i[0]] = i[1]

for token_file in d:
-logging.info(token_file)
+logging.info("Token file is {}".format(token_file))
with NamedTemporaryFile(delete=False) as savefile:
for line in d[token_file]:
savefile.write("{}\n".format(line).encode('utf-8'))
-token_id="{}{}_{}".format(self.t_type,token_file,time.time())
+# pref3_$FIELDNAME_$OBSID_$PIPELINE_SB$SBNUMBER
+pipeline_step = pipe_type.split('_')[1]
+# logging.info("Pipeline step is {}, type pipe_type is {}.".format(pipe_type, type(pipe_type)))
+if 'cal' in pipe_type:
+    token_id="{}_{}_{}".format(self.t_type, srms.obsid, pipeline_step)
+elif 'targ' in pipe_type:
+    token_id="{}_{}_{}_SB{}".format(self.t_type, srms.obsid, pipeline_step, token_file)
+else:
+    token_id="fields_$FIELDNAME_$OBSID_$PIPELINE: {}_{}_{}_{}_{}".format(
+        self.t_type, token_file, srms.obsid, pipe_type.split('_')[1], time.time())

logging.info(token_id)
self.token_list.append(self.build_token(
token_id,
# token_id="{}{}_{}".format(self.t_type,token_file,time.time()),
attachment={'name':'srm.txt', 'location':savefile.name}))
self.token_list[-1]['STARTSB'] = token_file
os.remove(savefile.name)
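This is the core of the commit: token IDs now follow the pref3_$FIELDNAME_$OBSID_$PIPELINE pattern, with an _SB$SBNUMBER suffix only for target tokens, instead of the old prefix + group + timestamp form. Below is a standalone sketch of the same branch logic; the example field, obsid and pipe_type values are hypothetical, while in the operator itself t_type, srms.obsid and pipe_type come from the surrounding execute() context.

import time

def make_token_id(t_type, obsid, pipe_type, token_file):
    """Mirror of the committed naming branch (sketch, not the AGLOW source)."""
    pipeline_step = pipe_type.split('_')[1]
    if 'cal' in pipe_type:
        # calibrator tokens: <prefix+field>_<obsid>_<step>
        return "{}_{}_{}".format(t_type, obsid, pipeline_step)
    elif 'targ' in pipe_type:
        # target tokens additionally carry the subband/group number
        return "{}_{}_{}_SB{}".format(t_type, obsid, pipeline_step, token_file)
    # fallback keeps the old time-stamped form behind the template string
    return "fields_$FIELDNAME_$OBSID_$PIPELINE: {}_{}_{}_{}_{}".format(
        t_type, token_file, obsid, pipeline_step, time.time())

print(make_token_id("pref3_P123+45", "L654321", "pref_cal1", "000"))   # pref3_P123+45_L654321_cal1
print(make_token_id("pref3_P123+45", "L654321", "pref_targ1", "012"))  # pref3_P123+45_L654321_targ1_SB012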
46 changes: 11 additions & 35 deletions AGLOW/airflow/subdags/SKSP_juelich.py
@@ -19,6 +19,8 @@
from AGLOW.airflow.utils.AGLOW_utils import get_field_location_from_srmlist
from AGLOW.airflow.utils.AGLOW_utils import set_field_status_from_task_return
from AGLOW.airflow.utils.AGLOW_utils import modify_parset_from_fields_task
+from AGLOW.airflow.utils.AGLOW_utils import check_folder_for_files_from_tokens
+from AGLOW.airflow.utils.AGLOW_utils import get_results_from_subdag

from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
@@ -37,7 +39,7 @@


def juelich_subdag(parent_dag_name, subdagname, dag_args, args_dict=None):
-field_name = 'fields_'
+field_name = 'pref3_'

dag = DAG(dag_id=parent_dag_name+'.'+subdagname, default_args=dag_args, schedule_interval='@once' , catchup=False)

@@ -115,44 +117,22 @@ def juelich_subdag(parent_dag_name, subdagname, dag_args, args_dict=None):
tok_config=args_dict['pref_cal1_cfg'],
pc_database = 'sksp2juelich',
fields_task = {'name':'get_next_field','parent_dag':True},
-files_per_token=1,
+files_per_token=args_dict['files_per_job'],
dag=dag)

#Upload the parset to all the tokens
parset_cal = TokenUploader(task_id='cal_parset',
token_task='token_cal',
parent_dag=True,
upload_file=args_dict['cal1_parset'],
-parset_task = 'make_parsets',
pc_database = 'sksp2juelich',
dag=dag)

-#sandbox_cal2 = LRTSandboxOperator(task_id='sbx_cal2',
-# sbx_config=args_dict['pref_cal2_cfg'],
-# dag=dag)

-tokens_cal2 = TokenCreator( task_id='token_cal2',
-staging_task={'name':'check_calstaged','parent_dag':False},
-sbx_task={'name':'sbx_cal2','parent_dag':False},
-token_type=field_name,
-files_per_token=999,
-fields_task = {'name':'get_next_field','parent_dag':True} ,
-tok_config=args_dict['pref_cal2_cfg'],
-pc_database = 'sksp2juelich',
-dag=dag)

-parset_cal2 = TokenUploader( task_id='cal_parset2',
-token_task='token_cal2',
-parent_dag=True,
-upload_file=args_dict['cal2_parset'],
-parset_task = 'make_parsets',
-pc_database = 'sksp2juelich',
+cal_results = PythonOperator(task_id='cal_results',
+python_callable=get_results_from_subdag,
+op_kwargs={'subdag_id':'SKSP_Launcher.launch_juelich', 'task':'token_cal', 'return_key':'CAL2_SOLUTIONS'},
dag=dag)

#sandbox_targ1 = LRTSandboxOperator(task_id='sbx_targ1',
# sbx_config=args_dict['pref_targ1_cfg'],
# trigger_rule='all_done', # The task will start when parents are success or skipped
# dag=dag)

tokens_targ1 = TokenCreator( task_id='token_targ1',
staging_task={'name':'check_targstaged','parent_dag':False},
@@ -169,7 +149,6 @@ def juelich_subdag(parent_dag_name, subdagname, dag_args, args_dict=None):
token_task='token_targ1',
parent_dag=True,
upload_file=args_dict['targ1_parset'],
-parset_task = 'make_parsets',
pc_database = 'sksp2juelich',
dag=dag)

Expand Down Expand Up @@ -208,26 +187,23 @@ def juelich_subdag(parent_dag_name, subdagname, dag_args, args_dict=None):
gsi_path = 'gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lofar/user/sksp/distrib/SKSP/',
dag=dag)


branch_if_cal_exists >> check_calstaged
-branch_if_cal_exists >> calib_done >> tokens_targ1
+branch_if_cal_exists >> calib_done

#checking if calibrator is staged
check_calstaged >> branching_cal
branching_cal >> stage >> join
branching_cal >> files_staged >> join

#join >> sandbox_cal
-join >> tokens_cal >> parset_cal >> tokens_cal2 >> parset_cal2

-#sandbox_cal >> tokens_cal >> parset_cal >> sandbox_cal2
-#sandbox_cal2 >> tokens_cal2 >> parset_cal2
+join >> tokens_cal >> parset_cal >> calib_done >> cal_results

check_targstaged >> branch_targ_if_staging_needed

branch_targ_if_staging_needed >> files_staged_targ >> join_targ
branch_targ_if_staging_needed >> stage_targ >> join_targ

#join_targ >> sandbox_targ1
join_targ >> tokens_targ1
-parset_cal2 >> tokens_targ1 >> parset_targ1 >> check_done_files
+calib_done >> cal_results >>tokens_targ1 >> parset_targ1 >> check_done_files
return dag
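In the subdag, the second calibrator pass (tokens_cal2/parset_cal2) and its commented-out sandbox are dropped, the make_parsets parset_task argument is removed from the TokenUploader calls, and target token creation now waits on the new cal_results task, which pulls the calibrator solutions out of the subdag run via get_results_from_subdag (its op_kwargs name the subdag, the token_cal task and the CAL2_SOLUTIONS key). That helper lives in AGLOW.airflow.utils.AGLOW_utils and is not part of this diff; the sketch below is a hypothetical reading of what such a callable could do under Airflow 1.x, assuming the task runs with provide_context=True (or equivalent) and that the upstream task pushed its result under return_key.

def get_results_from_subdag(subdag_id, task, return_key, **context):
    """Hypothetical sketch only; the real helper is in AGLOW.airflow.utils.AGLOW_utils."""
    ti = context['ti']  # assumes provide_context=True so the task instance is injected
    value = ti.xcom_pull(dag_id=subdag_id, task_ids=task, key=return_key)
    if value is None:
        raise ValueError("no XCom '{}' found for {}.{}".format(return_key, subdag_id, task))
    return value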
