1616network_config = json .loads (base64 .b64decode (os .getenv ('ECS_NETWORK_CONFIG' )))
1717cluster = os .getenv ('ECS_CLUSTER' )
1818es_url = os .getenv ('ES_URL' )
19+ air_env = os .getenv ('AIRFLOW_ENVIRONMENT' )
1920
2021
2122def set_s3 ():
2223 today = datetime .utcnow ().strftime ('%Y-%m-%d-%H-%M-%S' )
23- s3_key = "s3://aspace-oai-s3-stage/{file }.xml" . format ( file = today )
24+ s3_key = f "s3://aspace-oai-s3-{ air_env } / { today } .xml"
2425 logging .info (s3_key )
2526 return s3_key
2627
@@ -50,13 +51,12 @@ def check_if_records(**context):
5051harvest = ECSOperator (task_id = 'harvest_step_1' ,
5152 dag = dag ,
5253 cluster = cluster ,
53- task_definition = ' airflow-stage -oaiharvester' ,
54+ task_definition = f" airflow-{ air_env } -oaiharvester" ,
5455 overrides = {'containerOverrides' : [{
5556 'command' : ["--out={{ task_instance.xcom_pull(task_ids='set_s3') }}" ,
5657 '--host=https://archivesspace.mit.edu/oai' ,
57- '--format=oai_ead' ,
58- '--verbose' , ],
59- 'name' : 'airflow-stage-oaiharvester' ,
58+ '--format=oai_ead' , ],
59+ 'name' : f"airflow-{ air_env } -oaiharvester" ,
6060 }]},
6161 network_configuration = network_config )
6262
@@ -79,16 +79,17 @@ def check_if_records(**context):
7979ingest = ECSOperator (task_id = 'harvest_step_3' ,
8080 dag = dag ,
8181 cluster = cluster ,
82- task_definition = ' airflow-stage -mario' ,
82+ task_definition = f" airflow-{ air_env } -mario" ,
8383 overrides = {'containerOverrides' : [{
84- 'command' : ['--url=' + es_url ,
84+ 'command' : [f"--url={ es_url } " ,
85+ '--index=aspace_2019_12' ,
8586 'ingest' ,
8687 '--prefix=aspace' ,
8788 '--type=archives' ,
8889 '--auto' ,
8990 '--debug' ,
9091 "{{ task_instance.xcom_pull(task_ids='set_s3') }}" , ],
91- 'name' : ' airflow-stage -mario' ,
92+ 'name' : f" airflow-{ air_env } -mario" ,
9293 }]},
9394 network_configuration = network_config )
9495
0 commit comments