diff --git a/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_failure.yaml b/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_failure.yaml new file mode 100644 index 000000000..a2fe3d5a3 --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_failure.yaml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:401e5f686d97feb2c9663392d81238acc4b6879092a03a11c2c4462e60478a60 +size 842 diff --git a/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_success.yaml b/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_success.yaml new file mode 100644 index 000000000..622778171 --- /dev/null +++ b/academic_observatory_workflows/fixtures/unpaywall/get_snapshot_file_name_success.yaml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8843c6c9b0899b559590a9fd9fc4267c00b523477c6be2ada7c7adf82a5a5576 +size 936 diff --git a/academic_observatory_workflows/fixtures/unpaywall/unpaywall_2021-07-02T151134.jsonl.gz b/academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz similarity index 100% rename from academic_observatory_workflows/fixtures/unpaywall/unpaywall_2021-07-02T151134.jsonl.gz rename to academic_observatory_workflows/fixtures/unpaywall/unpaywall_snapshot_2023-04-25T083002.jsonl.gz diff --git a/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py b/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py index 90a57180f..6dc9f059d 100644 --- a/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_unpaywall_telescope.py @@ -1,52 +1,673 @@ -# # Copyright 2020 Curtin University -# # -# # Licensed under the Apache License, Version 2.0 (the "License"); -# # you may not use this file except in compliance with the License. -# # You may obtain a copy of the License at -# # -# # http://www.apache.org/licenses/LICENSE-2.0 -# # -# # Unless required by applicable law or agreed to in writing, software -# # distributed under the License is distributed on an "AS IS" BASIS, -# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# # See the License for the specific language governing permissions and -# # limitations under the License. 
-# -# # Author: Tuan Chien -# -# import os -# import shutil -# import unittest -# from unittest.mock import patch -# -# import pendulum -# from airflow.models import Connection -# from airflow.utils.state import State -# from click.testing import CliRunner -# from google.cloud import bigquery -# -# from academic_observatory_workflows.config import test_fixtures_folder -# from academic_observatory_workflows.workflows.unpaywall_telescope import ( -# UnpaywallRelease, -# UnpaywallTelescope, -# ) -# from observatory.api.client import ApiClient, Configuration -# from observatory.api.client.api.observatory_api import ObservatoryApi # noqa: E501 -# from observatory.api.client.model.dataset_release import DatasetRelease -# from observatory.api.testing import ObservatoryApiEnvironment -# from observatory.platform.airflow import AirflowConns -# from observatory.platform.api import get_dataset_releases, get_latest_dataset_release -# from observatory.platform.files import validate_file_hash -# from observatory.platform.gcs import gcs_blob_name_from_path -# from observatory.platform.observatory_environment import ( -# HttpServer, -# ObservatoryEnvironment, -# ObservatoryTestCase, -# module_file_path, -# find_free_port, -# ) -# from observatory.platform.utils.jinja2_utils import render_template -# +# Copyright 2020 Curtin University +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Author: Tuan Chien, James Diprose + +import os +from typing import List +from unittest.mock import patch + +import pendulum +import vcr +from airflow import AirflowException +from airflow.models import Connection +from airflow.utils.state import State + +import academic_observatory_workflows +from academic_observatory_workflows.config import test_fixtures_folder +from academic_observatory_workflows.workflows.unpaywall_telescope import ( + UnpaywallTelescope, + snapshot_url, + get_snapshot_file_name, + changefiles_url, + changefile_download_url, + unpaywall_filename_to_datetime, + get_unpaywall_changefiles, + Changefile, + parse_release_msg, + UnpaywallRelease, +) +from observatory.platform.observatory_config import Workflow +from observatory.platform.observatory_environment import ( + ObservatoryEnvironment, + ObservatoryTestCase, + find_free_port, + HttpServer, +) + + +# def create_changefiles(self, host, port): +# # Daily +# template_path = os.path.join(self.fixture_dir, "daily-feed", "changefiles.jinja2") +# changefiles = render_template(template_path, host=host, port=port) +# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") +# with open(dst, "w") as f: +# f.write(changefiles) +# +# +# def remove_changefiles(self): +# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") +# os.remove(dst) + + +class TestUnpaywallUtils(ObservatoryTestCase): + def test_changefile(self): + class MockRelease: + @property + def download_folder(self): + return "/path/to/download/" + + @property + def extract_folder(self): + return "/path/to/extract/" + + @property + def transform_folder(self): + return "/path/to/transform/" + + cf = Changefile( + "changed_dois_with_versions_2020-03-11T005336.jsonl.gz", + pendulum.datetime(2020, 3, 11, 0, 53, 36), + changefile_release=MockRelease(), + ) + + # download_file_path + self.assertEqual( + "/path/to/download/changed_dois_with_versions_2020-03-11T005336.jsonl.gz", cf.download_file_path + ) + + # extract_file_path + self.assertEqual("/path/to/extract/changed_dois_with_versions_2020-03-11T005336.jsonl", cf.extract_file_path) + + # transform_file_path + self.assertEqual( + "/path/to/transform/changed_dois_with_versions_2020-03-11T005336.jsonl", cf.transform_file_path + ) + + # from_dict + dict_ = dict(filename=cf.filename, changefile_date=cf.changefile_date.isoformat()) + self.assertEqual(cf, Changefile.from_dict(dict_)) + + # to_dict + self.assertEqual(dict_, cf.to_dict()) + + def test_snapshot_url(self): + url = snapshot_url("my-api-key") + self.assertEqual(f"https://api.unpaywall.org/feed/snapshot?api_key=my-api-key", url) + + def test_changefiles_url(self): + url = changefiles_url("my-api-key") + self.assertEqual(f"https://api.unpaywall.org/feed/changefiles?api_key=my-api-key", url) + + def test_changefile_download_url(self): + url = changefile_download_url("changed_dois_with_versions_2020-03-11T005336.jsonl.gz", "my-api-key") + self.assertEqual( + f"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2020-03-11T005336.jsonl.gz?api_key=my-api-key", + url, + ) + + def test_unpaywall_filename_to_datetime(self): + # Snapshot filename + filename = "unpaywall_snapshot_2023-04-25T083002.jsonl.gz" + dt = unpaywall_filename_to_datetime(filename) + self.assertEqual(pendulum.datetime(2023, 4, 25, 8, 30, 2), dt) + + # Changefile filename + filename = "changed_dois_with_versions_2020-03-11T005336.jsonl.gz" + dt = unpaywall_filename_to_datetime(filename) + self.assertEqual(pendulum.datetime(2020, 3, 11, 0, 53, 36), dt) + + 
# Filename without time component + filename = "changed_dois_with_versions_2020-03-11.jsonl.gz" + dt = unpaywall_filename_to_datetime(filename) + self.assertEqual(pendulum.datetime(2020, 3, 11, 0, 0, 0), dt) + + @patch("observatory.platform.utils.url_utils.get_http_text_response") + def test_get_unpaywall_changefiles(self, m_get_http_text_response): + # Don't use vcr here because the actual returned data contains API keys and it is a lot of data + m_get_http_text_response.return_value = '{"list":[{"date":"2023-04-25","filename":"changed_dois_with_versions_2023-04-25T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-25T08:03:12","lines":310346,"size":143840367,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-25T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-04-24","filename":"changed_dois_with_versions_2023-04-24T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-24T08:04:49","lines":220800,"size":112157260,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-24T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-04-23","filename":"changed_dois_with_versions_2023-04-23T080001.jsonl.gz","filetype":"jsonl","last_modified":"2023-04-23T08:03:54","lines":213140,"size":105247617,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-04-23T080001.jsonl.gz?api_key=my-api-key"},{"date":"2023-02-24","filename":"changed_dois_with_versions_2023-02-24.jsonl.gz","filetype":"jsonl","last_modified":"2023-03-21T01:51:18","lines":5,"size":6301,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2023-02-24.jsonl.gz?api_key=my-api-key"},{"date":"2020-03-11","filename":"changed_dois_with_versions_2020-03-11T005336.csv.gz","filetype":"csv","last_modified":"2020-03-11T01:27:04","lines":1806534,"size":195900034,"url":"https://api.unpaywall.org/daily-feed/changefile/changed_dois_with_versions_2020-03-11T005336.csv.gz?api_key=my-api-key"}]}' + + expected_changefiles = [ + Changefile( + "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", pendulum.datetime(2023, 4, 25, 8, 0, 1) + ), + Changefile( + "changed_dois_with_versions_2023-04-24T080001.jsonl.gz", pendulum.datetime(2023, 4, 24, 8, 0, 1) + ), + Changefile( + "changed_dois_with_versions_2023-04-23T080001.jsonl.gz", pendulum.datetime(2023, 4, 23, 8, 0, 1) + ), + Changefile("changed_dois_with_versions_2023-02-24.jsonl.gz", pendulum.datetime(2023, 2, 24)), + ] + changefiles = get_unpaywall_changefiles("my-api-key") + self.assertEqual(expected_changefiles, changefiles) + + def test_get_snapshot_file_name(self): + # This cassette was run with a valid api key, which is not saved into the cassette or code + # Set UNPAYWALL_API_KEY to use the real api key + with vcr.use_cassette( + test_fixtures_folder("unpaywall", "get_snapshot_file_name_success.yaml"), + filter_query_parameters=["api_key"], + ): + filename = get_snapshot_file_name(os.getenv("UNPAYWALL_API_KEY", "my-api-key")) + self.assertEqual("unpaywall_snapshot_2023-04-25T083002.jsonl.gz", filename) + + # An invalid API key + with vcr.use_cassette( + test_fixtures_folder("unpaywall", "get_snapshot_file_name_failure.yaml"), + filter_query_parameters=["api_key"], + ): + with self.assertRaises(AirflowException): + get_snapshot_file_name("invalid-api-key") + + +def make_changefiles(start_date: pendulum.DateTime, end_date: pendulum.DateTime) -> List[Changefile]: + changefiles = [ + Changefile( + 
f"changed_dois_with_versions_{day.format('YYYY-MM-DD')}T080001.jsonl.gz", + pendulum.datetime(day.year, day.month, day.day, 8, 0, 1), + ) + for day in pendulum.period(start_date, end_date).range("days") + ] + + # Make sure sorted + changefiles.sort(key=lambda c: c.changefile_date, reverse=True) + + return changefiles + + +def make_snapshot_filename(snapshot_date: pendulum.DateTime): + return f"unpaywall_snapshot_{snapshot_date.format('YYYY-MM-DDTHHmmss')}.jsonl.gz" + + # [ + # Changefile( + # "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", + # pendulum.datetime(2023, 4, 25, 8, 0, 1) + # ), + # Changefile( + # "changed_dois_with_versions_2023-04-24T080001.jsonl.gz", + # pendulum.datetime(2023, 4, 24, 8, 0, 1) + # ), + # Changefile( + # "changed_dois_with_versions_2023-04-23T080001.jsonl.gz", + # pendulum.datetime(2023, 4, 23, 8, 0, 1) + # ), + # ] + + +class TestUnpaywallTelescope(ObservatoryTestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.dag_id = "unpaywall" + self.project_id = os.getenv("TEST_GCP_PROJECT_ID") + self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") + + # self.fixture_dir = test_fixtures_folder("unpaywall") + # self.snapshot_file = "unpaywall_2021-07-02T151134.jsonl.gz" + # self.snapshot_path = os.path.join(self.fixture_dir, self.snapshot_file) + # self.snapshot_hash = "0f1ac32355c4582d82ae4bc76db17c26" # md5 + + def test_dag_structure(self): + """Test that the DAG has the correct structure.""" + + workflow = UnpaywallTelescope( + dag_id=self.dag_id, + cloud_workspace=self.fake_cloud_workspace, + ) + dag = workflow.make_dag() + self.assert_dag_structure( + { + "check_dependencies": ["fetch_releases"], + "fetch_releases": ["create_datasets"], + "create_datasets": ["download_snapshot"], + "download_snapshot": ["upload_downloaded_snapshot"], + "upload_downloaded_snapshot": ["extract_snapshot"], + "extract_snapshot": ["transform_snapshot"], + "transform_snapshot": ["split_main_table_file"], + "split_main_table_file": ["upload_main_table_files"], + "upload_main_table_files": ["bq_load_main_table"], + "bq_load_main_table": ["download_change_files"], + "download_change_files": ["upload_downloaded_change_files"], + "upload_downloaded_change_files": ["extract_change_files"], + "extract_change_files": ["transform_change_files"], + "transform_change_files": ["upload_upsert_files"], + "upload_upsert_files": ["bq_load_upsert_table"], + "bq_load_upsert_table": ["bq_upsert_records"], + "bq_upsert_records": ["bq_create_main_table_snapshot"], + "bq_create_main_table_snapshot": ["add_new_dataset_releases"], + "add_new_dataset_releases": ["cleanup"], + "cleanup": [], + }, + dag, + ) + + def test_dag_load(self): + """Test that workflow can be loaded from a DAG bag.""" + + env = ObservatoryEnvironment( + workflows=[ + Workflow( + dag_id=self.dag_id, + name="Unpaywall Telescope", + class_name="academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallTelescope", + cloud_workspace=self.fake_cloud_workspace, + ) + ] + ) + + with env.create(): + self.assert_dag_load_from_config(self.dag_id) + + # We want to do 3 dag runs. First is to load snapshot. + # Second run brings us up to date and applies the overlapping diffs from before snapshot. 
+ # Third checks the "all other situations" case + # @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_filename_from_http_header") + def test_telescope(self): + # m_makeapi.return_value = self.api + # m_http_response.return_value = self.snapshot_file + # + # env = self.setup_observatory_environment() + # + # first_execution_date = pendulum.datetime(2021, 7, 3) # Snapshot + # second_execution_date = pendulum.datetime(2021, 7, 4) # Not enough updates found + # third_execution_date = pendulum.datetime(2021, 7, 5) # Updates found + # fourth_execution_date = pendulum.datetime(2021, 7, 6) # No updates found + + env = ObservatoryEnvironment(self.project_id, self.data_location, api_port=find_free_port()) + bq_dataset_id = env.add_dataset() + + with env.create(task_logging=True): + workflow = UnpaywallTelescope( + dag_id=self.dag_id, + cloud_workspace=env.cloud_workspace, + bq_dataset_id=bq_dataset_id, + ) + conn = Connection(conn_id=workflow.unpaywall_conn_id, uri="http://:YOUR_API_KEY@") + env.add_connection(conn) + dag = workflow.make_dag() + + # First run: snapshot and initial changefiles + execution_date = pendulum.datetime(2023, 4, 25) + snapshot_date = pendulum.datetime(2023, 4, 25, 8, 30, 2) + changefile_date = pendulum.datetime(2023, 4, 25, 8, 0, 1) + with env.create_dag_run(dag, execution_date) as dag_run: + # Mocked and expected data + release = UnpaywallRelease( + dag_id=self.dag_id, + run_id=dag_run.run_id, + is_first_run=True, + snapshot_date=snapshot_date, + changefile_start_date=changefile_date, + changefile_end_date=changefile_date, + changefiles=[ + Changefile( + "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", + changefile_date, + ) + ], + ) + + # Wait for the previous DAG run to finish + ti = env.run_task("wait_for_prev_dag_run") + self.assertEqual(State.SUCCESS, ti.state) + + # Check dependencies are met + ti = env.run_task(workflow.check_dependencies.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + # Fetch releases and check that we have received the expected snapshot date and changefiles + unpaywall_changefiles = make_changefiles(workflow.start_date, snapshot_date) + snapshot_file_name = make_snapshot_filename(snapshot_date) + with patch.multiple( + "academic_observatory_workflows.workflows.unpaywall_telescope", + get_unpaywall_changefiles=lambda api_key: unpaywall_changefiles, + get_snapshot_file_name=lambda api_key: snapshot_file_name, + ): + task_id = workflow.fetch_releases.__name__ + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) + msg = ti.xcom_pull( + key=workflow.RELEASE_INFO, + task_ids=task_id, + include_prior_dates=False, + ) + actual_snapshot_date, actual_changefiles, actual_is_first_run = parse_release_msg(msg) + self.assertEqual(snapshot_date, actual_snapshot_date) + self.assertListEqual( + release.changefiles, + actual_changefiles, + ) + self.assertTrue(actual_is_first_run) + + # Create datasets + ti = env.run_task(workflow.create_datasets.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + # Download and process snapshot + server = HttpServer(directory=test_fixtures_folder(self.dag_id), port=find_free_port()) + with server.create(): + with patch.object( + academic_observatory_workflows.workflows.unpaywall_telescope, + "SNAPSHOT_URL", + f"http://localhost:{server.port}/{snapshot_file_name}", + ): + ti = env.run_task(workflow.download_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + self.assertTrue(os.path.exists(release.snapshot_download_file_path)) + + ti = 
env.run_task(workflow.upload_downloaded_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.extract_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.transform_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task("split_main_table_file") + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.upload_main_table_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.bq_load_main_table.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + # Download and process changefiles + # TODO: mock download + ti = env.run_task(workflow.download_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.upload_downloaded_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.extract_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.transform_change_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.upload_upsert_files.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.bq_load_upsert_table.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.bq_upsert_records.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.bq_create_main_table_snapshot.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + # Final tasks + ti = env.run_task(workflow.add_new_dataset_releases.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task(workflow.cleanup.__name__) + self.assertEqual(State.SUCCESS, ti.state) + + ti = env.run_task("dag_run_complete") + self.assertEqual(State.SUCCESS, ti.state) + + # Second run: no new changefiles + execution_date = pendulum.datetime(2023, 4, 26) + with env.create_dag_run(dag, execution_date): + # Check that all tasks after fetch_releases are skipped + pass + + # Third run: applying changefiles + # In this example there are multiple changefiles + execution_date = pendulum.datetime(2023, 4, 28) + with env.create_dag_run(dag, execution_date): + pass + + a = 1 + + # release = telescope.make_release(**{"ti": ti}) + # + # # Download data + # ti = env.run_task(telescope.download.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Upload downloaded data + # ti = env.run_task(telescope.upload_downloaded.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # for file in release.download_files: + # self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) + # + # # Extract data + # ti = env.run_task(telescope.extract.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Transform data + # ti = env.run_task(telescope.transform.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assertEqual(len(release.transform_files), 1) + # + # # Upload transformed data + # ti = env.run_task(telescope.upload_transformed.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # for file in release.transform_files: + # self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) + # + # # Merge updates + # ti = env.run_task(telescope.merge_update_files.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assertEqual(len(release.transform_files), 1) + # + # # Load bq table partitions + # ti = env.run_task(telescope.bq_load_partition.__name__) + # 
self.assertEqual(State.SUCCESS, ti.state) + # + # # Delete changed data from main table + # ti = env.run_task(telescope.bq_delete_old.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Add new changes + # ti = env.run_task(telescope.bq_append_new.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" + # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" + # expected_rows = 100 + # self.assert_table_integrity(table_id, expected_rows) + # + # # Cleanup files + # download_folder, extract_folder, transform_folder = ( + # release.download_folder, + # release.extract_folder, + # release.transform_folder, + # ) + # ti = env.run_task(telescope.cleanup.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assert_cleanup(download_folder, extract_folder, transform_folder) + # + # # Load update_dataset_release_task + # dataset_releases = get_dataset_releases(dataset_id=1) + # self.assertEqual(len(dataset_releases), 0) + # ti = env.run_task(telescope.add_new_dataset_releases.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # dataset_releases = get_dataset_releases(dataset_id=1) + # self.assertEqual(len(dataset_releases), 1) + # self.assertEqual(dataset_releases[0].start_date, release.start_date) + # self.assertEqual(dataset_releases[0].end_date, release.end_date) + # + # # Second run (skips) Use dailies + # with env.create_dag_run(dag, second_execution_date): + # # Check dependencies are met + # ti = env.run_task(telescope.check_dependencies.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Check releases + # ti = env.run_task(telescope.check_releases.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Download data + # ti = env.run_task(telescope.download.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Upload downloaded data + # ti = env.run_task(telescope.upload_downloaded.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Extract data + # ti = env.run_task(telescope.extract.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Transform data + # ti = env.run_task(telescope.transform.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # self.assertEqual(len(release.transform_files), 0) + # + # # Upload transformed data + # ti = env.run_task(telescope.upload_transformed.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Merge updates + # ti = env.run_task(telescope.merge_update_files.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Load bq table partitions + # ti = env.run_task(telescope.bq_load_partition.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Delete changed data from main table + # ti = env.run_task(telescope.bq_delete_old.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Add new changes + # ti = env.run_task(telescope.bq_append_new.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Cleanup files + # ti = env.run_task(telescope.cleanup.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Load add_new_dataset_releases + # ti = env.run_task(telescope.add_new_dataset_releases.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Third run (downloads) + # with env.create_dag_run(dag, third_execution_date): + # # Check dependencies are met + # ti = env.run_task(telescope.check_dependencies.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Check releases + 
# ti = env.run_task(telescope.check_releases.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # release = telescope.make_release(**{"ti": ti}) + # + # # Download data + # ti = env.run_task(telescope.download.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Upload downloaded data + # ti = env.run_task(telescope.upload_downloaded.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # for file in release.download_files: + # self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) + # + # # Extract data + # ti = env.run_task(telescope.extract.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Transform data + # ti = env.run_task(telescope.transform.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assertEqual(len(release.transform_files), 3) + # + # # Upload transformed data + # ti = env.run_task(telescope.upload_transformed.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # for file in release.transform_files: + # self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) + # + # # Merge updates + # ti = env.run_task(telescope.merge_update_files.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assertEqual(len(release.transform_files), 1) + # + # # Load bq table partitions + # ti = env.run_task(telescope.bq_load_partition.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" + # table_id = create_date_table_id( + # partition_table_id, release.end_date, bigquery.TimePartitioningType.DAY + # ) + # table_id = f"{self.project_id}.{telescope.dataset_id}.{table_id}" + # expected_rows = 2 + # self.assert_table_integrity(table_id, expected_rows) + # + # # Delete changed data from main table + # ti = env.run_task(telescope.bq_delete_old.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" + # expected_rows = 99 + # self.assert_table_integrity(table_id, expected_rows) + # + # # Add new changes + # ti = env.run_task(telescope.bq_append_new.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" + # expected_rows = 101 + # self.assert_table_integrity(table_id, expected_rows) + # + # # Cleanup files + # download_folder, extract_folder, transform_folder = ( + # release.download_folder, + # release.extract_folder, + # release.transform_folder, + # ) + # ti = env.run_task(telescope.cleanup.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # self.assert_cleanup(download_folder, extract_folder, transform_folder) + # + # # Load update_dataset_release_task + # dataset_releases = get_dataset_releases(dataset_id=1) + # self.assertEqual(len(dataset_releases), 1) + # ti = env.run_task(telescope.add_new_dataset_releases.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # dataset_releases = get_dataset_releases(dataset_id=1) + # latest_release = get_latest_dataset_release(dataset_releases) + # self.assertEqual(len(dataset_releases), 2) + # self.assertEqual(latest_release.start_date, release.start_date) + # self.assertEqual(latest_release.end_date, release.end_date) + # + # # Fourth run. 
No new data + # with env.create_dag_run(dag, fourth_execution_date): + # # Check dependencies are met + # ti = env.run_task(telescope.check_dependencies.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Check releases + # with patch( + # "academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_diff_releases" + # ) as m_diff_releases: + # m_diff_releases.return_value = [] + # ti = env.run_task(telescope.check_releases.__name__) + # self.assertEqual(State.SUCCESS, ti.state) + # + # # Download data + # ti = env.run_task(telescope.download.__name__) + # self.assertEqual(ti.state, State.SKIPPED) + # + # # Clean up template + # self.remove_changefiles() + + # # class TestUnpaywallRelease(unittest.TestCase): # def __init__(self, *args, **kwargs): @@ -57,76 +678,6 @@ # self.snapshot_path = os.path.join(self.fixture_dir, self.snapshot_file) # self.snapshot_hash = "0f1ac32355c4582d82ae4bc76db17c26" # md5 # -# # API environment -# self.host = "localhost" -# self.port = find_free_port() -# configuration = Configuration(host=f"http://{self.host}:{self.port}") -# api_client = ApiClient(configuration) -# self.api = ObservatoryApi(api_client=api_client) # noqa: E501 -# self.env = ObservatoryApiEnvironment(host=self.host, port=self.port) -# -# def setup_api(self): -# dt = pendulum.now("UTC") -# -# name = "Unpaywall Telescope" -# workflow_type = WorkflowType(name=name, type_id=UnpaywallTelescope.DAG_ID) -# self.api.put_workflow_type(workflow_type) -# -# organisation = Organisation( -# name="Curtin University", -# project_id="project", -# download_bucket="download_bucket", -# transform_bucket="transform_bucket", -# ) -# self.api.put_organisation(organisation) -# -# telescope = Workflow( -# name=name, -# workflow_type=WorkflowType(id=1), -# organisation=Organisation(id=1), -# extra={}, -# ) -# self.api.put_workflow(telescope) -# -# table_type = TableType( -# type_id="partitioned", -# name="partitioned bq table", -# ) -# self.api.put_table_type(table_type) -# -# dataset_type = DatasetType( -# type_id="dataset_type_id", -# name="ds type", -# extra={}, -# table_type=TableType(id=1), -# ) -# self.api.put_dataset_type(dataset_type) -# -# dataset = Dataset( -# name="Unpaywall Dataset", -# address="project.dataset.table", -# service="bigquery", -# workflow=Workflow(id=1), -# dataset_type=DatasetType(id=1), -# ) -# self.api.put_dataset(dataset) -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") -# def test_api_key(self, m_pass): -# m_pass.return_value = "testpass" -# self.assertEqual(UnpaywallRelease.api_key(), "testpass") -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") -# def test_snapshot_url(self, m_pass): -# m_pass.return_value = "testpass" -# url = "https://api.unpaywall.org/feed/snapshot?api_key=testpass" -# self.assertEqual(UnpaywallRelease.snapshot_url(), url) -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") -# def test_data_feed_url(self, m_pass): -# m_pass.return_value = "testpass" -# url = "https://api.unpaywall.org/feed/changefiles?interval=day&api_key=testpass" -# self.assertEqual(UnpaywallRelease.data_feed_url(), url) # # @patch("academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_diff_releases") # @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_airflow_connection_password") @@ -305,423 +856,3 @@ # feeds = 
UnpaywallRelease.get_unpaywall_daily_feeds() # self.assertEqual(len(feeds), 1) # self.assertEqual(feeds[0]["snapshot_date"], pendulum.datetime(2021, 7, 2, 15, 11, 34)) -# -# -# class TestUnpaywallTelescope(ObservatoryTestCase): -# def __init__(self, *args, **kwargs): -# super().__init__(*args, **kwargs) -# -# self.project_id = os.getenv("TEST_GCP_PROJECT_ID") -# self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") -# -# self.fixture_dir = test_fixtures_folder("unpaywall") -# self.snapshot_file = "unpaywall_2021-07-02T151134.jsonl.gz" -# self.snapshot_path = os.path.join(self.fixture_dir, self.snapshot_file) -# self.snapshot_hash = "0f1ac32355c4582d82ae4bc76db17c26" # md5 -# -# # API environment -# self.host = "localhost" -# self.port = find_free_port() -# configuration = Configuration(host=f"http://{self.host}:{self.port}") -# api_client = ApiClient(configuration) -# self.api = ObservatoryApi(api_client=api_client) # noqa: E501 -# self.env = ObservatoryApiEnvironment(host=self.host, port=self.port) -# self.org_name = "Curtin University" -# -# def setup_api(self): -# org = Organisation(name=self.org_name) -# result = self.api.put_organisation(org) -# self.assertIsInstance(result, Organisation) -# -# tele_type = WorkflowType(type_id=UnpaywallTelescope.DAG_ID, name="Unpaywall") -# result = self.api.put_workflow_type(tele_type) -# self.assertIsInstance(result, WorkflowType) -# -# telescope = Workflow(organisation=Organisation(id=1), workflow_type=WorkflowType(id=1)) -# result = self.api.put_workflow(telescope) -# self.assertIsInstance(result, Workflow) -# -# table_type = TableType( -# type_id="partitioned", -# name="partitioned bq table", -# ) -# self.api.put_table_type(table_type) -# -# dataset_type = DatasetType( -# type_id="dataset_type_id", -# name="ds type", -# extra={}, -# table_type=TableType(id=1), -# ) -# self.api.put_dataset_type(dataset_type) -# -# dataset = Dataset( -# name="Unpaywall Dataset", -# address="project.dataset.table", -# service="bigquery", -# workflow=Workflow(id=1), -# dataset_type=DatasetType(id=1), -# ) -# result = self.api.put_dataset(dataset) -# self.assertIsInstance(result, Dataset) -# -# def setup_connections(self, env): -# # Add Observatory API connection -# conn = Connection(conn_id=AirflowConns.OBSERVATORY_API, uri=f"http://:password@{self.host}:{self.port}") -# env.add_connection(conn) -# -# @patch("observatory.platform.utils.release_utils.make_observatory_api") -# def test_ctor(self, m_makeapi): -# m_makeapi.return_value = self.api -# -# with self.env.create(): -# self.setup_api() -# -# telescope = UnpaywallTelescope(airflow_vars=[], workflow_id=1) -# self.assertEqual(set(telescope.airflow_vars), set(["transform_bucket", "project_id", "data_location"])) -# -# @patch("observatory.platform.utils.release_utils.make_observatory_api") -# def test_dag_structure(self, m_makeapi): -# """Test that the Crossref Events DAG has the correct structure.""" -# -# m_makeapi.return_value = self.api -# with self.env.create(): -# self.setup_api() -# dag = UnpaywallTelescope(workflow_id=0).make_dag() -# self.assert_dag_structure( -# { -# "check_dependencies": ["check_releases"], -# "check_releases": ["download"], -# "download": ["upload_downloaded"], -# "upload_downloaded": ["extract"], -# "extract": ["transform"], -# "transform": ["upload_transformed"], -# "upload_transformed": ["merge_update_files"], -# "merge_update_files": ["bq_load_partition"], -# "bq_load_partition": ["bq_delete_old"], -# "bq_delete_old": ["bq_append_new"], -# "bq_append_new": ["cleanup"], -# 
"cleanup": ["add_new_dataset_releases"], -# "add_new_dataset_releases": [], -# }, -# dag, -# ) -# -# @patch("observatory.platform.utils.release_utils.make_observatory_api") -# def test_dag_load(self, m_makeapi): -# """Test that the DAG can be loaded from a DAG bag.""" -# -# m_makeapi.return_value = self.api -# env = ObservatoryEnvironment(self.project_id, self.data_location, api_host=self.host, api_port=self.port) -# -# with env.create(): -# self.setup_connections(env) -# self.setup_api() -# dag_file = os.path.join(module_file_path("academic_observatory_workflows.dags"), "unpaywall_telescope.py") -# self.assert_dag_load("unpaywall", dag_file) -# -# def setup_observatory_environment(self): -# env = ObservatoryEnvironment(self.project_id, self.data_location, api_host=self.host, api_port=self.port) -# self.dataset_id = env.add_dataset() -# return env -# -# def create_changefiles(self, host, port): -# # Daily -# template_path = os.path.join(self.fixture_dir, "daily-feed", "changefiles.jinja2") -# changefiles = render_template(template_path, host=host, port=port) -# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") -# with open(dst, "w") as f: -# f.write(changefiles) -# -# def remove_changefiles(self): -# dst = os.path.join(self.fixture_dir, "daily-feed", "changefiles") -# os.remove(dst) -# -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_filename_from_http_header") -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.snapshot_url") -# def test_get_snapshot_date(self, m_url, m_get_filename): -# m_url.return_value = "url" -# m_get_filename.return_value = "unpaywall_2021-07-02T151134.jsonl.gz" -# dt = get_snapshot_date() -# self.assertEqual(dt, pendulum.datetime(2021, 7, 2, 15, 11, 34)) -# -# # We want to do 3 dag runs. First is to load snapshot. -# # Second run brings us up to date and applies the overlapping diffs from before snapshot. 
-# # Third checks the "all other situations" case -# @patch("academic_observatory_workflows.workflows.unpaywall_telescope.get_filename_from_http_header") -# @patch("observatory.platform.utils.release_utils.make_observatory_api") -# def test_telescope_day(self, m_makeapi, m_http_response): -# m_makeapi.return_value = self.api -# m_http_response.return_value = self.snapshot_file -# -# env = self.setup_observatory_environment() -# -# first_execution_date = pendulum.datetime(2021, 7, 3) # Snapshot -# second_execution_date = pendulum.datetime(2021, 7, 4) # Not enough updates found -# third_execution_date = pendulum.datetime(2021, 7, 5) # Updates found -# fourth_execution_date = pendulum.datetime(2021, 7, 6) # No updates found -# -# with env.create(task_logging=True): -# self.setup_api() -# server = HttpServer(directory=self.fixture_dir) -# with server.create(): -# with patch.object( -# UnpaywallRelease, "SNAPSHOT_URL", f"http://{server.host}:{server.port}/{self.snapshot_file}" -# ): -# with patch.object( -# UnpaywallRelease, -# "CHANGEFILES_URL", -# f"http://{server.host}:{server.port}/daily-feed/changefiles", -# ): -# self.create_changefiles(server.host, server.port) -# -# conn = Connection( -# conn_id=UnpaywallRelease.AIRFLOW_CONNECTION, uri="http://:YOUR_API_KEY@localhost" -# ) -# -# env.add_connection(conn) -# -# telescope = UnpaywallTelescope(api_dataset_id=self.dataset_id, workflow_id=1) -# dag = telescope.make_dag() -# -# # First run -# with env.create_dag_run(dag, first_execution_date): -# # Check dependencies are met -# ti = env.run_task(telescope.check_dependencies.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Check releases -# ti = env.run_task(telescope.check_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# release = telescope.make_release(**{"ti": ti}) -# -# # Download data -# ti = env.run_task(telescope.download.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Upload downloaded data -# ti = env.run_task(telescope.upload_downloaded.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# for file in release.download_files: -# self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) -# -# # Extract data -# ti = env.run_task(telescope.extract.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Transform data -# ti = env.run_task(telescope.transform.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assertEqual(len(release.transform_files), 1) -# -# # Upload transformed data -# ti = env.run_task(telescope.upload_transformed.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# for file in release.transform_files: -# self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) -# -# # Merge updates -# ti = env.run_task(telescope.merge_update_files.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assertEqual(len(release.transform_files), 1) -# -# # Load bq table partitions -# ti = env.run_task(telescope.bq_load_partition.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Delete changed data from main table -# ti = env.run_task(telescope.bq_delete_old.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Add new changes -# ti = env.run_task(telescope.bq_append_new.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" -# table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" -# expected_rows = 100 -# 
self.assert_table_integrity(table_id, expected_rows) -# -# # Cleanup files -# download_folder, extract_folder, transform_folder = ( -# release.download_folder, -# release.extract_folder, -# release.transform_folder, -# ) -# ti = env.run_task(telescope.cleanup.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assert_cleanup(download_folder, extract_folder, transform_folder) -# -# # Load update_dataset_release_task -# dataset_releases = get_dataset_releases(dataset_id=1) -# self.assertEqual(len(dataset_releases), 0) -# ti = env.run_task(telescope.add_new_dataset_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# dataset_releases = get_dataset_releases(dataset_id=1) -# self.assertEqual(len(dataset_releases), 1) -# self.assertEqual(dataset_releases[0].start_date, release.start_date) -# self.assertEqual(dataset_releases[0].end_date, release.end_date) -# -# # Second run (skips) Use dailies -# with env.create_dag_run(dag, second_execution_date): -# # Check dependencies are met -# ti = env.run_task(telescope.check_dependencies.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Check releases -# ti = env.run_task(telescope.check_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Download data -# ti = env.run_task(telescope.download.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Upload downloaded data -# ti = env.run_task(telescope.upload_downloaded.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Extract data -# ti = env.run_task(telescope.extract.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Transform data -# ti = env.run_task(telescope.transform.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# self.assertEqual(len(release.transform_files), 0) -# -# # Upload transformed data -# ti = env.run_task(telescope.upload_transformed.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Merge updates -# ti = env.run_task(telescope.merge_update_files.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Load bq table partitions -# ti = env.run_task(telescope.bq_load_partition.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Delete changed data from main table -# ti = env.run_task(telescope.bq_delete_old.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Add new changes -# ti = env.run_task(telescope.bq_append_new.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Cleanup files -# ti = env.run_task(telescope.cleanup.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Load add_new_dataset_releases -# ti = env.run_task(telescope.add_new_dataset_releases.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Third run (downloads) -# with env.create_dag_run(dag, third_execution_date): -# # Check dependencies are met -# ti = env.run_task(telescope.check_dependencies.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Check releases -# ti = env.run_task(telescope.check_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# release = telescope.make_release(**{"ti": ti}) -# -# # Download data -# ti = env.run_task(telescope.download.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Upload downloaded data -# ti = env.run_task(telescope.upload_downloaded.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# for file in release.download_files: -# self.assert_blob_integrity(env.download_bucket, gcs_blob_name_from_path(file), file) -# -# # Extract data -# ti = 
env.run_task(telescope.extract.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Transform data -# ti = env.run_task(telescope.transform.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assertEqual(len(release.transform_files), 3) -# -# # Upload transformed data -# ti = env.run_task(telescope.upload_transformed.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# for file in release.transform_files: -# self.assert_blob_integrity(env.transform_bucket, gcs_blob_name_from_path(file), file) -# -# # Merge updates -# ti = env.run_task(telescope.merge_update_files.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assertEqual(len(release.transform_files), 1) -# -# # Load bq table partitions -# ti = env.run_task(telescope.bq_load_partition.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# main_table_id, partition_table_id = release.dag_id, f"{release.dag_id}_partitions" -# table_id = create_date_table_id( -# partition_table_id, release.end_date, bigquery.TimePartitioningType.DAY -# ) -# table_id = f"{self.project_id}.{telescope.dataset_id}.{table_id}" -# expected_rows = 2 -# self.assert_table_integrity(table_id, expected_rows) -# -# # Delete changed data from main table -# ti = env.run_task(telescope.bq_delete_old.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" -# expected_rows = 99 -# self.assert_table_integrity(table_id, expected_rows) -# -# # Add new changes -# ti = env.run_task(telescope.bq_append_new.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# table_id = f"{self.project_id}.{telescope.dataset_id}.{main_table_id}" -# expected_rows = 101 -# self.assert_table_integrity(table_id, expected_rows) -# -# # Cleanup files -# download_folder, extract_folder, transform_folder = ( -# release.download_folder, -# release.extract_folder, -# release.transform_folder, -# ) -# ti = env.run_task(telescope.cleanup.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# self.assert_cleanup(download_folder, extract_folder, transform_folder) -# -# # Load update_dataset_release_task -# dataset_releases = get_dataset_releases(dataset_id=1) -# self.assertEqual(len(dataset_releases), 1) -# ti = env.run_task(telescope.add_new_dataset_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# dataset_releases = get_dataset_releases(dataset_id=1) -# latest_release = get_latest_dataset_release(dataset_releases) -# self.assertEqual(len(dataset_releases), 2) -# self.assertEqual(latest_release.start_date, release.start_date) -# self.assertEqual(latest_release.end_date, release.end_date) -# -# # Fourth run. 
No new data -# with env.create_dag_run(dag, fourth_execution_date): -# # Check dependencies are met -# ti = env.run_task(telescope.check_dependencies.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Check releases -# with patch( -# "academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease.get_diff_releases" -# ) as m_diff_releases: -# m_diff_releases.return_value = [] -# ti = env.run_task(telescope.check_releases.__name__) -# self.assertEqual(State.SUCCESS, ti.state) -# -# # Download data -# ti = env.run_task(telescope.download.__name__) -# self.assertEqual(ti.state, State.SKIPPED) -# -# # Clean up template -# self.remove_changefiles() diff --git a/academic_observatory_workflows/workflows/unpaywall_telescope.py b/academic_observatory_workflows/workflows/unpaywall_telescope.py index 6df21110c..448d6489d 100644 --- a/academic_observatory_workflows/workflows/unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/unpaywall_telescope.py @@ -79,6 +79,11 @@ def __init__(self, filename: str, changefile_date: pendulum.DateTime, changefile self.changefile_date = changefile_date self.changefile_release = changefile_release + def __eq__(self, other): + if isinstance(other, Changefile): + return self.filename == other.filename and self.changefile_date == other.changefile_date + return False + @staticmethod def from_dict(dict_: Dict) -> Changefile: filename = dict_["filename"] @@ -347,11 +352,8 @@ def make_release(self, **kwargs) -> UnpaywallRelease: msg = ti.xcom_pull( key=UnpaywallTelescope.RELEASE_INFO, task_ids=self.fetch_releases.__name__, include_prior_dates=False ) - + snapshot_date, changefiles, is_first_run = parse_release_msg(msg) run_id = kwargs["run_id"] - snapshot_date = pendulum.parse(msg["snapshot_date"]) - changefiles = [Changefile.from_dict(changefile) for changefile in msg["changefiles"]] - is_first_run = msg["is_first_run"] # The first changefile is the newest and the last one is the oldest changefile_start_date = changefiles[-1].changefile_date @@ -634,6 +636,14 @@ def cleanup(self, release: UnpaywallRelease, **kwargs) -> None: cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder) +def parse_release_msg(msg: Dict): + snapshot_date = pendulum.parse(msg["snapshot_date"]) + changefiles = [Changefile.from_dict(changefile) for changefile in msg["changefiles"]] + is_first_run = msg["is_first_run"] + + return snapshot_date, changefiles, is_first_run + + def snapshot_url(api_key: str) -> str: """Snapshot URL""" @@ -646,8 +656,8 @@ def get_snapshot_file_name(api_key: str) -> str: :return: Snapshot file date. """ - response = snapshot_url(api_key) - return get_filename_from_http_header(response) + url = snapshot_url(api_key) + return get_filename_from_http_header(url) def changefiles_url(api_key: str) -> str: