diff --git a/academic_observatory_workflows/workflows/orcid_telescope.py b/academic_observatory_workflows/workflows/orcid_telescope.py index 42ab9941..24448e78 100644 --- a/academic_observatory_workflows/workflows/orcid_telescope.py +++ b/academic_observatory_workflows/workflows/orcid_telescope.py @@ -80,19 +80,29 @@ def __init__( run_id: str, cloud_workspace: CloudWorkspace, bq_dataset_id: str, - bq_table_name: str, + bq_main_table_name: str, + bq_upsert_table_name: str, + bq_delete_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_end_date: pendulum.DateTime, - modification_cutoff: pendulum.DateTime, + prev_latest_modified_record: pendulum.DateTime, is_first_run: bool, ): - """Construct a CrossrefEventsRelease instance + """Construct an OrcidRelease instance - :param dag_id: the id of the DAG. - :param start_date: the start_date of the release. Inclusive. - :param end_date: the end_date of the release. Exclusive. - :param modification_cutoff: the record modification cutoff date of the release. Files modified after this date will be included. + :param dag_id: DAG ID. + :param run_id: DAG run ID. + :param cloud_workspace: Cloud workspace object for this release. + :param bq_dataset_id: BigQuery dataset ID. + :param bq_main_table_name: BigQuery main table name for the ORCID table. + :param bq_upsert_table_name: BigQuery table name for the ORCID upsert table. + :param bq_delete_table_name: BigQuery table name for the ORCID delete table. + :param start_date: Start date for the release. + :param end_date: End date for the release. + :param prev_end_date: End date for the previous release. Used for making the snapshot table date. + :param prev_latest_modified_record: Latest modified record for the previous release. Used to decide which records to update. + :param is_first_run: Whether this is the first run of the DAG. 
""" super().__init__( dag_id=dag_id, @@ -102,9 +112,11 @@ def __init__( ) self.cloud_workspace = cloud_workspace self.bq_dataset_id = bq_dataset_id - self.bq_table_name = bq_table_name + self.bq_main_table_name = bq_main_table_name + self.bq_upsert_table_name = bq_upsert_table_name + self.bq_delete_table_name = bq_delete_table_name self.prev_end_date = prev_end_date - self.modification_cutoff = modification_cutoff + self.prev_latest_modified_record = prev_latest_modified_record self.is_first_run = is_first_run # Files/folders @@ -117,11 +129,11 @@ def __init__( self.table_uri = gcs_blob_uri( self.cloud_workspace.transform_bucket, f"{gcs_blob_name_from_path(self.transform_folder)}/*.jsonl.gz" ) - self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_table_name) - self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, f"{bq_table_name}_upsert") - self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, f"{bq_table_name}_delete") + self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_main_table_name) + self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_upsert_table_name) + self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_delete_table_name) self.bq_snapshot_table_id = bq_sharded_table_id( - cloud_workspace.project_id, bq_dataset_id, bq_table_name, prev_end_date + cloud_workspace.project_id, bq_dataset_id, bq_main_table_name, prev_end_date ) @property @@ -257,20 +269,28 @@ def make_release(self, **kwargs) -> OrcidRelease: len(releases) == 0 ), "fetch_releases: there should be no DatasetReleases stored in the Observatory API on the first DAG run." 
- modification_cutoff = pendulum.instance(datetime.datetime.min) + prev_latest_modified_record = pendulum.instance(datetime.datetime.min) + prev_release_end = pendulum.instance(datetime.datetime.min) else: assert ( len(releases) >= 1 ), f"fetch_releases: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run" prev_release = get_latest_dataset_release(releases, "changefile_end_date") - modification_cutoff = prev_release.extra["latest_modified_record_date"] + prev_release_end = prev_release.changefile_end_date + prev_latest_modified_record = prev_release.extra["latest_modified_record_date"] return OrcidRelease( dag_id=self.dag_id, run_id=kwargs["run_id"], + cloud_workspace=self.cloud_workspace, + bq_dataset_id=self.bq_dataset_id, + bq_main_table_name=self.bq_main_table_name, + bq_upsert_table_name=self.bq_upsert_table_name, + bq_delete_table_name=self.bq_delete_table_name, + prev_end_date=prev_release_end, start_date=kwargs["data_interval_start"], end_date=kwargs["data_interval_end"], - modification_cutoff=modification_cutoff, + prev_latest_modified_record=prev_latest_modified_record, is_first_run=is_first_run, ) @@ -332,7 +352,7 @@ def create_manifest(self, release: OrcidRelease, **kwargs): create_manifest_batch, bucket=self.orcid_bucket, dir_name=dir_name, - reference_date=release.prev_latest_modified_record, + reference_date=release.prev_latest_modified_record, save_path=os.path.join(release.download_folder, f"{orcid_dir}_manifest.csv"), ) )