
Commit d5b448d
release and date fixes
keegansmith21 committed Jul 3, 2023
1 parent ae686c7 commit d5b448d
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions academic_observatory_workflows/workflows/orcid_telescope.py
@@ -80,19 +80,29 @@ def __init__(
         run_id: str,
         cloud_workspace: CloudWorkspace,
         bq_dataset_id: str,
-        bq_table_name: str,
+        bq_main_table_name: str,
+        bq_upsert_table_name: str,
+        bq_delete_table_name: str,
         start_date: pendulum.DateTime,
         end_date: pendulum.DateTime,
+        prev_end_date: pendulum.DateTime,
-        modification_cutoff: pendulum.DateTime,
+        prev_latest_modified_record: pendulum.DateTime,
         is_first_run: bool,
     ):
         """Construct a CrossrefEventsRelease instance
-        :param dag_id: the id of the DAG.
-        :param start_date: the start_date of the release. Inclusive.
-        :param end_date: the end_date of the release. Exclusive.
-        :param modification_cutoff: the record modification cutoff date of the release. Files modified after this date will be included.
+        :param dag_id: DAG ID.
+        :param run_id: DAG run ID.
+        :param cloud_workspace: Cloud workspace object for this release.
+        :param bq_dataset_id: BigQuery dataset ID.
+        :param bq_main_table_name: BigQuery main table name for the ORCID table.
+        :param bq_upsert_table_name: BigQuery table name for the ORCID upsert table.
+        :param bq_delete_table_name: BigQuery table name for the ORCID delete table.
+        :param start_date: Start date for the release.
+        :param end_date: End date for the release.
+        :param prev_end_date: End date for the previous release. Used for making the snapshot table date.
+        :param prev_latest_modified_record: Latest modified record for the previous release. Used to decide which records to update.
+        :param is_first_run: Whether this is the first run of the DAG.
         """
         super().__init__(
             dag_id=dag_id,
@@ -102,9 +112,11 @@ def __init__(
         )
         self.cloud_workspace = cloud_workspace
         self.bq_dataset_id = bq_dataset_id
-        self.bq_table_name = bq_table_name
+        self.bq_main_table_name = bq_main_table_name
+        self.bq_upsert_table_name = bq_upsert_table_name
+        self.bq_delete_table_name = bq_delete_table_name
+        self.prev_end_date = prev_end_date
-        self.modification_cutoff = modification_cutoff
+        self.prev_latest_modified_record = prev_latest_modified_record
         self.is_first_run = is_first_run

         # Files/folders
@@ -117,11 +129,11 @@ def __init__(
         self.table_uri = gcs_blob_uri(
             self.cloud_workspace.transform_bucket, f"{gcs_blob_name_from_path(self.transform_folder)}/*.jsonl.gz"
         )
-        self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_table_name)
-        self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, f"{bq_table_name}_upsert")
-        self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, f"{bq_table_name}_delete")
+        self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_main_table_name)
+        self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_upsert_table_name)
+        self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_delete_table_name)
         self.bq_snapshot_table_id = bq_sharded_table_id(
-            cloud_workspace.project_id, bq_dataset_id, bq_table_name, prev_end_date
+            cloud_workspace.project_id, bq_dataset_id, bq_main_table_name, prev_end_date
         )

     @property
@@ -257,20 +269,28 @@ def make_release(self, **kwargs) -> OrcidRelease:
                 len(releases) == 0
             ), "fetch_releases: there should be no DatasetReleases stored in the Observatory API on the first DAG run."

-            modification_cutoff = pendulum.instance(datetime.datetime.min)
+            prev_latest_modified_record = pendulum.instance(datetime.datetime.min)
+            prev_release_end = pendulum.instance(datetime.datetime.min)
         else:
             assert (
                 len(releases) >= 1
             ), f"fetch_releases: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run"
             prev_release = get_latest_dataset_release(releases, "changefile_end_date")
-            modification_cutoff = prev_release.extra["latest_modified_record_date"]
+            prev_release_end = prev_release.changefile_end_date
+            prev_latest_modified_record = prev_release.extra["latest_modified_record_date"]

         return OrcidRelease(
             dag_id=self.dag_id,
             run_id=kwargs["run_id"],
             cloud_workspace=self.cloud_workspace,
             bq_dataset_id=self.bq_dataset_id,
+            bq_main_table_name=self.bq_main_table_name,
+            bq_upsert_table_name=self.bq_upsert_table_name,
+            bq_delete_table_name=self.bq_delete_table_name,
+            prev_end_date=prev_release_end,
             start_date=kwargs["data_interval_start"],
             end_date=kwargs["data_interval_end"],
-            modification_cutoff=modification_cutoff,
+            prev_latest_modified_record=prev_latest_modified_record,
             is_first_run=is_first_run,
         )

@@ -332,7 +352,7 @@ def create_manifest(self, release: OrcidRelease, **kwargs):
                     create_manifest_batch,
                     bucket=self.orcid_bucket,
                     dir_name=dir_name,
-                    reference_date=release.end_date,
+                    reference_date=release.prev_latest_modified_record,
                     save_path=os.path.join(release.download_folder, f"{orcid_dir}_manifest.csv"),
                 )
             )
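Note on the date logic this commit changes: the old docstring described modification_cutoff as "files modified after this date will be included", and the new prev_latest_modified_record plays the same role (the previous release's latest modified record, falling back to datetime.min on the first run so everything is picked up), now also used as the manifest reference_date. A minimal sketch of that filtering rule, assuming a hypothetical record layout; filter_modified_records and the sample records are illustrative, not part of the repository:

import datetime

import pendulum


def filter_modified_records(records, prev_latest_modified_record: pendulum.DateTime) -> list:
    """Keep only records modified strictly after the previous release's latest modified record."""
    return [
        record
        for record in records
        if pendulum.parse(record["last_modified"]) > prev_latest_modified_record
    ]


# On the first DAG run the cutoff is datetime.min, so every record is selected.
first_run_cutoff = pendulum.instance(datetime.datetime.min)

records = [
    {"orcid": "0000-0001-2345-6789", "last_modified": "2023-06-15T00:00:00Z"},  # hypothetical record
    {"orcid": "0000-0002-9876-5432", "last_modified": "2023-07-01T00:00:00Z"},  # hypothetical record
]

print(len(filter_modified_records(records, first_run_cutoff)))  # 2: first run includes everything
print(len(filter_modified_records(records, pendulum.datetime(2023, 6, 20))))  # 1: only the record modified after the cutoff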
