diff --git a/academic_observatory_workflows/database/schema/openalex/works.json b/academic_observatory_workflows/database/schema/openalex/works.json index e1ed5067..39823ec0 100644 --- a/academic_observatory_workflows/database/schema/openalex/works.json +++ b/academic_observatory_workflows/database/schema/openalex/works.json @@ -209,19 +209,19 @@ }, { "name": "landing_page_url", - "type": "BOOLEAN", + "type": "STRING", "mode": "NULLABLE", "description": "The landing page URL for this location." }, { "name": "license", - "type": "BOOLEAN", + "type": "STRING", "mode": "NULLABLE", "description": "The location's publishing license. This can be a Create Commons license such as cc0 or cc-by, a publisher-specific license, or null which means we are not able to determine a license for this location." }, { "name": "source", - "type": "BOOLEAN", + "type": "RECORD", "mode": "NULLABLE", "fields": [ { @@ -283,13 +283,13 @@ }, { "name": "pdf_url", - "type": "BOOLEAN", + "type": "STRING", "mode": "NULLABLE", "description": "A URL where you can find this location as a PDF." }, { "name": "version", - "type": "BOOLEAN", + "type": "STRING", "mode": "NULLABLE", "description": "The version of the work, based on the DRIVER Guidelines versioning scheme." } diff --git a/academic_observatory_workflows/workflows/openalex_telescope.py b/academic_observatory_workflows/workflows/openalex_telescope.py index 80835ce0..a9004e81 100644 --- a/academic_observatory_workflows/workflows/openalex_telescope.py +++ b/academic_observatory_workflows/workflows/openalex_telescope.py @@ -160,7 +160,7 @@ def current_entries(self): @property def has_merged_ids(self): - return len(self.current_merged_ids) >= 0 + return len(self.current_merged_ids) >= 1 @property def current_merged_ids(self): @@ -317,7 +317,7 @@ def __init__( PreviousDagRunSensor( dag_id=self.dag_id, external_task_id=external_task_id, - execution_delta=timedelta(days=1), # To match the @daily schedule_interval + execution_delta=timedelta(days=7), # To match the @weekly schedule_interval ) ) self.add_setup_task(self.check_dependencies) @@ -664,6 +664,7 @@ def bq_load_delete_tables(self, release: OpenAlexRelease, **kwargs): """Load the delete tables.""" for entity in release.entities: + current_merged_ids = entity.current_merged_ids if entity.has_merged_ids: logging.info( f"bq_load_delete_tables: loading {entity.entity_name} delete table {entity.bq_delete_table_id}" diff --git a/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py b/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py index 71088864..19d3505d 100644 --- a/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py @@ -60,9 +60,7 @@ def __init__(self, *args, **kwargs): super(TestOpenAlexUtils, self).__init__(*args, **kwargs) self.dag_id = "openalex" self.aws_key = (os.getenv("AWS_ACCESS_KEY_ID"), os.getenv("AWS_SECRET_ACCESS_KEY")) - self.aws_region_name = "us-east-1" - # TODO: check that not region set in workflow works with openalex bucket, or if this must be specified - os.environ["AWS_DEFAULT_REGION"] = self.aws_region_name + self.aws_region_name = os.getenv("AWS_DEFAULT_REGION") def test_s3_uri_parts(self): # Bucket and object @@ -542,9 +540,7 @@ def __init__(self, *args, **kwargs): self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") self.aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") self.aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") - 
self.aws_region_name = "us-east-1" - # TODO: check that not region set in workflow works with openalex bucket, or if this must be specified - os.environ["AWS_DEFAULT_REGION"] = self.aws_region_name + self.aws_region_name = os.getenv("AWS_DEFAULT_REGION") def test_dag_structure(self): """Test that the DAG has the correct structure.""" @@ -560,8 +556,10 @@ def test_dag_structure(self): "fetch_releases": ["create_datasets"], "create_datasets": ["bq_create_main_table_snapshots"], "bq_create_main_table_snapshots": ["aws_to_gcs_transfer"], - "aws_to_gcs_transfer": ["download"], - "download": ["transform"], + "aws_to_gcs_transfer": ["download_concepts"], + "download_concepts": ["download_institutions"], + "download_institutions": ["download_works"], + "download_works": ["transform"], "transform": ["upload_upsert_files"], "upload_upsert_files": ["bq_load_upsert_tables"], "bq_load_upsert_tables": ["bq_upsert_records"], @@ -615,35 +613,34 @@ def test_telescope(self): env.add_connection(conn) dag = workflow.make_dag() - # First run: snapshot - data_interval_start = pendulum.datetime(2023, 4, 2) - with env.create_dag_run(dag, data_interval_start) as dag_run: - # Mocked and expected data - release = OpenAlexRelease( - dag_id=self.dag_id, - run_id=dag_run.run_id, - entities=[], - download_bucket=workflow.cloud_workspace.download_bucket, - changefile_start_date=data_interval_start, - changefile_end_date=data_interval_start, - is_first_run=True, + # Create bucket and dataset for use in first and second run + with aws_bucket_test_env(prefix=self.dag_id, region_name=self.aws_region_name) as bucket_name: + workflow.aws_openalex_bucket = bucket_name + create_openalex_dataset( + pathlib.Path(test_fixtures_folder(self.dag_id, "2023-04-02")), bucket_name, self.aws_region_name ) - # Wait for the previous DAG run to finish - ti = env.run_task("wait_for_prev_dag_run") - self.assertEqual(State.SUCCESS, ti.state) + # First run: snapshot + data_interval_start = pendulum.datetime(2023, 4, 2) + with env.create_dag_run(dag, data_interval_start) as dag_run: + # Mocked and expected data + release = OpenAlexRelease( + dag_id=self.dag_id, + run_id=dag_run.run_id, + entities=[], + download_bucket=workflow.cloud_workspace.download_bucket, + changefile_start_date=data_interval_start, + changefile_end_date=data_interval_start, + is_first_run=True, + ) - # Check dependencies - ti = env.run_task(workflow.check_dependencies.__name__) - self.assertEqual(State.SUCCESS, ti.state) + # Wait for the previous DAG run to finish + ti = env.run_task("wait_for_prev_dag_run") + self.assertEqual(State.SUCCESS, ti.state) - # Create bucket - with aws_bucket_test_env(prefix=self.dag_id, region_name=self.aws_region_name) as bucket_name: - # Create mini OpenAlex dataset - workflow.aws_openalex_bucket = bucket_name - create_openalex_dataset( - pathlib.Path(test_fixtures_folder(self.dag_id, "2023-04-02")), bucket_name, self.aws_region_name - ) + # Check dependencies + ti = env.run_task(workflow.check_dependencies.__name__) + self.assertEqual(State.SUCCESS, ti.state) # Fetch releases and check that we have received the expected snapshot date and changefiles task_id = workflow.fetch_releases.__name__ @@ -667,550 +664,86 @@ def test_telescope(self): ti = env.run_task(workflow.aws_to_gcs_transfer.__name__) self.assertEqual(State.SUCCESS, ti.state) - for entity_name, transform in workflow.entities: - if transform: - ti = env.run_task(f"download_{entity_name}") - self.assertEqual(State.SUCCESS, ti.state) - - ti = env.run_task(workflow.transform.__name__) - 
self.assertEqual(State.SUCCESS, ti.state) + for entity_name, transform in workflow.entities: + if transform: + ti = env.run_task(f"download_{entity_name}") + self.assertEqual(State.SUCCESS, ti.state) - ti = env.run_task(workflow.upload_upsert_files.__name__) - self.assertEqual(State.SUCCESS, ti.state) - - ti = env.run_task(workflow.bq_load_upsert_tables.__name__) - self.assertEqual(State.SUCCESS, ti.state) + ti = env.run_task(workflow.transform.__name__) + self.assertEqual(State.SUCCESS, ti.state) - ti = env.run_task(workflow.bq_upsert_records.__name__) - self.assertEqual(State.SUCCESS, ti.state) + ti = env.run_task(workflow.upload_upsert_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) - ti = env.run_task(workflow.bq_load_delete_tables.__name__) - self.assertEqual(State.SUCCESS, ti.state) + ti = env.run_task(workflow.bq_load_upsert_tables.__name__) + self.assertEqual(State.SUCCESS, ti.state) - ti = env.run_task(workflow.bq_delete_records.__name__) - self.assertEqual(State.SUCCESS, ti.state) + ti = env.run_task(workflow.bq_upsert_records.__name__) + self.assertEqual(State.SUCCESS, ti.state) - # Check that there is one dataset release per entity after add_new_dataset_releases - for entity_name, _ in workflow.entities: - dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) - self.assertEqual(len(dataset_releases), 0) - ti = env.run_task(workflow.add_new_dataset_releases.__name__) - self.assertEqual(State.SUCCESS, ti.state) - for entity_name, _ in workflow.entities: - dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) - self.assertEqual(len(dataset_releases), 1) + ti = env.run_task(workflow.bq_load_delete_tables.__name__) + self.assertEqual(State.SUCCESS, ti.state) - # Test that all workflow data deleted - ti = env.run_task(workflow.cleanup.__name__) - self.assertEqual(State.SUCCESS, ti.state) - self.assert_cleanup(release.workflow_folder) + ti = env.run_task(workflow.bq_delete_records.__name__) + self.assertEqual(State.SUCCESS, ti.state) - ti = env.run_task("dag_run_complete") - self.assertEqual(State.SUCCESS, ti.state) + # Check that there is one dataset release per entity after add_new_dataset_releases + for entity_name, _ in workflow.entities: + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) + self.assertEqual(len(dataset_releases), 0) + ti = env.run_task(workflow.add_new_dataset_releases.__name__) + self.assertEqual(State.SUCCESS, ti.state) + for entity_name, _ in workflow.entities: + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) + self.assertEqual(len(dataset_releases), 1) - # Second run: no updates - data_interval_start = pendulum.datetime(2023, 4, 9) - with env.create_dag_run(dag, data_interval_start) as dag_run: - # Check that first three tasks are successful - task_ids = ["wait_for_prev_dag_run", "check_dependencies", "fetch_releases"] - for task_id in task_ids: - ti = env.run_task(task_id) + # Test that all workflow data deleted + ti = env.run_task(workflow.cleanup.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_cleanup(release.workflow_folder) - # Check that all subsequent tasks are skipped - task_ids = [ - "create_datasets", - "bq_create_main_table_snapshots", - "aws_to_gcs_transfer", - "download", - "transform", - "upload_upsert_files", - "bq_load_upsert_tables", - "bq_upsert_records", - "bq_load_delete_tables", - "bq_delete_records", - "add_new_dataset_releases", - "cleanup", - ] - for task_id in task_ids: - ti = 
env.run_task(task_id) - self.assertEqual(State.SKIPPED, ti.state) + ti = env.run_task("dag_run_complete") + self.assertEqual(State.SUCCESS, ti.state) - # Check that only 1 dataset release exists for each entity - for entity_name, _ in workflow.entities: - dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) - self.assertEqual(len(dataset_releases), 1) + # Second run: no updates + data_interval_start = pendulum.datetime(2023, 4, 9) + with env.create_dag_run(dag, data_interval_start) as dag_run: + # Check that first three tasks are successful + task_ids = ["wait_for_prev_dag_run", "check_dependencies", "fetch_releases"] + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) - # Third run: changefiles - data_interval_start = pendulum.datetime(2023, 4, 16) - with env.create_dag_run(dag, data_interval_start) as dag_run: - pass - - # # Test that all dependencies are specified: no error should be thrown - # env.run_task(telescope.check_dependencies.__name__) - # start_date, end_date, first_release = telescope.get_release_info( - # dag=dag, - # data_interval_end=pendulum.datetime(year=2021, month=12, day=26), - # ) - # self.assertEqual(dag.default_args["start_date"], start_date) - # self.assertEqual(pendulum.datetime(year=2021, month=12, day=26), end_date) - # self.assertTrue(first_release) - # - # # Use release info for other tasks - # release = OpenAlexRelease( - # telescope.dag_id, - # telescope.workflow_id, - # telescope.dataset_type_id, - # start_date, - # end_date, - # first_release, - # max_processes=1, - # ) - # - # # Mock response of get_object on last_modified file, mocking lambda file - # side_effect = [] - # for entity in self.entities: - # manifest_content = render_template( - # self.manifest_obj_path, entity=entity, date=run["manifest_date"] - # ).encode() - # side_effect.append({"Body": StreamingBody(io.BytesIO(manifest_content), len(manifest_content))}) - # mock_client().get_object.side_effect = side_effect - # - # # Test write transfer manifest task - # env.run_task(telescope.write_transfer_manifest.__name__) - # self.assert_file_integrity( - # release.transfer_manifest_path_unchanged, run["manifest_unchanged_hash"], "md5" - # ) - # self.assert_file_integrity( - # release.transfer_manifest_path_transform, run["manifest_transform_hash"], "md5" - # ) - # - # # Test upload transfer manifest task - # env.run_task(telescope.upload_transfer_manifest.__name__) - # self.assert_blob_integrity( - # env.download_bucket, - # release.transfer_manifest_blob_unchanged, - # release.transfer_manifest_path_unchanged, - # ) - # self.assert_blob_integrity( - # env.download_bucket, - # release.transfer_manifest_blob_transform, - # release.transfer_manifest_path_transform, + # Check that all subsequent tasks are skipped + task_ids = ["create_datasets", "bq_create_main_table_snapshots", "aws_to_gcs_transfer"] + task_ids += [f"download_{entity_name}" for entity_name, transform in workflow.entities if transform] + task_ids += [ + "transform", + "upload_upsert_files", + "bq_load_upsert_tables", + "bq_upsert_records", + "bq_load_delete_tables", + "bq_delete_records", + "add_new_dataset_releases", + "cleanup", + ] + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SKIPPED, ti.state) + + # Check that only 1 dataset release exists for each entity + for entity_name, _ in workflow.entities: + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=entity_name) + 
self.assertEqual(len(dataset_releases), 1) + + # Create bucket and dataset for use in third run + # with aws_bucket_test_env(prefix=self.dag_id, region_name=self.aws_region_name) as bucket_name: + # workflow.aws_openalex_bucket = bucket_name + # create_openalex_dataset( + # pathlib.Path(test_fixtures_folder(self.dag_id, "2023-04-09")), bucket_name, + # self.aws_region_name # ) - # - # # Test transfer task - # mock_transfer.reset_mock() - # mock_transfer.return_value = True, 2 - # env.run_task(telescope.transfer.__name__) - # self.assertEqual(3, mock_transfer.call_count) - # try: - # self.assertTupleEqual(mock_transfer.call_args_list[0][0], (conn.login, conn.password)) - # self.assertTupleEqual(mock_transfer.call_args_list[1][0], (conn.login, conn.password)) - # self.assertTupleEqual(mock_transfer.call_args_list[2][0], (conn.login, conn.password)) - # except AssertionError: - # raise AssertionError("AWS key id and secret not passed correctly to transfer function") - # self.assertDictEqual( - # mock_transfer.call_args_list[0][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.download_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/unchanged/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.download_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_unchanged}", - # }, - # ) - # self.assertDictEqual( - # mock_transfer.call_args_list[1][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.transform_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.transform_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_unchanged}", - # }, - # ) - # self.assertDictEqual( - # mock_transfer.call_args_list[2][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.download_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/transform/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.download_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_transform}", - # }, - # ) - # - # # Upload files to bucket, to mock transfer - # for entity, info in self.entities.items(): - # gzip_path = f"{entity}.jsonl.gz" - # with open(info["download_path"], "rb") as f_in, gzip.open(gzip_path, "wb") as f_out: - # f_out.writelines(f_in) - # - # if entity == "authors" or entity == "venues": - # download_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/unchanged/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # transform_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # gcs_upload_file(release.download_bucket, download_blob, gzip_path) - # gcs_upload_file(release.transform_bucket, transform_blob, gzip_path) - # else: - # download_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/transform/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # gcs_upload_file(release.download_bucket, 
download_blob, gzip_path) - # - # # Test that file was downloaded - # env.run_task(telescope.download_transferred.__name__) - # self.assertEqual(3, len(release.download_files)) - # for file in release.download_files: - # entity = file.split("/")[-3] - # self.assert_file_integrity(file, self.entities[entity]["download_hash"], "gzip_crc") - # - # # Test that files transformed - # env.run_task(telescope.transform.__name__) - # self.assertEqual(3, len(release.transform_files)) - # # Sort lines so that gzip crc is always the same - # for file in release.transform_files: - # entity = file.split("/")[-3] - # with gzip.open(file, "rb") as f_in: - # lines = sorted(f_in.readlines()) - # with gzip.open(file, "wb") as f_out: - # f_out.writelines(lines) - # self.assert_file_integrity(file, self.entities[entity]["transform_hash"], "gzip_crc") - # - # # Test that transformed files uploaded - # env.run_task(telescope.upload_transformed.__name__) - # for entity, info in self.entities.items(): - # if entity in ["concepts", "institutions", "works"]: - # file = [file for file in release.transform_files if entity in file][0] - # else: - # file = f"{entity}.jsonl.gz" - # blob = f"telescopes/{release.dag_id}/{release.release_id}/data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # self.assert_blob_integrity(env.transform_bucket, blob, file) - # - # # Get bq load info for BQ tasks - # bq_load_info = telescope.get_bq_load_info(release) - # - # # Test append new creates table - # env.run_task(telescope.bq_append_new.__name__) - # for _, table, _ in bq_load_info: - # table_id = f"{self.project_id}.{telescope.dataset_id}.{table}" - # expected_bytes = run["table_bytes"][table] - # self.assert_table_bytes(table_id, expected_bytes) - # - # # Test delete old task is skipped for the first release - # ti = env.run_task(telescope.bq_delete_old.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Test create bigquery snapshot - # ti = env.run_task(telescope.bq_create_snapshot.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Test adding of dataset releases as well as cleanup - # download_folder, extract_folder, transform_folder = ( - # release.download_folder, - # release.extract_folder, - # release.transform_folder, - # ) - # - # openalex_dataset_releases = get_dataset_releases(dataset_id=1) - # institution_dataset_releases = get_dataset_releases(dataset_id=2) - # author_dataset_releases = get_dataset_releases(dataset_id=3) - # self.assertListEqual([], openalex_dataset_releases) - # self.assertListEqual([], institution_dataset_releases) - # self.assertListEqual([], author_dataset_releases) - # - # ti = env.run_task(telescope.add_new_dataset_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # openalex_dataset_releases = get_dataset_releases(dataset_id=1) - # institution_dataset_releases = get_dataset_releases(dataset_id=2) - # author_dataset_releases = get_dataset_releases(dataset_id=3) - # self.assertEqual(1, len(openalex_dataset_releases)) - # self.assertEqual(1, len(institution_dataset_releases)) - # self.assertEqual(1, len(author_dataset_releases)) - # self.assertEqual(release.end_date, openalex_dataset_releases[0].end_date) - # self.assertEqual( - # pendulum.from_format("2021-12-01", "YYYY-MM-DD"), openalex_dataset_releases[0].start_date - # ) - # self.assertEqual( - # pendulum.from_format("2021-12-26", "YYYY-MM-DD"), openalex_dataset_releases[0].end_date - # ) - # self.assertEqual( - # pendulum.from_format("2021-12-01", "YYYY-MM-DD"), 
author_dataset_releases[0].start_date - # ) - # self.assertEqual(pendulum.from_format("2021-12-17", "YYYY-MM-DD"), author_dataset_releases[0].end_date) - # self.assertEqual( - # pendulum.from_format("2021-12-01", "YYYY-MM-DD"), institution_dataset_releases[0].start_date - # ) - # self.assertEqual( - # pendulum.from_format("2021-12-17", "YYYY-MM-DD"), institution_dataset_releases[0].end_date - # ) - # self.assert_cleanup(download_folder, extract_folder, transform_folder) - # - # run = self.second_run - # with env.create_dag_run(dag, run["execution_date"]) as dag_run: - # # Test that all dependencies are specified: no error should be thrown - # env.run_task(telescope.check_dependencies.__name__) - # start_date, end_date, first_release = telescope.get_release_info( - # dag=dag, - # data_interval_end=pendulum.datetime(year=2022, month=1, day=2), - # ) - # self.assertEqual(release.end_date, start_date) - # self.assertEqual(pendulum.datetime(year=2022, month=1, day=2), end_date) - # self.assertFalse(first_release) - # - # # Use release info for other tasks - # release = OpenAlexRelease( - # telescope.dag_id, - # telescope.workflow_id, - # telescope.dataset_type_id, - # start_date, - # end_date, - # first_release, - # max_processes=1, - # ) - # - # # Mock response of get_object on last_modified file, mocking lambda file - # side_effect = [] - # for entity in self.entities: - # manifest_content = render_template( - # self.manifest_obj_path, entity=entity, date=run["manifest_date"] - # ).encode() - # side_effect.append({"Body": StreamingBody(io.BytesIO(manifest_content), len(manifest_content))}) - # mock_client().get_object.side_effect = side_effect - # - # # Test write transfer manifest task - # env.run_task(telescope.write_transfer_manifest.__name__) - # self.assert_file_integrity( - # release.transfer_manifest_path_unchanged, run["manifest_unchanged_hash"], "md5" - # ) - # self.assert_file_integrity( - # release.transfer_manifest_path_transform, run["manifest_transform_hash"], "md5" - # ) - # - # # Test upload transfer manifest task - # env.run_task(telescope.upload_transfer_manifest.__name__) - # self.assert_blob_integrity( - # env.download_bucket, - # release.transfer_manifest_blob_unchanged, - # release.transfer_manifest_path_unchanged, - # ) - # self.assert_blob_integrity( - # env.download_bucket, - # release.transfer_manifest_blob_transform, - # release.transfer_manifest_path_transform, - # ) - # - # # Test transfer task - # mock_transfer.reset_mock() - # mock_transfer.return_value = True, 2 - # env.run_task(telescope.transfer.__name__) - # self.assertEqual(3, mock_transfer.call_count) - # try: - # self.assertTupleEqual(mock_transfer.call_args_list[0][0], (conn.login, conn.password)) - # self.assertTupleEqual(mock_transfer.call_args_list[1][0], (conn.login, conn.password)) - # self.assertTupleEqual(mock_transfer.call_args_list[2][0], (conn.login, conn.password)) - # except AssertionError: - # raise AssertionError("AWS key id and secret not passed correctly to transfer function") - # - # self.assertDictEqual( - # mock_transfer.call_args_list[0][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.download_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/unchanged/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.download_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_unchanged}", - 
# }, - # ) - # self.assertDictEqual( - # mock_transfer.call_args_list[1][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.transform_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.transform_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_unchanged}", - # }, - # ) - # self.assertDictEqual( - # mock_transfer.call_args_list[2][1], - # { - # "aws_bucket": OpenAlexTelescope.AWS_BUCKET, - # "include_prefixes": [], - # "gc_project_id": self.project_id, - # "gc_bucket": release.download_bucket, - # "gc_bucket_path": f"telescopes/{release.dag_id}/{release.release_id}/transform/", - # "description": f"Transfer OpenAlex data from Airflow telescope to {release.download_bucket}", - # "transfer_manifest": f"gs://{release.download_bucket}/{release.transfer_manifest_blob_transform}", - # }, - # ) - # - # # Upload files to bucket, to mock transfer - # for entity, info in self.entities.items(): - # gzip_path = f"{entity}.jsonl.gz" - # with open(info["download_path"], "rb") as f_in, gzip.open(gzip_path, "wb") as f_out: - # f_out.writelines(f_in) - # - # if entity == "authors" or entity == "venues": - # download_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/unchanged/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # transform_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # gcs_upload_file(release.download_bucket, download_blob, gzip_path) - # gcs_upload_file(release.transform_bucket, transform_blob, gzip_path) - # else: - # download_blob = ( - # f"telescopes/{release.dag_id}/{release.release_id}/transform/" - # f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # ) - # gcs_upload_file(release.download_bucket, download_blob, gzip_path) - # - # # Test that file was downloaded - # env.run_task(telescope.download_transferred.__name__) - # self.assertEqual(3, len(release.download_files)) - # for file in release.download_files: - # entity = file.split("/")[-3] - # self.assert_file_integrity(file, self.entities[entity]["download_hash"], "gzip_crc") - # - # # Test that files transformed - # env.run_task(telescope.transform.__name__) - # self.assertEqual(3, len(release.transform_files)) - # # Sort lines so that gzip crc is always the same - # for file in release.transform_files: - # entity = file.split("/")[-3] - # with gzip.open(file, "rb") as f_in: - # lines = sorted(f_in.readlines()) - # with gzip.open(file, "wb") as f_out: - # f_out.writelines(lines) - # self.assert_file_integrity(file, self.entities[entity]["transform_hash"], "gzip_crc") - # - # # Test that transformed files uploaded - # env.run_task(telescope.upload_transformed.__name__) - # for entity, info in self.entities.items(): - # if entity in ["concepts", "institutions", "works"]: - # file = [file for file in release.transform_files if entity in file][0] - # else: - # file = f"{entity}.jsonl.gz" - # blob = f"telescopes/{release.dag_id}/{release.release_id}/data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" - # self.assert_blob_integrity(env.transform_bucket, blob, file) - # - # # Get bq load info for BQ tasks - # bq_load_info = telescope.get_bq_load_info(release) - # - # # Test append new creates table - # 
env.run_task(telescope.bq_append_new.__name__) - # for _, table, _ in bq_load_info: - # table_id = f"{self.project_id}.{telescope.dataset_id}.{table}" - # expected_bytes = run["table_bytes"][table] - # self.assert_table_bytes(table_id, expected_bytes) - # - # # Test that old rows are deleted from table - # env.run_task(telescope.bq_delete_old.__name__) - # for _, table, _ in bq_load_info: - # table_id = f"{self.project_id}.{telescope.dataset_id}.{table}" - # expected_bytes = self.first_run["table_bytes"][table] - # self.assert_table_bytes(table_id, expected_bytes) - # - # # Test create bigquery snapshot - # ti = env.run_task(telescope.bq_create_snapshot.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Test adding of dataset releases and cleanup of local files - # download_folder, extract_folder, transform_folder = ( - # release.download_folder, - # release.extract_folder, - # release.transform_folder, - # ) - # - # openalex_dataset_releases = get_dataset_releases(dataset_id=1) - # institution_dataset_releases = get_dataset_releases(dataset_id=2) - # author_dataset_releases = get_dataset_releases(dataset_id=3) - # self.assertEqual(1, len(openalex_dataset_releases)) - # self.assertEqual(1, len(institution_dataset_releases)) - # self.assertEqual(1, len(author_dataset_releases)) - # - # ti = env.run_task(telescope.add_new_dataset_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # openalex_dataset_releases = get_dataset_releases(dataset_id=1) - # institution_dataset_releases = get_dataset_releases(dataset_id=2) - # author_dataset_releases = get_dataset_releases(dataset_id=3) - # self.assertEqual(2, len(openalex_dataset_releases)) - # self.assertEqual(2, len(institution_dataset_releases)) - # self.assertEqual(2, len(author_dataset_releases)) - # self.assertEqual(release.end_date, openalex_dataset_releases[1].end_date) - # self.assertEqual( - # pendulum.from_format("2021-12-26", "YYYY-MM-DD"), openalex_dataset_releases[1].start_date - # ) - # self.assertEqual( - # pendulum.from_format("2022-01-02", "YYYY-MM-DD"), openalex_dataset_releases[1].end_date - # ) - # self.assertEqual( - # pendulum.from_format("2021-12-17", "YYYY-MM-DD"), author_dataset_releases[1].start_date - # ) - # self.assertEqual(pendulum.from_format("2022-1-17", "YYYY-MM-DD"), author_dataset_releases[1].end_date) - # self.assertEqual( - # pendulum.from_format("2021-12-17", "YYYY-MM-DD"), institution_dataset_releases[1].start_date - # ) - # self.assertEqual( - # pendulum.from_format("2022-1-17", "YYYY-MM-DD"), institution_dataset_releases[1].end_date - # ) - # - # self.assert_cleanup(download_folder, extract_folder, transform_folder) - # - # run = self.third_run - # with env.create_dag_run(dag, run["execution_date"]) as dag_run: - # # Test that all dependencies are specified: no error should be thrown - # env.run_task(telescope.check_dependencies.__name__) - # - # # Mock response of get_object on last_modified file, mocking lambda file - # side_effect = [] - # for entity in self.entities: - # manifest_content = render_template( - # self.manifest_obj_path, entity=entity, date=run["manifest_date"] - # ).encode() - # side_effect.append({"Body": StreamingBody(io.BytesIO(manifest_content), len(manifest_content))}) - # mock_client().get_object.side_effect = side_effect - # - # # Test write transfer manifest task - # ti = env.run_task(telescope.write_transfer_manifest.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # run = self.fourth_run - # with env.create_dag_run(dag, 
run["execution_date"]) as dag_run: - # # Test that all dependencies are specified: no error should be thrown - # env.run_task(telescope.check_dependencies.__name__) - # - # # Mock response of get_object on last_modified file, mocking lambda file - # side_effect = [] - # for entity in self.entities: - # manifest_content = render_template( - # self.manifest_obj_path, entity=entity, date=run["manifest_date"] - # ).encode() - # side_effect.append({"Body": StreamingBody(io.BytesIO(manifest_content), len(manifest_content))}) - # mock_client().get_object.side_effect = side_effect - # - # # Test write transfer manifest task - # ti = env.run_task(telescope.write_transfer_manifest.__name__) - # self.assertEqual(State.SUCCESS, ti.state) + # Third run: changefiles + # data_interval_start = pendulum.datetime(2023, 4, 16) + # with env.create_dag_run(dag, data_interval_start) as dag_run: + # pass