From 44f0b9593a52ed62c679a566216c2d1e521a0414 Mon Sep 17 00:00:00 2001
From: Alex Massen-Hane
Date: Mon, 7 Aug 2023 14:16:06 +0800
Subject: [PATCH] Update to match new Airflow version

---
 .../workflows/pubmed_telescope.py | 25 +++++++++++--------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py
index 11768f805..17fa1bbc3 100644
--- a/academic_observatory_workflows/workflows/pubmed_telescope.py
+++ b/academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -258,12 +258,11 @@ def __init__(
         bq_table_id: str = "pubmed",
         bq_dataset_description: str = "Pubmed Medline database, only PubmedArticle records: https://pubmed.ncbi.nlm.nih.gov/about/",
         start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=8),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         ftp_server_url: str = "ftp.ncbi.nlm.nih.gov",
         ftp_port: int = 21,
         reset_ftp_counter: int = 40,
         max_download_retry: int = 5,
-        num_merged_records: int = 30000,
         queue: str = "remote_queue",
         snapshot_expiry_days: int = 31,
         max_processes: int = 4,  # Limited to 4 due to RAM usage.
@@ -277,12 +276,11 @@ def __init__(
         :param bq_table_id: Table name of the final Pubmed table.
         :param bq_dataset_description: Description of the Pubmed dataset.
         :param start_date: The start date of the DAG.
-        :param schedule_interval: tTe schedule interval of the DAG, how often it should run.
+        :param schedule: How often the DAG should run.
         :param ftp_server_url: Server address of Pubmed's FTP server.
         :param ftp_port: Port for connectiong to Pubmed's FTP server.
         :param reset_ftp_counter: Resets FTP connection after downloading x number of files.
         :param max_download_retry: Maximum number of retries of a single Pubmed file from the FTP server before throwing an error.
-        :param num_merged_records: Number of records to write to file when merging the upsert and delete records, before import into BQ.
         :param queue: The queue that the tasks should run on, "default" or "remote_queue".
         :param snapshot_expiry_days: How long until the backup snapshot (before this release's upserts and deletes) of the Pubmed table exist in BQ.
         :param max_processes: Max number of parallel processes.
@@ -293,7 +291,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             queue=queue,
@@ -322,7 +320,6 @@ def __init__(
         self.updatefiles_path = "/pubmed/updatefiles/"
         self.max_download_retry = max_download_retry
         self.reset_ftp_counter = reset_ftp_counter
-        self.num_merged_records = num_merged_records
         self.max_processes = max_processes
 
         # Wait for the previous DAG run to finish to make sure that
@@ -348,7 +345,8 @@ def __init__(
         self.add_task(self.upload_transformed_baseline)
         self.add_task(self.bq_load_main_table)
 
-        # Snapshot the table before making changes using the updatefiles.
+        # Create a backup of the main table before applying any changes.
+        # Only run if it is not the first run of the year.
         self.add_task(self.create_snapshot)
 
         ### UPDATEFILES ###
@@ -362,12 +360,12 @@ def __init__(
         self.add_task(self.merge_upserts_and_deletes)
         self.add_task(self.save_merged_upserts_and_deletes)
 
-        # Uplaod and apply upsert records.
+        # Upload and apply upsert records.
         self.add_task(self.upload_merged_upsert_records)
         self.add_task(self.bq_load_upsert_table)
         self.add_task(self.bq_upsert_records)
 
-        # Uplaod amd apply delete records.
+        # Upload and apply delete records.
         self.add_task(self.upload_merged_delete_records)
         self.add_task(self.bq_load_delete_table)
         self.add_task(self.bq_delete_records)
@@ -386,7 +384,7 @@ def list_datafiles_for_release(self, **kwargs) -> bool:
         """
         Get a list of all files to process for this release.
 
-        Determine if workflow needs to redownload the baseline files again because of a new yearly release
+        Determine if the workflow needs to redownload the baseline files because of a new yearly release.
         """
 
         dag_run = kwargs["dag_run"]
@@ -417,14 +415,19 @@ def list_datafiles_for_release(self, **kwargs) -> bool:
                 year_first_run = True
             else:
                 prev_release_extra = prev_release.extra
+                last_baseline_upload_date = pendulum.from_timestamp(prev_release_extra["baseline_upload_date"])
+
                 logging.info(f"prev_release: {prev_release}")
                 logging.info(f"pre_release_extra: {prev_release_extra}")
-                last_baseline_upload_date = pendulum.from_timestamp(prev_release_extra["baseline_upload_date"])
+                logging.info(f"Last baseline upload date: {last_baseline_upload_date}")
+
                 if last_baseline_upload_date == baseline_upload_date:
                     year_first_run = False
                 else:
                     year_first_run = True
 
+        logging.info(f"Setting year_first_run to {year_first_run}")
+
         # Grab list of baseline files from FTP server.
         files_to_download = []
         if year_first_run:
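
Background, as a minimal sketch (not part of the patch itself): from Airflow 2.4 the DAG-level "schedule_interval" argument is deprecated in favour of "schedule", which is what the parameter rename and the super().__init__ change above track. The dag_id and dates below are illustrative only, not values taken from this workflow.

    import pendulum
    from airflow import DAG

    # Before Airflow 2.4: DAG(..., schedule_interval="@weekly")
    # From Airflow 2.4:   DAG(..., schedule="@weekly")
    with DAG(
        dag_id="pubmed",  # illustrative; the real value is passed into the telescope
        start_date=pendulum.datetime(2022, 12, 8),
        schedule="@weekly",
        catchup=False,
    ):
        pass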