diff --git a/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-25T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-25T080001.jsonl.gz new file mode 100644 index 000000000..c928aa65c --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-25T080001.jsonl.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2361c2c9110d81ecb6075c3b4dbd6b2863544e0e926f355d797bec1043973533 +size 1836 diff --git a/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-26T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-26T080001.jsonl.gz new file mode 100644 index 000000000..3ae3b2565 --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-26T080001.jsonl.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6f96834bdf284509c9fb20854ad87ccf07d3e5236cbee54c025f12f5ced33dc0 +size 1292 diff --git a/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-27T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-27T080001.jsonl.gz new file mode 100644 index 000000000..8de7ff474 --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/changed_dois_with_versions_2023-04-27T080001.jsonl.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4d9d539b759be67d82b972f9207d273179719f9370aeb48e72b20c4426ef186a +size 1203 diff --git a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-01T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-01T080001.jsonl.gz deleted file mode 100644 index e032c0063..000000000 --- a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-01T080001.jsonl.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:7261234b036a78842a2d71f5de635842a69f41c0fcdb35d0ca3dd4e50d6ef070 -size 1295 diff --git a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-02T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-02T080001.jsonl.gz deleted file mode 100644 index 7432a1786..000000000 --- a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-02T080001.jsonl.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:614024b41be4ee811cce5de03559a9bcf8ba1f68a54c2b7ae5bc259057e5c1dc -size 1295 diff --git a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-03T080001.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-03T080001.jsonl.gz deleted file mode 100644 index 51f2566a7..000000000 --- a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefile/changed_dois_with_versions_2021-07-03T080001.jsonl.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:9346e7ed0754b2a102651e165105bf49406e46b020ddf329c7c9e7f3263c6087 -size 1295 diff --git a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefiles.jinja2 b/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefiles.jinja2 deleted file mode 100644 index 8b1c3fa71..000000000 --- a/academic_observatory_workflows/fixtures/unpaywall/daily-feed/changefiles.jinja2 +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:6464a5616eb47050dd7f4081cd43368365a097a7af9db10098b006e9ba6a1038 -size 1277 diff --git a/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_load_main_table.json b/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_load_main_table.json new file mode 100644 index 000000000..ba8a9f3ef --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_load_main_table.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:26e30270c0b0dfb4ee9512c89526c32a2a42bb93bdec557054a79d16660f1197 +size 55555 diff --git a/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_upsert_records.json b/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_upsert_records.json new file mode 100644 index 000000000..0e88a540c --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/expected/run1_bq_upsert_records.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d3e6f1102a9addd369c709b71c222d8b0d909da5d9be07b124b50bb25761ff4c +size 55568 diff --git a/academic_observatory_workflows/fixtures/unpaywall/expected/run3_bq_upsert_records.json b/academic_observatory_workflows/fixtures/unpaywall/expected/run3_bq_upsert_records.json new file mode 100644 index 000000000..9c4adcf58 --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/expected/run3_bq_upsert_records.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7362e353deee208ea2e27f2b7ed79ce8ee30a836e437069f01b7dec9c04bcb07 +size 58098 diff --git a/academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz index 50ae9c82c..b543e56e3 100644 --- a/academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz +++ b/academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:75246a7865c91ba82614bfa178e94e69db5e452f5887dfcfccfaddb089ddae8b -size 16559 +oid sha256:7a1a6b6477db6dbe5875f1e8ecadd67062ddaaaeae330f40441931b32b691aa8 +size 4783 diff --git a/academic_observatory_workflows/workflows/openalex_telescope.py b/academic_observatory_workflows/workflows/openalex_telescope.py index c5b63903c..ff3ed124f 100644 --- a/academic_observatory_workflows/workflows/openalex_telescope.py +++ b/academic_observatory_workflows/workflows/openalex_telescope.py @@ -323,6 +323,13 @@ def __init__( self.add_setup_task(self.check_dependencies) self.add_setup_task(self.fetch_releases) self.add_task(self.create_datasets) + + # Create snapshots of main tables in case we mess up + # This is done before updating the tables to make sure that the snapshots haven't expired before the tables + # are updated + self.add_task(self.bq_create_main_table_snapshots) + + # Transfer, download and transform data self.add_task(self.aws_to_gcs_transfer) # Download concepts, institutions and works which need to be pre-processed @@ -337,11 +344,6 @@ def __init__( ) self.add_task(self.transform) - # Create snapshots of main tables in case we messed up - # This is done before updating the tables to make sure that the snapshots haven't expired before the tables - # are updated - self.add_task(self.bq_create_main_table_snapshots) - # Upsert records self.add_task(self.upload_upsert_files) self.add_task(self.bq_load_upsert_tables) @@ -491,6 +493,26 @@ def create_datasets(self, release: OpenAlexRelease, **kwargs) -> None: description=self.dataset_description, ) + def bq_create_main_table_snapshots(self, release: OpenAlexRelease, **kwargs): + """Create a snapshot of each main table. The purpose of this table is to be able to rollback the table + if something goes wrong. The snapshot expires after self.snapshot_expiry_days.""" + + if release.is_first_run: + logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run") + return + + for entity in release.entities: + expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) + logging.info( + f"bq_create_main_table_snapshots: creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}" + ) + success = bq_snapshot( + src_table_id=entity.bq_main_table_id, dst_table_id=entity.bq_snapshot_table_id, expiry_date=expiry_date + ) + assert ( + success + ), f"bq_create_main_table_snapshots: error creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}" + def aws_to_gcs_transfer(self, release: OpenAlexRelease, **kwargs): """Transfer files from AWS bucket to Google Cloud bucket""" @@ -587,26 +609,6 @@ def upload_upsert_files(self, release: OpenAlexRelease, **kwargs): success = gcs_upload_files(bucket_name=self.cloud_workspace.transform_bucket, file_paths=file_paths) set_task_state(success, self.upload_upsert_files.__name__, release) - def bq_create_main_table_snapshots(self, release: OpenAlexRelease, **kwargs): - """Create a snapshot of each main table. The purpose of this table is to be able to rollback the table - if something goes wrong. The snapshot expires after self.snapshot_expiry_days.""" - - if release.is_first_run: - logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run") - return - - for entity in release.entities: - expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) - logging.info( - f"bq_create_main_table_snapshots: creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}" - ) - success = bq_snapshot( - src_table_id=entity.bq_main_table_id, dst_table_id=entity.bq_snapshot_table_id, expiry_date=expiry_date - ) - assert ( - success - ), f"bq_create_main_table_snapshots: error creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}" - def bq_load_upsert_tables(self, release: OpenAlexRelease, **kwargs): """Load the upsert table for each entity.""" diff --git a/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py b/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py index 6dc9f059d..f457b239b 100644 --- a/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py @@ -38,29 +38,20 @@ parse_release_msg, UnpaywallRelease, ) +from observatory.platform.api import get_dataset_releases +from observatory.platform.bigquery import bq_sharded_table_id +from observatory.platform.files import list_files +from observatory.platform.gcs import gcs_blob_name_from_path from observatory.platform.observatory_config import Workflow from observatory.platform.observatory_environment import ( ObservatoryEnvironment, ObservatoryTestCase, find_free_port, HttpServer, + load_json, ) -# def create_changefiles(self, host, port): -# # Daily -# template_path = os.path.join(self.fixture_dir, "daily-feed", "changefiles.jinja2") -# changefiles = render_template(template_path, host=host, port=port) -# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") -# with open(dst, "w") as f: -# f.write(changefiles) -# -# -# def remove_changefiles(self): -# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") -# os.remove(dst) - - class TestUnpaywallUtils(ObservatoryTestCase): def test_changefile(self): class MockRelease: @@ -139,16 +130,16 @@ def test_get_unpaywall_changefiles(self, m_get_http_text_response): m_get_http_text_response.return_value = '{"list":[{"date":"2023-04-25","filename":"changed_dois_with_versions_2023-04-25T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-25T08:03:12","lines":310346,"size":143840367,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-25T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-04-24","filename":"changed_dois_with_versions_2023-04-24T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-24T08:04:49","lines":220800,"size":112157260,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-24T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-04-23","filename":"changed_dois_with_versions_2023-04-23T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-23T08:03:54","lines":213140,"size":105247617,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-23T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-02-24","filename":"changed_dois_with_versions_2023-02-24.jsonl.gz","filetype":"jsonl","last_modified":"2023-03-21T01:51:18","lines":5,"size":6301,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-02-24.jsonl.gz?api_key=my-api-key"},{"date":"2020-03-11","filename":"changed_dois_with_versions_2020-03-11T005336.csv.gz","filetype":"csv","last_modified":"2020-03-11T01:27:04","lines":1806534,"size":195900034,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2020-03-11T005336.csv.gz?api_key=my-api-key"}]}' expected_changefiles = [ + Changefile("changed_dois_with_versions_2023-02-24.jsonl.gz", pendulum.datetime(2023, 2, 24)), Changefile( - "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", pendulum.datetime(2023, 4, 25, 8, 0, 1) + "changed_dois_with_versions_2023-04-23T080001.jsonl.gz", pendulum.datetime(2023, 4, 23, 8, 0, 1) ), Changefile( "changed_dois_with_versions_2023-04-24T080001.jsonl.gz", pendulum.datetime(2023, 4, 24, 8, 0, 1) ), Changefile( - "changed_dois_with_versions_2023-04-23T080001.jsonl.gz", pendulum.datetime(2023, 4, 23, 8, 0, 1) + "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", pendulum.datetime(2023, 4, 25, 8, 0, 1) ), - Changefile("changed_dois_with_versions_2023-02-24.jsonl.gz", pendulum.datetime(2023, 2, 24)), ] changefiles = get_unpaywall_changefiles("my-api-key") self.assertEqual(expected_changefiles, changefiles) @@ -190,21 +181,6 @@ def make_changefiles(start_date: pendulum.DateTime, end_date: pendulum.DateTime) def make_snapshot_filename(snapshot_date: pendulum.DateTime): return f"unpaywall_snapshot_{snapshot_date.format('YYYY-MM-DDTHHmmss')}.jsonl.gz" - # [ - # Changefile( - # "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", - # pendulum.datetime(2023, 4, 25, 8, 0, 1) - # ), - # Changefile( - # "changed_dois_with_versions_2023-04-24T080001.jsonl.gz", - # pendulum.datetime(2023, 4, 24, 8, 0, 1) - # ), - # Changefile( - # "changed_dois_with_versions_2023-04-23T080001.jsonl.gz", - # pendulum.datetime(2023, 4, 23, 8, 0, 1) - # ), - # ] - class TestUnpaywallTelescope(ObservatoryTestCase): def __init__(self, *args, **kwargs): @@ -214,11 +190,6 @@ def __init__(self, *args, **kwargs): self.project_id = os.getenv("TEST_GCP_PROJECT_ID") self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") - # self.fixture_dir = test_fixtures_folder("unpaywall") - # self.snapshot_file = "unpaywall_2021-07-02T151134.jsonl.gz" - # self.snapshot_path = os.path.join(self.fixture_dir, self.snapshot_file) - # self.snapshot_hash = "0f1ac32355c4582d82ae4bc76db17c26" # md5 - def test_dag_structure(self): """Test that the DAG has the correct structure.""" @@ -231,7 +202,8 @@ def test_dag_structure(self): { "check_dependencies": ["fetch_releases"], "fetch_releases": ["create_datasets"], - "create_datasets": ["download_snapshot"], + "create_datasets": ["bq_create_main_table_snapshot"], + "bq_create_main_table_snapshot": ["download_snapshot"], "download_snapshot": ["upload_downloaded_snapshot"], "upload_downloaded_snapshot": ["extract_snapshot"], "extract_snapshot": ["transform_snapshot"], @@ -245,8 +217,7 @@ def test_dag_structure(self): "transform_change_files": ["upload_upsert_files"], "upload_upsert_files": ["bq_load_upsert_table"], "bq_load_upsert_table": ["bq_upsert_records"], - "bq_upsert_records": ["bq_create_main_table_snapshot"], - "bq_create_main_table_snapshot": ["add_new_dataset_releases"], + "bq_upsert_records": ["add_new_dataset_releases"], "add_new_dataset_releases": ["cleanup"], "cleanup": [], }, @@ -270,20 +241,12 @@ def test_dag_load(self): with env.create(): self.assert_dag_load_from_config(self.dag_id) - # We want to do 3 dag runs. First is to load snapshot. - # Second run brings us up to date and applies the overlapping diffs from before snapshot. - # Third checks the "all other situations" case - # @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_filename_from_http_header") def test_telescope(self): - # m_makeapi.return_value = self.api - # m_http_response.return_value = self.snapshot_file - # - # env = self.setup_observatory_environment() - # - # first_execution_date = pendulum.datetime(2021, 7, 3) # Snapshot - # second_execution_date = pendulum.datetime(2021, 7, 4) # Not enough updates found - # third_execution_date = pendulum.datetime(2021, 7, 5) # Updates found - # fourth_execution_date = pendulum.datetime(2021, 7, 6) # No updates found + """Test workflow end to end. + + The test files in fixtures/unpaywall have been carefully crafted to make sure that the data is loaded + into BigQuery correctly. + """ env = ObservatoryEnvironment(self.project_id, self.data_location, api_port=find_free_port()) bq_dataset_id = env.add_dataset() @@ -299,22 +262,22 @@ def test_telescope(self): dag = workflow.make_dag() # First run: snapshot and initial changefiles - execution_date = pendulum.datetime(2023, 4, 25) + data_interval_start = pendulum.datetime(2023, 4, 25) snapshot_date = pendulum.datetime(2023, 4, 25, 8, 30, 2) - changfile_date = pendulum.datetime(2023, 4, 25, 8, 0, 1) - with env.create_dag_run(dag, execution_date) as dag_run: + changefile_date = pendulum.datetime(2023, 4, 25, 8, 0, 1) + with env.create_dag_run(dag, data_interval_start) as dag_run: # Mocked and expected data release = UnpaywallRelease( dag_id=self.dag_id, run_id=dag_run.run_id, is_first_run=True, snapshot_date=snapshot_date, - changefile_start_date=changfile_date, - changefile_end_date=changfile_date, + changefile_start_date=changefile_date, + changefile_end_date=changefile_date, changefiles=[ Changefile( "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", - changfile_date, + changefile_date, ) ], ) @@ -355,6 +318,10 @@ def test_telescope(self): ti = env.run_task(workflow.create_datasets.__name__) self.assertEqual(State.SUCCESS, ti.state) + # Create snapshot: no table created on this run + ti = env.run_task(workflow.bq_create_main_table_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + # Download and process snapshot server = HttpServer(directory=test_fixtures_folder(self.dag_id), port=find_free_port()) with server.create(): @@ -369,490 +336,286 @@ def test_telescope(self): ti = env.run_task(workflow.upload_downloaded_snapshot.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_blob_integrity( + env.download_bucket, + gcs_blob_name_from_path(release.snapshot_download_file_path), + release.snapshot_download_file_path, + ) ti = env.run_task(workflow.extract_snapshot.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assertTrue(os.path.isfile(release.snapshot_extract_file_path)) ti = env.run_task(workflow.transform_snapshot.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assertTrue(os.path.isfile(release.main_table_file_path)) ti = env.run_task("split_main_table_file") self.assertEqual(State.SUCCESS, ti.state) + file_paths = list_files(release.snapshot_release.transform_folder, release.main_table_files_regex) + self.assertTrue(len(file_paths) >= 1) + for file_path in file_paths: + self.assertTrue(os.path.isfile(file_path)) ti = env.run_task(workflow.upload_main_table_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + for file_path in file_paths: + self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file_path), file_path) ti = env.run_task(workflow.bq_load_main_table.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_table_integrity(workflow.bq_main_table_id, expected_rows=10) + expected_content = load_json( + test_fixtures_folder(self.dag_id, "expected", "run1_bq_load_main_table.json") + ) + self.assert_table_content(workflow.bq_main_table_id, expected_content, "doi") # Download and process changefiles - # TODO: mock download - ti = env.run_task(workflow.download_change_files.__name__) + server = HttpServer(directory=test_fixtures_folder(self.dag_id), port=find_free_port()) + with server.create(): + with patch.object( + academic_observatory_workflows.workflows.unpaywall_telescope, + "CHANGEFILES_DOWNLOAD_URL", + f"http://localhost:{server.port}", + ): + ti = env.run_task(workflow.download_change_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assertTrue(os.path.isfile(changefile.download_file_path)) ti = env.run_task(workflow.upload_downloaded_change_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assert_blob_integrity( + env.download_bucket, + gcs_blob_name_from_path(changefile.download_file_path), + changefile.download_file_path, + ) ti = env.run_task(workflow.extract_change_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assertTrue(os.path.isfile(changefile.extract_file_path)) ti = env.run_task(workflow.transform_change_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + # The transformed files are deleted + for changefile in release.changefiles: + self.assertFalse(os.path.isfile(changefile.transform_file_path)) + # Upsert file should exist + self.assertTrue(os.path.isfile(release.upsert_table_file_path)) ti = env.run_task(workflow.upload_upsert_files.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_blob_integrity( + env.transform_bucket, + gcs_blob_name_from_path(release.upsert_table_file_path), + release.upsert_table_file_path, + ) ti = env.run_task(workflow.bq_load_upsert_table.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_table_integrity(workflow.bq_upsert_table_id, expected_rows=2) ti = env.run_task(workflow.bq_upsert_records.__name__) self.assertEqual(State.SUCCESS, ti.state) - - ti = env.run_task(workflow.bq_create_main_table_snapshot.__name__) - self.assertEqual(State.SUCCESS, ti.state) + self.assert_table_integrity(workflow.bq_main_table_id, expected_rows=10) + expected_content = load_json( + test_fixtures_folder(self.dag_id, "expected", "run1_bq_upsert_records.json") + ) + self.assert_table_content(workflow.bq_main_table_id, expected_content, "doi") # Final tasks + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) + self.assertEqual(len(dataset_releases), 0) ti = env.run_task(workflow.add_new_dataset_releases.__name__) self.assertEqual(State.SUCCESS, ti.state) + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) + self.assertEqual(len(dataset_releases), 1) + # Test that all workflow data deleted ti = env.run_task(workflow.cleanup.__name__) self.assertEqual(State.SUCCESS, ti.state) + self.assert_cleanup(release.workflow_folder) ti = env.run_task("dag_run_complete") self.assertEqual(State.SUCCESS, ti.state) # Second run: no new changefiles - execution_date = pendulum.datetime(2023, 4, 26) - with env.create_dag_run(dag, execution_date): - # Check that all tasks after fetch_releases are skipped - pass - - # Third run: applying changefiles - # In this example there are multiple changefiles - execution_date = pendulum.datetime(2023, 4, 28) - with env.create_dag_run(dag, execution_date): - pass - - a = 1 - - # release = telescope.make_release(**{"ti": ti}) - # - # # Download data - # ti = env.run_task(telescope.download.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Upload downloaded data - # ti = env.run_task(telescope.upload_downloaded.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # for file in release.download_files: - # self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) - # - # # Extract data - # ti = env.run_task(telescope.extract.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Transform data - # ti = env.run_task(telescope.transform.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assertEqual(len(release.transform_files), 1) - # - # # Upload transformed data - # ti = env.run_task(telescope.upload_transformed.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # for file in release.transform_files: - # self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) - # - # # Merge updates - # ti = env.run_task(telescope.merge_update_files.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assertEqual(len(release.transform_files), 1) - # - # # Load bq table partitions - # ti = env.run_task(telescope.bq_load_partition.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Delete changed data from main table - # ti = env.run_task(telescope.bq_delete_old.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Add new changes - # ti = env.run_task(telescope.bq_append_new.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" - # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" - # expected_rows = 100 - # self.assert_table_integrity(table_id, expected_rows) - # - # # Cleanup files - # download_folder, extract_folder, transform_folder = ( - # release.download_folder, - # release.extract_folder, - # release.transform_folder, - # ) - # ti = env.run_task(telescope.cleanup.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assert_cleanup(download_folder, extract_folder, transform_folder) - # - # # Load update_dataset_release_task - # dataset_releases = get_dataset_releases(dataset_id=1) - # self.assertEqual(len(dataset_releases), 0) - # ti = env.run_task(telescope.add_new_dataset_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # dataset_releases = get_dataset_releases(dataset_id=1) - # self.assertEqual(len(dataset_releases), 1) - # self.assertEqual(dataset_releases[0].start_date, release.start_date) - # self.assertEqual(dataset_releases[0].end_date, release.end_date) - # - # # Second run (skips) Use dailies - # with env.create_dag_run(dag, second_execution_date): - # # Check dependencies are met - # ti = env.run_task(telescope.check_dependencies.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Check releases - # ti = env.run_task(telescope.check_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Download data - # ti = env.run_task(telescope.download.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Upload downloaded data - # ti = env.run_task(telescope.upload_downloaded.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Extract data - # ti = env.run_task(telescope.extract.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Transform data - # ti = env.run_task(telescope.transform.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # self.assertEqual(len(release.transform_files), 0) - # - # # Upload transformed data - # ti = env.run_task(telescope.upload_transformed.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Merge updates - # ti = env.run_task(telescope.merge_update_files.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Load bq table partitions - # ti = env.run_task(telescope.bq_load_partition.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Delete changed data from main table - # ti = env.run_task(telescope.bq_delete_old.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Add new changes - # ti = env.run_task(telescope.bq_append_new.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Cleanup files - # ti = env.run_task(telescope.cleanup.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Load add_new_dataset_releases - # ti = env.run_task(telescope.add_new_dataset_releases.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Third run (downloads) - # with env.create_dag_run(dag, third_execution_date): - # # Check dependencies are met - # ti = env.run_task(telescope.check_dependencies.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Check releases - # ti = env.run_task(telescope.check_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # release = telescope.make_release(**{"ti": ti}) - # - # # Download data - # ti = env.run_task(telescope.download.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Upload downloaded data - # ti = env.run_task(telescope.upload_downloaded.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # for file in release.download_files: - # self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) - # - # # Extract data - # ti = env.run_task(telescope.extract.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Transform data - # ti = env.run_task(telescope.transform.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assertEqual(len(release.transform_files), 3) - # - # # Upload transformed data - # ti = env.run_task(telescope.upload_transformed.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # for file in release.transform_files: - # self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) - # - # # Merge updates - # ti = env.run_task(telescope.merge_update_files.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assertEqual(len(release.transform_files), 1) - # - # # Load bq table partitions - # ti = env.run_task(telescope.bq_load_partition.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" - # table_id = create_date_table_id( - # partition_table_id, release.end_date, bigquery.TimePartitioningType.DAY - # ) - # table_id = f"{self.project_id}.{telescope.dataset_id}.{table_id}" - # expected_rows = 2 - # self.assert_table_integrity(table_id, expected_rows) - # - # # Delete changed data from main table - # ti = env.run_task(telescope.bq_delete_old.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" - # expected_rows = 99 - # self.assert_table_integrity(table_id, expected_rows) - # - # # Add new changes - # ti = env.run_task(telescope.bq_append_new.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" - # expected_rows = 101 - # self.assert_table_integrity(table_id, expected_rows) - # - # # Cleanup files - # download_folder, extract_folder, transform_folder = ( - # release.download_folder, - # release.extract_folder, - # release.transform_folder, - # ) - # ti = env.run_task(telescope.cleanup.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # self.assert_cleanup(download_folder, extract_folder, transform_folder) - # - # # Load update_dataset_release_task - # dataset_releases = get_dataset_releases(dataset_id=1) - # self.assertEqual(len(dataset_releases), 1) - # ti = env.run_task(telescope.add_new_dataset_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # dataset_releases = get_dataset_releases(dataset_id=1) - # latest_release = get_latest_dataset_release(dataset_releases) - # self.assertEqual(len(dataset_releases), 2) - # self.assertEqual(latest_release.start_date, release.start_date) - # self.assertEqual(latest_release.end_date, release.end_date) - # - # # Fourth run. No new data - # with env.create_dag_run(dag, fourth_execution_date): - # # Check dependencies are met - # ti = env.run_task(telescope.check_dependencies.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Check releases - # with patch( - # "academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_diff_releases" - # ) as m_diff_releases: - # m_diff_releases.return_value = [] - # ti = env.run_task(telescope.check_releases.__name__) - # self.assertEqual(State.SUCCESS, ti.state) - # - # # Download data - # ti = env.run_task(telescope.download.__name__) - # self.assertEqual(ti.state, State.SKIPPED) - # - # # Clean up template - # self.remove_changefiles() + data_interval_start = pendulum.datetime(2023, 4, 26) + with env.create_dag_run(dag, data_interval_start): + # Fetch releases and check that we have received the expected snapshot date and changefiles + task_ids = ["wait_for_prev_dag_run", "check_dependencies", "fetch_releases"] + with patch.multiple( + "academic_observatory_workflows.workflows.unpaywall_telescope", + get_unpaywall_changefiles=lambda api_key: [], + get_snapshot_file_name=lambda api_key: "filename", + ): + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) + + # Check that all subsequent tasks are skipped + task_ids = [ + "create_datasets", + "bq_create_main_table_snapshot", + "download_snapshot", + "upload_downloaded_snapshot", + "extract_snapshot", + "transform_snapshot", + "split_main_table_file", + "upload_main_table_files", + "bq_load_main_table", + "download_change_files", + "upload_downloaded_change_files", + "extract_change_files", + "transform_change_files", + "upload_upsert_files", + "bq_load_upsert_table", + "bq_upsert_records", + "add_new_dataset_releases", + "cleanup", + ] + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SKIPPED, ti.state) + # Check that only 1 dataset release exists + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) + self.assertEqual(len(dataset_releases), 1) -# -# class TestUnpaywallRelease(unittest.TestCase): -# def __init__(self, *args, **kwargs): -# super().__init__(*args, **kwargs) -# -# self.fixture_dir = test_fixtures_folder("unpaywall") -# self.snapshot_file = "unpaywall_2021-07-02T151134.jsonl.gz" -# self.snapshot_path = os.path.join(self.fixture_dir, self.snapshot_file) -# self.snapshot_hash = "0f1ac32355c4582d82ae4bc76db17c26" # md5 -# -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_diff_releases") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.download_file") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_observatory_http_header") -# @patch("airflow.models.variable.Variable.get") -# def test_download_data_feed(self, m_get, m_header, m_download, m_pass, m_diff_releases): -# m_get.return_value = "data" -# m_pass.return_value = "testpass" -# m_header.return_value = {"User-Agent": "custom"} -# m_diff_releases.return_value = [ -# { -# "url": "http://url1", -# "filename": "changed_dois_with_versions_2021-07-02T080001.jsonl.gz", -# } -# ] -# -# release = UnpaywallRelease( -# dag_id="dag", -# start_date=pendulum.datetime(2021, 7, 4), -# end_date=pendulum.datetime(2021, 7, 4), -# first_release=False, -# workflow_id=1, -# ) -# -# release.download() -# _, call_args = m_download.call_args -# self.assertEqual(call_args["url"], "http://url1") -# self.assertEqual( -# call_args["filename"], -# "data/telescopes/download/dag/2021_07_04-2021_07_04/changed_dois_with_versions_2021-07-02T080001.jsonl.gz", -# ) -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_observatory_http_header") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.download_file") -# @patch("airflow.models.variable.Variable.get") -# def test_download_snapshot(self, m_get, m_download, m_pass, m_header): -# m_get.return_value = "data" -# m_pass.return_value = "testpass" -# m_header.return_value = {"User-Agent": "custom"} -# -# with CliRunner().isolated_filesystem(): -# release = UnpaywallRelease( -# dag_id="dag", -# start_date=pendulum.datetime(2021, 7, 2), -# end_date=pendulum.datetime(2021, 7, 3), -# first_release=True, -# workflow_id=1, -# ) -# -# src = self.snapshot_path -# dst = os.path.join(release.download_folder, self.snapshot_file) -# shutil.copyfile(src, dst) -# -# release.download() -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_datasets") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_unpaywall_daily_feeds") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_dataset_releases") -# def test_get_diff_releases(self, m_ds_releases, m_dailyfeeds, m_ds): -# # First release -# m_ds_releases.return_value = [ -# DatasetRelease( -# dataset=Dataset(id=1), -# start_date=pendulum.datetime(2021, 1, 20), -# end_date=pendulum.datetime(2021, 1, 20), -# ) -# ] -# m_ds.return_value = [ -# Dataset( -# name="dataset", -# service="bigquery", -# address="project.dataset.table", -# dataset_type=DatasetType(id=1), -# ) -# ] -# m_dailyfeeds.return_value = [ -# {"snapshot_date": pendulum.datetime(2021, 1, 19), "url": "url1", "filename": "file1"}, -# {"snapshot_date": pendulum.datetime(2021, 1, 20), "url": "url2", "filename": "file2"}, -# {"snapshot_date": pendulum.datetime(2021, 1, 21), "url": "url3", "filename": "file3"}, -# {"snapshot_date": pendulum.datetime(2021, 1, 22), "url": "url4", "filename": "file4"}, -# ] -# -# end_date = pendulum.datetime(2021, 1, 21) -# result = UnpaywallRelease.get_diff_releases(end_date=end_date, workflow_id=1) -# -# self.assertEqual(len(result), 3) -# self.assertEqual(result[0]["snapshot_date"], pendulum.datetime(2021, 1, 19)) -# self.assertEqual(result[1]["snapshot_date"], pendulum.datetime(2021, 1, 20)) -# self.assertEqual(result[2]["snapshot_date"], pendulum.datetime(2021, 1, 21)) -# -# # Subsequent release -# m_ds_releases.return_value = [ -# DatasetRelease( -# dataset=Dataset(id=1), -# start_date=pendulum.datetime(2021, 1, 17), -# end_date=pendulum.datetime(2021, 1, 18), -# ), -# DatasetRelease( -# dataset=Dataset(id=1), -# start_date=pendulum.datetime(2021, 1, 18), -# end_date=pendulum.datetime(2021, 1, 19), -# ), -# ] -# end_date = pendulum.datetime(2021, 1, 21) -# result = UnpaywallRelease.get_diff_releases(end_date=end_date, workflow_id=1) -# -# self.assertEqual(len(result), 2) -# self.assertEqual(result[0]["snapshot_date"], pendulum.datetime(2021, 1, 20)) -# self.assertEqual(result[1]["snapshot_date"], pendulum.datetime(2021, 1, 21)) -# -# @patch("airflow.models.variable.Variable.get") -# def test_extract(self, m_get): -# m_get.return_value = "data" -# with CliRunner().isolated_filesystem(): -# release = UnpaywallRelease( -# dag_id="dag", -# start_date=pendulum.datetime(2021, 7, 4), -# end_date=pendulum.datetime(2021, 7, 4), -# first_release=True, -# workflow_id=1, -# ) -# src = self.snapshot_path -# dst = os.path.join(release.download_folder, self.snapshot_file) -# shutil.copyfile(src, dst) -# self.assertEqual(len(release.download_files), 1) -# release.extract() -# self.assertEqual(len(release.extract_files), 1) -# -# @patch("airflow.models.variable.Variable.get") -# def test_transform(self, m_get): -# m_get.return_value = "data" -# with CliRunner().isolated_filesystem(): -# release = UnpaywallRelease( -# dag_id="dag", -# start_date=pendulum.datetime(2021, 7, 4), -# end_date=pendulum.datetime(2021, 7, 4), -# first_release=True, -# workflow_id=1, -# ) -# src = self.snapshot_path -# dst = os.path.join(release.download_folder, self.snapshot_file) -# shutil.copyfile(src, dst) -# release.extract() -# release.transform() -# self.assertEqual(len(release.transform_files), 1) -# -# json_transformed_hash = "62cbb5af5a78d2e0769a28d976971cba" -# json_transformed = os.path.join(release.transform_folder, self.snapshot_file[:-3]) -# self.assertTrue(validate_file_hash(file_path=json_transformed, expected_hash=json_transformed_hash)) -# -# @patch("observatory.platform.utils.release_utils.make_observatory_api") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_dataset_releases") -# def test_is_second_run(self, m_get_ds_release, m_makeapi): -# m_makeapi.return_value = self.api -# -# with self.env.create(): -# self.setup_api() -# m_get_ds_release.return_value = [ -# DatasetRelease( -# dataset=Dataset(id=1), -# start_date=pendulum.datetime(2020, 1, 1), -# end_date=pendulum.datetime(2020, 1, 1), -# ) -# ] -# -# self.assertTrue(UnpaywallRelease.is_second_run(1)) -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_http_response_json") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.changefiles_url") -# def test_get_unpaywall_daily_feeds(self, m_url, m_response): -# m_url.return_value = "url" -# m_response.return_value = {"list": [{"filename": "unpaywall_2021-07-02T151134.jsonl.gz", "url": "url1"}]} -# -# feeds = UnpaywallRelease.get_unpaywall_daily_feeds() -# self.assertEqual(len(feeds), 1) -# self.assertEqual(feeds[0]["snapshot_date"], pendulum.datetime(2021, 7, 2, 15, 11, 34)) + # Third run: waiting a couple of days and applying multiple changefiles + data_interval_start = pendulum.datetime(2023, 4, 27) + changefile_start_date = pendulum.datetime(2023, 4, 26, 8, 0, 1) + changefile_end_date = pendulum.datetime(2023, 4, 27, 8, 0, 1) + with env.create_dag_run(dag, data_interval_start) as dag_run: + # Mocked and expected data + release = UnpaywallRelease( + dag_id=self.dag_id, + run_id=dag_run.run_id, + is_first_run=False, + snapshot_date=snapshot_date, + changefile_start_date=changefile_start_date, + changefile_end_date=changefile_end_date, + changefiles=[ + Changefile( + "changed_dois_with_versions_2023-04-27T080001.jsonl.gz", + changefile_end_date, + ), + Changefile( + "changed_dois_with_versions_2023-04-26T080001.jsonl.gz", + changefile_start_date, + ), + ], + ) + + # Fetch releases and check that we have received the expected snapshot date and changefiles + task_ids = [ + "wait_for_prev_dag_run", + "check_dependencies", + "fetch_releases", + "create_datasets", + "bq_create_main_table_snapshot", + ] + with patch.multiple( + "academic_observatory_workflows.workflows.unpaywall_telescope", + get_unpaywall_changefiles=lambda api_key: release.changefiles, + get_snapshot_file_name=lambda api_key: make_snapshot_filename(snapshot_date), + ): + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) + + # Check that snapshot created + dst_table_id = bq_sharded_table_id( + workflow.cloud_workspace.output_project_id, + workflow.bq_dataset_id, + f"{workflow.bq_table_name}_snapshot", + release.changefile_release.changefile_end_date, + ) + self.assert_table_integrity(dst_table_id, expected_rows=10) + + # Run snapshot tasks, these should all be successful, but actually just skip internally + task_ids = [ + "download_snapshot", + "upload_downloaded_snapshot", + "extract_snapshot", + "transform_snapshot", + "split_main_table_file", + "upload_main_table_files", + "bq_load_main_table", + ] + for task_id in task_ids: + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) + + # Run changefile tasks + server = HttpServer(directory=test_fixtures_folder(self.dag_id), port=find_free_port()) + with server.create(): + with patch.object( + academic_observatory_workflows.workflows.unpaywall_telescope, + "CHANGEFILES_DOWNLOAD_URL", + f"http://localhost:{server.port}", + ): + ti = env.run_task(workflow.download_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assertTrue(os.path.isfile(changefile.download_file_path)) + + ti = env.run_task(workflow.upload_downloaded_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assert_blob_integrity( + env.download_bucket, + gcs_blob_name_from_path(changefile.download_file_path), + changefile.download_file_path, + ) + + ti = env.run_task(workflow.extract_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + for changefile in release.changefiles: + self.assertTrue(os.path.isfile(changefile.extract_file_path)) + + ti = env.run_task(workflow.transform_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + # The transformed files are deleted + for changefile in release.changefiles: + self.assertFalse(os.path.isfile(changefile.transform_file_path)) + # Upsert file should exist + self.assertTrue(os.path.isfile(release.upsert_table_file_path)) + + ti = env.run_task(workflow.upload_upsert_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + self.assert_blob_integrity( + env.transform_bucket, + gcs_blob_name_from_path(release.upsert_table_file_path), + release.upsert_table_file_path, + ) + + ti = env.run_task(workflow.bq_load_upsert_table.__name__) + self.assertEqual(State.SUCCESS, ti.state) + self.assert_table_integrity(workflow.bq_upsert_table_id, expected_rows=4) + + ti = env.run_task(workflow.bq_upsert_records.__name__) + self.assertEqual(State.SUCCESS, ti.state) + self.assert_table_integrity(workflow.bq_main_table_id, expected_rows=12) + expected_content = load_json( + test_fixtures_folder(self.dag_id, "expected", "run3_bq_upsert_records.json") + ) + self.assert_table_content(workflow.bq_main_table_id, expected_content, "doi") + + # Final tasks + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) + self.assertEqual(len(dataset_releases), 1) + ti = env.run_task(workflow.add_new_dataset_releases.__name__) + self.assertEqual(State.SUCCESS, ti.state) + dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) + self.assertEqual(len(dataset_releases), 2) + + # Test that all workflow data deleted + ti = env.run_task(workflow.cleanup.__name__) + self.assertEqual(State.SUCCESS, ti.state) + self.assert_cleanup(release.workflow_folder) + + ti = env.run_task("dag_run_complete") + self.assertEqual(State.SUCCESS, ti.state) diff --git a/academic_observatory_workflows/workflows/unpaywall_telescope.py b/academic_observatory_workflows/workflows/unpaywall_telescope.py index 448d6489d..19319c78f 100644 --- a/academic_observatory_workflows/workflows/unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/unpaywall_telescope.py @@ -146,6 +146,8 @@ def __init__( changefile_end_date=changefile_end_date, ) self.changefiles = changefiles + for changefile in changefiles: + changefile.changefile_release = self.changefile_release # Paths used during processing self.snapshot_download_file_path = os.path.join( @@ -172,7 +174,7 @@ def __init__( dataset_description: str = "Our Research datasets: http://ourresearch.org/", table_description: str = "Unpaywall Data Feed: https://unpaywall.org/products/data-feed", merge_primary_key: str = "doi", - snapshot_expiry_days: int = 14, + snapshot_expiry_days: int = 7, http_header: str = None, unpaywall_conn_id: str = "unpaywall", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, @@ -226,6 +228,7 @@ def __init__( self.add_setup_task(self.check_dependencies) self.add_setup_task(self.fetch_releases) self.add_task(self.create_datasets) + self.add_task(self.bq_create_main_table_snapshot) # Download and process snapshot self.add_task(self.download_snapshot) @@ -236,7 +239,7 @@ def __init__( WorkflowBashOperator( workflow=self, task_id="split_main_table_file", - bash_command="cd {{ release.snapshot_release.transform_folder }} && split -C 4G --numeric-suffixes=1 --suffix-length=12 --additional-suffix=.jsonl main_table.jsonl main_table", + bash_command="{% if release.is_first_run %}cd {{ release.snapshot_release.transform_folder }} && split -C 4G --numeric-suffixes=1 --suffix-length=12 --additional-suffix=.jsonl main_table.jsonl main_table{% else %}echo 'Skipping split command because release.is_first_run is false'{% endif %}", ) ) self.add_task(self.upload_main_table_files) @@ -250,7 +253,6 @@ def __init__( self.add_task(self.upload_upsert_files) self.add_task(self.bq_load_upsert_table) self.add_task(self.bq_upsert_records) - self.add_task(self.bq_create_main_table_snapshot) # Add release info to API and cleanup self.add_task(self.add_new_dataset_releases) @@ -322,6 +324,9 @@ def fetch_releases(self, **kwargs) -> bool: if prev_changefile_date < changefile.changefile_date: changefiles.append(changefile) + # Sort from oldest to newest + changefiles.sort(key=lambda c: c.changefile_date, reverse=False) + if len(changefiles) == 0: logging.info(f"fetch_releases: no changefiles found, skipping") return False @@ -355,9 +360,9 @@ def make_release(self, **kwargs) -> UnpaywallRelease: snapshot_date, changefiles, is_first_run = parse_release_msg(msg) run_id = kwargs["run_id"] - # The first changefile is the newest and the last one is the oldest - changefile_start_date = changefiles[-1].changefile_date - changefile_end_date = changefiles[0].changefile_date + # The first changefile is the oldest and the last one is the newest + changefile_start_date = changefiles[0].changefile_date + changefile_end_date = changefiles[-1].changefile_date release = UnpaywallRelease( dag_id=self.dag_id, @@ -385,6 +390,24 @@ def create_datasets(self, release: UnpaywallRelease, **kwargs) -> None: description=self.dataset_description, ) + def bq_create_main_table_snapshot(self, release: UnpaywallRelease, **kwargs) -> None: + """Create a snapshot of the main table. The purpose of this table is to be able to rollback the table + if something goes wrong. The snapshot expires after self.snapshot_expiry_days.""" + + if release.is_first_run: + logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run") + return + + dst_table_id = bq_sharded_table_id( + self.cloud_workspace.output_project_id, + self.bq_dataset_id, + f"{self.bq_table_name}_snapshot", + release.changefile_release.changefile_end_date, + ) + expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) + success = bq_snapshot(src_table_id=self.bq_main_table_id, dst_table_id=dst_table_id, expiry_date=expiry_date) + set_task_state(success, self.bq_create_main_table_snapshot.__name__, release) + ############################################### # Download and process snapshot on first run ############################################### @@ -495,7 +518,7 @@ def bq_load_main_table(self, release: UnpaywallRelease, **kwargs) -> None: source_format=SourceFormat.NEWLINE_DELIMITED_JSON, table_description=self.table_description, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, - ignore_unknown_values=True, + ignore_unknown_values=False, ) set_task_state(success, self.bq_load_upsert_table.__name__, release) @@ -558,9 +581,9 @@ def transform_change_files(self, release: UnpaywallRelease, **kwargs): logging.info( "transform_change_files: Merge change files, make sure that we process them from the oldest changefile to the newest" ) + # Make sure changefiles are sorted from oldest to newest, just in case they were not sorted for some reason changefiles = sorted(release.changefiles, key=lambda c: c.changefile_date, reverse=False) - transform_files = [changefile.transform_file_path for changefile in release.changefiles] - changefiles.sort(key=lambda c: c.changefile_date, reverse=True) + transform_files = [changefile.transform_file_path for changefile in changefiles] merge_update_files( primary_key=self.merge_primary_key, input_files=transform_files, output_file=release.upsert_table_file_path ) @@ -602,20 +625,6 @@ def bq_upsert_records(self, release: UnpaywallRelease, **kwargs) -> None: primary_key=self.merge_primary_key, ) - def bq_create_main_table_snapshot(self, release: UnpaywallRelease, **kwargs) -> None: - """Create a snapshot of the main table. The purpose of this table is to be able to rollback the table - if something goes wrong. The snapshot expires after self.snapshot_expiry_days.""" - - dst_table_id = bq_sharded_table_id( - self.cloud_workspace.output_project_id, - self.bq_dataset_id, - f"{self.bq_table_name}_snapshot", - release.changefile_release.changefile_end_date, - ) - expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) - success = bq_snapshot(src_table_id=self.bq_main_table_id, dst_table_id=dst_table_id, expiry_date=expiry_date) - set_task_state(success, self.bq_create_main_table_snapshot.__name__, release) - def add_new_dataset_releases(self, release: UnpaywallRelease, **kwargs) -> None: """Adds release information to API.""" @@ -682,8 +691,8 @@ def get_unpaywall_changefiles(api_key: str) -> List[Changefile]: filename = changefile["filename"] changefiles.append(Changefile(filename, unpaywall_filename_to_datetime(filename))) - # Make sure sorted - changefiles.sort(key=lambda c: c.changefile_date, reverse=True) + # Make sure sorted from oldest to newest + changefiles.sort(key=lambda c: c.changefile_date, reverse=False) return changefiles