Update to match new Airflow version
alexmassen-hane committed Aug 7, 2023
1 parent 427fbef commit 44f0b95
Showing 1 changed file with 14 additions and 11 deletions.
academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -258,12 +258,11 @@ def __init__(
bq_table_id: str = "pubmed",
bq_dataset_description: str = "Pubmed Medline database, only PubmedArticle records: https://pubmed.ncbi.nlm.nih.gov/about/",
start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=8),
- schedule_interval: str = "@weekly",
+ schedule: str = "@weekly",
ftp_server_url: str = "ftp.ncbi.nlm.nih.gov",
ftp_port: int = 21,
reset_ftp_counter: int = 40,
max_download_retry: int = 5,
- num_merged_records: int = 30000,
queue: str = "remote_queue",
snapshot_expiry_days: int = 31,
max_processes: int = 4, # Limited to 4 due to RAM usage.
@@ -277,12 +276,11 @@ def __init__(
:param bq_table_id: Table name of the final Pubmed table.
:param bq_dataset_description: Description of the Pubmed dataset.
:param start_date: The start date of the DAG.
- :param schedule_interval: tTe schedule interval of the DAG, how often it should run.
+ :param schedule: How often the DAG should run.
:param ftp_server_url: Server address of Pubmed's FTP server.
:param ftp_port: Port for connecting to Pubmed's FTP server.
:param reset_ftp_counter: Reset the FTP connection after downloading this many files.
:param max_download_retry: Maximum number of download retries for a single Pubmed file from the FTP server before raising an error.
- :param num_merged_records: Number of records to write to file when merging the upsert and delete records, before import into BQ.
:param queue: The queue that the tasks should run on, "default" or "remote_queue".
:param snapshot_expiry_days: How long the backup snapshot of the Pubmed table (taken before this release's upserts and deletes) exists in BQ.
:param max_processes: Max number of parallel processes.
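The `max_download_retry` and `reset_ftp_counter` parameters work together: each file download is retried a few times, and the FTP connection is recycled every so many files. A minimal sketch of that pattern, with a hypothetical helper name rather than the workflow's actual code:

```python
import ftplib

def download_files(host: str, paths: list[str], max_download_retry: int = 5, reset_ftp_counter: int = 40) -> None:
    """Download each file, reconnecting every reset_ftp_counter files and
    retrying each file up to max_download_retry times before failing."""
    ftp = ftplib.FTP(host)
    ftp.login()  # Pubmed's FTP server allows anonymous login
    try:
        for i, path in enumerate(paths):
            # Recycle the connection periodically so long sessions aren't dropped.
            if i > 0 and i % reset_ftp_counter == 0:
                ftp.close()
                ftp = ftplib.FTP(host)
                ftp.login()
            for attempt in range(1, max_download_retry + 1):
                try:
                    with open(path.split("/")[-1], "wb") as f:
                        ftp.retrbinary(f"RETR {path}", f.write)
                    break
                except ftplib.all_errors:
                    if attempt == max_download_retry:
                        raise
    finally:
        ftp.close()
```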
@@ -293,7 +291,7 @@ def __init__(
super().__init__(
dag_id=dag_id,
start_date=start_date,
- schedule_interval=schedule_interval,
+ schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id],
queue=queue,
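This is the substance of the commit: Airflow 2.4 renamed the DAG argument `schedule_interval` to the more general `schedule`. On a plain DAG the equivalent change looks like this (a minimal sketch, not the observatory Workflow base class used here):

```python
import pendulum
from airflow import DAG

with DAG(
    dag_id="pubmed",
    start_date=pendulum.datetime(2022, 12, 8),
    schedule="@weekly",  # Airflow 2.4+; previously schedule_interval="@weekly"
    catchup=False,
):
    ...  # tasks go here
```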
@@ -322,7 +320,6 @@ def __init__(
self.updatefiles_path = "/pubmed/updatefiles/"
self.max_download_retry = max_download_retry
self.reset_ftp_counter = reset_ftp_counter
- self.num_merged_records = num_merged_records
self.max_processes = max_processes

# Wait for the previous DAG run to finish to make sure that
@@ -348,7 +345,8 @@ def __init__(
self.add_task(self.upload_transformed_baseline)
self.add_task(self.bq_load_main_table)

- # Snapshot the table before making changes using the updatefiles.
+ # Create a backup of the main table before applying any changes.
+ # Only run if it is not the first run of the year.
self.add_task(self.create_snapshot)

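`create_snapshot` takes a backup of the main table before the updatefiles are applied, and `snapshot_expiry_days` (31 above) bounds how long it is kept. In BigQuery this maps naturally onto snapshot-table DDL; a sketch under that assumption, with hypothetical table IDs:

```python
from google.cloud import bigquery

def create_snapshot(client: bigquery.Client, src: str, dst: str, expiry_days: int = 31) -> None:
    # Snapshot the main table so this release's upserts/deletes can be rolled back.
    client.query(
        f"""
        CREATE SNAPSHOT TABLE `{dst}`
        CLONE `{src}`
        OPTIONS (expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {expiry_days} DAY))
        """
    ).result()

# e.g. create_snapshot(bigquery.Client(), "project.pubmed.pubmed",
#                      "project.pubmed.pubmed_snapshot_20230807")
```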
### UPDATEFILES ###
Expand All @@ -362,12 +360,12 @@ def __init__(
self.add_task(self.merge_upserts_and_deletes)
self.add_task(self.save_merged_upserts_and_deletes)

- # Uplaod and apply upsert records.
+ # Upload and apply upsert records.
self.add_task(self.upload_merged_upsert_records)
self.add_task(self.bq_load_upsert_table)
self.add_task(self.bq_upsert_records)

- # Uplaod amd apply delete records.
+ # Upload and apply delete records.
self.add_task(self.upload_merged_delete_records)
self.add_task(self.bq_load_delete_table)
self.add_task(self.bq_delete_records)
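`bq_upsert_records` and `bq_delete_records` then reconcile the main table against the freshly loaded upsert and delete tables. Assuming a simplified flat `pmid` key (the real PubMed schema nests the PMID inside the record), the underlying SQL is roughly:

```python
from google.cloud import bigquery

def apply_upserts_and_deletes(client: bigquery.Client, main: str, upserts: str, deletes: str) -> None:
    # Upsert: drop any existing rows that reappear in the upsert table,
    # then insert the new versions wholesale.
    client.query(f"DELETE FROM `{main}` WHERE pmid IN (SELECT pmid FROM `{upserts}`)").result()
    client.query(f"INSERT INTO `{main}` SELECT * FROM `{upserts}`").result()
    # Delete: remove records PubMed has retracted in this release.
    client.query(f"DELETE FROM `{main}` WHERE pmid IN (SELECT pmid FROM `{deletes}`)").result()
```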
@@ -386,7 +384,7 @@ def list_datafiles_for_release(self, **kwargs) -> bool:
"""
Get a list of all files to process for this release.
- Determine if workflow needs to redownload the baseline files again because of a new yearly release
+ Determine if workflow needs to redownload the baseline files again because of a new yearly release.
"""

dag_run = kwargs["dag_run"]
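Deciding whether to redownload the baseline requires the date the current yearly baseline was published. One plausible way to obtain it, sketched here as an assumption rather than the workflow's actual method, is the FTP `MDTM` modification time of a baseline file:

```python
import ftplib
import pendulum

def get_baseline_upload_date(host: str = "ftp.ncbi.nlm.nih.gov") -> pendulum.DateTime:
    # Take the modification time of the first file in the baseline directory
    # as a proxy for when the yearly baseline was published.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        first = ftp.nlst("/pubmed/baseline/")[0]
        resp = ftp.sendcmd(f"MDTM {first}")  # e.g. "213 20221208093000"
        return pendulum.from_format(resp.split()[1], "YYYYMMDDHHmmss", tz="UTC")
```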
@@ -417,14 +415,19 @@ def list_datafiles_for_release(self, **kwargs) -> bool:
year_first_run = True
else:
prev_release_extra = prev_release.extra
- last_baseline_upload_date = pendulum.from_timestamp(prev_release_extra["baseline_upload_date"])

+ logging.info(f"prev_release: {prev_release}")
+ logging.info(f"prev_release_extra: {prev_release_extra}")
+ last_baseline_upload_date = pendulum.from_timestamp(prev_release_extra["baseline_upload_date"])
+ logging.info(f"Last baseline upload date: {last_baseline_upload_date}")

if last_baseline_upload_date == baseline_upload_date:
year_first_run = False
else:
year_first_run = True

+ logging.info(f"Setting year_first_run to {year_first_run}")

# Grab list of baseline files from FTP server.
files_to_download = []
if year_first_run:
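The hunk moves the `pendulum.from_timestamp` call below the new logging statements and logs the final decision. Condensed into a hypothetical standalone function, the logic it implements is:

```python
from typing import Optional

import pendulum

def is_year_first_run(prev_release_extra: Optional[dict], baseline_upload_date: pendulum.DateTime) -> bool:
    # No previous release, or the FTP baseline was re-published since the
    # last run: treat this as the first run against the new yearly baseline.
    if prev_release_extra is None:
        return True
    last = pendulum.from_timestamp(prev_release_extra["baseline_upload_date"])
    return last != baseline_upload_date
```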
