From 369000b21b5549554529998d4a5f0df6b2638873 Mon Sep 17 00:00:00 2001 From: Jamie Diprose <5715104+jdddog@users.noreply.github.com> Date: Fri, 26 May 2023 16:07:50 +1200 Subject: [PATCH] Add suggested changes and add funders table to openalex_telescope.py --- .../database/schema/openalex/funders.json | 226 ++++++++++++++++++ .../database/schema/openalex/works.json | 6 + .../updated_date=2023-04-02/part_000.json | 3 + .../openalex/2023-04-02/expected/funders.json | 3 + .../openalex/2023-04-02/expected/works.json | 4 +- .../updated_date=2023-04-02/part_000.json | 3 + .../updated_date=2023-04-16/part_000.json | 3 + .../openalex/2023-04-16/expected/funders.json | 3 + .../openalex/2023-04-16/expected/works.json | 4 +- .../workflows/crossref_events_telescope.py | 28 ++- .../workflows/doi_workflow.py | 2 +- .../workflows/openalex_telescope.py | 59 +++-- .../tests/test_crossref_events_telescope.py | 63 +++++ .../tests/test_openalex_telescope.py | 6 +- .../workflows/unpaywall_telescope.py | 26 +- 15 files changed, 405 insertions(+), 34 deletions(-) create mode 100644 academic_observatory_workflows/database/schema/openalex/funders.json create mode 100644 academic_observatory_workflows/fixtures/openalex/2023-04-02/data/funders/updated_date=2023-04-02/part_000.json create mode 100644 academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/funders.json create mode 100644 academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-02/part_000.json create mode 100644 academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-16/part_000.json create mode 100644 academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/funders.json diff --git a/academic_observatory_workflows/database/schema/openalex/funders.json b/academic_observatory_workflows/database/schema/openalex/funders.json new file mode 100644 index 00000000..6858fc9f --- /dev/null +++ 
b/academic_observatory_workflows/database/schema/openalex/funders.json @@ -0,0 +1,226 @@ +[ + { + "name": "alternate_titles", + "type": "STRING", + "mode": "REPEATED", + "description": "A list of alternate titles for this funder." + }, + { + "name": "cited_by_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "The total number Works that cite a work linked to this funder." + }, + { + "name": "country_code", + "type": "STRING", + "mode": "NULLABLE", + "description": "The country where this funder is located, represented as an ISO two-letter country code." + }, + { + "name": "counts_by_year", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "cited_by_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "oa_works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "year", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + } + ], + "description": "The values of works_count and cited_by_count for each of the last ten years, binned by year. To put it another way: for every listed year, you can see how many new works are linked to this funder, and how many times any work linked to this funder was cited.\nYears with zero citations and zero works have been removed so you will need to add those back in if you need them." + }, + { + "name": "created_date", + "type": "DATE", + "mode": "NULLABLE", + "description": "The date this Funder object was created in the OpenAlex dataset, expressed as an ISO 8601 date string." + }, + { + "name": "description", + "type": "STRING", + "mode": "NULLABLE", + "description": "A short description of this funder, taken from Wikidata." + }, + { + "name": "display_name", + "type": "STRING", + "mode": "NULLABLE", + "description": "The primary name of the funder." 
+ }, + { + "name": "homepage_url", + "type": "STRING", + "mode": "NULLABLE", + "description": "The URL for this funder's primary homepage." + }, + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE", + "description": "The OpenAlex ID for this funder." + }, + { + "name": "ids", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "openalex", + "type": "STRING", + "mode": "NULLABLE", + "description": "this funder's OpenAlex ID" + }, + { + "name": "ror", + "type": "STRING", + "mode": "NULLABLE", + "description": "this funder's ROR ID" + }, + { + "name": "wikidata", + "type": "STRING", + "mode": "NULLABLE", + "description": "this funder's Wikidata ID" + } + ], + "description": "All the external identifiers that we know about for this funder. IDs are expressed as URIs whenever possible." + }, + { + "name": "image_thumbnail_url", + "type": "STRING", + "mode": "NULLABLE", + "description": "Same as image_url, but it's a smaller image.\nThis is usually a hotlink to a wikimedia image. You can change the width=300 parameter in the URL if you want a different thumbnail size." + }, + { + "name": "image_url", + "type": "STRING", + "mode": "NULLABLE", + "description": "URL where you can get an image representing this funder. Usually this a hotlink to a Wikimedia image, and usually it's a seal or logo." + }, + { + "name": "roles", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "role", + "type": "STRING", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + } + ], + "description": "List of role objects, which include the role (one of institution, funder, or publisher), the id (OpenAlex ID), and the works_count.\nIn many cases, a single organization does not fit neatly into one role. 
For example, Yale University is a single organization that is a research university, funds research studies, and publishes an academic journal. The roles property links the OpenAlex entities together for a single organization, and includes counts for the works associated with each role.\nThe roles list of an entity (Funder, Publisher, or Institution) always includes itself. In the case where an organization only has one role, the roles will be a list of length one, with itself as the only item." + }, + { + "name": "summary_stats", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "2yr_cited_by_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "2yr_h_index", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "2yr_i10_index", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "2yr_mean_citedness", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "The 2-year mean citedness for this funder. Also known as impact factor." + }, + { + "name": "2yr_works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "cited_by_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "h_index", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "The h-index for this funder." + }, + { + "name": "i10_index", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "The i-10 index for this funder." + }, + { + "name": "oa_percent", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + } + ], + "description": "Citation metrics for this funder. While the h-index and the i-10 index are normally author-level metrics and the 2-year mean citedness is normally a journal-level metric, they can be calculated for any set of papers, so we include them for funders." 
+ }, + { + "name": "updated_date", + "type": "TIMESTAMP", + "mode": "NULLABLE", + "description": "The last time anything in this funder object changed, expressed as an ISO 8601 date string. This date is updated for any change at all, including increases in various counts." + }, + { + "name": "works_count", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "The number of works linked to this funder." + } +] \ No newline at end of file diff --git a/academic_observatory_workflows/database/schema/openalex/works.json b/academic_observatory_workflows/database/schema/openalex/works.json index da4fbee7..2c3b70c1 100644 --- a/academic_observatory_workflows/database/schema/openalex/works.json +++ b/academic_observatory_workflows/database/schema/openalex/works.json @@ -575,6 +575,12 @@ "mode": "NULLABLE", "description": "True if we know this work has been retracted. This field has high precision but low recall. In other words, if is_retracted is true, the article is definitely retracted. But if is_retracted is False, it still might be retracted, but we just don't know. This is because unfortunately, the open sources for retraction data aren't currently very comprehensive, and the more comprehensive ones aren't sufficiently open for us to use here." }, + { + "name": "language", + "type": "STRING", + "mode": "NULLABLE", + "description": "The language of the work in ISO 639-1 format. The language is automatically detected using the information we have about the work. We use the langdetect software library on the words in the work's abstract, or the title if we do not have the abstract. The source code for this procedure is here. Keep in mind that this method is not perfect, and that in some cases the language of the title or abstract could be different from the body of the work." 
+ }, { "name": "locations", "type": "RECORD", diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-02/data/funders/updated_date=2023-04-02/part_000.json b/academic_observatory_workflows/fixtures/openalex/2023-04-02/data/funders/updated_date=2023-04-02/part_000.json new file mode 100644 index 00000000..5cae4623 --- /dev/null +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-02/data/funders/updated_date=2023-04-02/part_000.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:cbd7a3a14e6e49a334c81593d6339df4b9e0ce0e06686d3b838db814c147c55f +size 8963 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/funders.json b/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/funders.json new file mode 100644 index 00000000..794fead5 --- /dev/null +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/funders.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9926b272b4179d9fb6472533eb9247e8751890351c2115dfd09b72f37870fcb0 +size 9034 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/works.json b/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/works.json index 27480049..e1d4fa5c 100644 --- a/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/works.json +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-02/expected/works.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:bf36d0e6ee39de95d484ab8a42d69ac1d0b929f8b5e2dafb20ce4c37cdedeade -size 61651 +oid sha256:a748ded838d21c426a3a62b9e4c6cad1d29cfe26ed6151b4fa445f5f580ba093 +size 61739 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-02/part_000.json b/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-02/part_000.json new file mode 100644 index 00000000..3fc8a938 --- /dev/null +++ 
b/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-02/part_000.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:180753d386c44caed654d604c0e72e544a492b358ecca7b4d8afd108c7d17148 +size 7210 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-16/part_000.json b/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-16/part_000.json new file mode 100644 index 00000000..d46c1da5 --- /dev/null +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-16/data/funders/updated_date=2023-04-16/part_000.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:979cef9d9a46fdccc0a60a95310ca2f94897313d1b2aeb1466d36f4fdce26c64 +size 4505 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/funders.json b/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/funders.json new file mode 100644 index 00000000..c7ded7b6 --- /dev/null +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/funders.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:512b547958a5ce80f0b6f88ae66ac79daf0c356ffa51b2378822d4ccdf856c2d +size 11808 diff --git a/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/works.json b/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/works.json index b22b0d34..746eca5d 100644 --- a/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/works.json +++ b/academic_observatory_workflows/fixtures/openalex/2023-04-16/expected/works.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:7794586a445004d87c56e9de4c3c51df407f8daed8046423a797f94c7495fc73 -size 51501 +oid sha256:569f53208acf8520eb265ddbf4b3e511423e4944d79198f1e2421718abb0cc70 +size 51589 diff --git a/academic_observatory_workflows/workflows/crossref_events_telescope.py 
b/academic_observatory_workflows/workflows/crossref_events_telescope.py index c7637089..7b23d576 100644 --- a/academic_observatory_workflows/workflows/crossref_events_telescope.py +++ b/academic_observatory_workflows/workflows/crossref_events_telescope.py @@ -19,6 +19,7 @@ import datetime import logging import os +import pathlib import time from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor from datetime import timedelta @@ -62,8 +63,8 @@ CROSSREF_EVENTS_HOST = "https://api.eventdata.crossref.org/v1/events" DATE_FORMAT = "YYYY-MM-DD" -backend = storage_from_string("memory://") -moving_window = FixedWindowElasticExpiryRateLimiter(backend) +BACKEND = storage_from_string("memory://") +MOVING_WINDOW = FixedWindowElasticExpiryRateLimiter(BACKEND) class CrossrefEventsRelease(ChangefileRelease): @@ -754,14 +755,23 @@ def download_events(request: EventRequest, download_folder: str, n_rows: int): logging.info(f"{request}: skipped, already finished") return - # If data file exists then the previous request must have failed - # Remove them both and start again - if os.path.isfile(data_path) and os.path.isfile(cursor_path): + # If cursor exists then the previous request must have failed + # Remove data file and cursor and start again + if os.path.isfile(cursor_path): logging.warning(f"{request}: deleting data and trying again") logging.warning(f"{request}: deleting {data_path}") - os.remove(data_path) + try: + os.remove(data_path) + except FileNotFoundError: + pass logging.warning(f"{request}: deleting {cursor_path}") - os.remove(cursor_path) + try: + os.remove(cursor_path) + except FileNotFoundError: + pass + + # Create empty cursor file before doing anything else + pathlib.Path(cursor_path).touch() logging.info(f"{request}: downloading") next_cursor = None @@ -829,12 +839,12 @@ def crossref_events_limiter(calls_per_second: int = 10): item = RateLimitItemPerSecond(calls_per_second) # 10 per second while True: - if not moving_window.test(item, 
identifier): + if not MOVING_WINDOW.test(item, identifier): time.sleep(0.01) else: break - moving_window.hit(item, identifier) + MOVING_WINDOW.hit(item, identifier) def transform_events(download_path: str, transform_folder: str): diff --git a/academic_observatory_workflows/workflows/doi_workflow.py b/academic_observatory_workflows/workflows/doi_workflow.py index 059c5700..e140243d 100644 --- a/academic_observatory_workflows/workflows/doi_workflow.py +++ b/academic_observatory_workflows/workflows/doi_workflow.py @@ -350,7 +350,7 @@ def get_snapshot_date(project_id: str, dataset_id: str, table_id: str, snapshot_ shard_date = table_shard_dates[0] else: raise AirflowException( - f"{project_id}.{dataset_id}.{table_id} " f"with a table shard date <= {snapshot_date} not found" + f"{table_id} with a table shard date <= {snapshot_date} not found" ) return shard_date diff --git a/academic_observatory_workflows/workflows/openalex_telescope.py b/academic_observatory_workflows/workflows/openalex_telescope.py index 00e059dd..6db9411b 100644 --- a/academic_observatory_workflows/workflows/openalex_telescope.py +++ b/academic_observatory_workflows/workflows/openalex_telescope.py @@ -83,6 +83,20 @@ def __init__( prev_end_date: Optional[pendulum.DateTime], release: OpenAlexRelease = None, ): + """This class represents the data and settings related to an OpenAlex entity or table. + + :param entity_name: the name of the entity, e.g. authors, institutions etc. + :param transform: whether the data for the entity needs to be downloaded and transformed locally, or whether + it can be loaded straight into BigQuery. + :param start_date: the start date of the files covered by this release (inclusive). + :param end_date: the end date of the files covered by this release (inclusive). + :param manifest: the Redshift manifest provided by OpenAlex for this entity. + :param merged_ids: the MergedIds provided by OpenAlex for this entity. + :param is_first_run: whether this is the first run or not. 
+ :param prev_end_date: the previous end date. + :param release: the release object. + """ + self.entity_name = entity_name self.transform = transform self.start_date = start_date @@ -320,6 +334,7 @@ def __init__( ("authors", False), ("publishers", False), ("sources", False), + ("funders", False), ] super().__init__( @@ -367,25 +382,9 @@ def __init__( self.add_task(self.aws_to_gcs_transfer) # Download concepts, institutions and works which need to be pre-processed - # Gsutil is used instead of the standard Google Cloud Python library, because it is faster at downloading files - # than the Google Cloud Python library. for entity_name, transform in self.entities: if transform: - output_folder = "{{ release.download_folder }}/data/" + entity_name + "/" - bucket_path = "{{ release.gcs_openalex_data_uri }}data/" + entity_name + "/*" - self.add_operator( - WorkflowBashOperator( - workflow=self, - task_id=f"download_{entity_name}", - bash_command="mkdir -p " - + output_folder - + " && gcloud auth activate-service-account --key-file=${GOOGLE_APPLICATION_CREDENTIALS}" - + " && gsutil -m -q cp -L {{ release.log_path }} -r " - + bucket_path - + " " - + output_folder, - ) - ) + self.add_operator(make_download_bash_op(self, entity_name)) self.add_task(self.transform) # Upsert records @@ -752,6 +751,32 @@ def cleanup(self, release: OpenAlexRelease, **kwargs) -> None: cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder) +def make_download_bash_op(workflow: Workflow, entity_name: str) -> WorkflowBashOperator: + """Download files for an entity from the bucket. + + Gsutil is used instead of the standard Google Cloud Python library, because it is faster at downloading files + than the Google Cloud Python library. + + :param workflow: the workflow. + :param entity_name: the name of the OpenAlex entity, e.g. authors, institutions etc. + :return: a WorkflowBashOperator instance. 
+ """ + + output_folder = "{{ release.download_folder }}/data/" + entity_name + "/" + bucket_path = "{{ release.gcs_openalex_data_uri }}data/" + entity_name + "/*" + return WorkflowBashOperator( + workflow=workflow, + task_id=f"download_{entity_name}", + bash_command="mkdir -p " + + output_folder + + " && gcloud auth activate-service-account --key-file=${GOOGLE_APPLICATION_CREDENTIALS}" + + " && gsutil -m -q cp -L {{ release.log_path }} -r " + + bucket_path + + " " + + output_folder, + ) + + def parse_release_msg(msg: Dict): return [OpenAlexEntity.from_dict(entity) for entity in msg["entities"]] diff --git a/academic_observatory_workflows/workflows/tests/test_crossref_events_telescope.py b/academic_observatory_workflows/workflows/tests/test_crossref_events_telescope.py index 43342cce..a9a392f3 100644 --- a/academic_observatory_workflows/workflows/tests/test_crossref_events_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_crossref_events_telescope.py @@ -17,11 +17,15 @@ import datetime import json import os +import pathlib from concurrent.futures import as_completed, ThreadPoolExecutor +from unittest.mock import patch import pendulum import responses +import vcr from airflow.utils.state import State +from click.testing import CliRunner from google.cloud.exceptions import NotFound from academic_observatory_workflows.config import test_fixtures_folder @@ -33,6 +37,8 @@ parse_release_msg, EventRequest, crossref_events_limiter, + download_events, + fetch_events, ) from observatory.platform.api import get_dataset_releases from observatory.platform.files import list_files @@ -424,6 +430,63 @@ def test_telescope(self): class TestCrossrefEventsUtils(ObservatoryTestCase): + @patch("academic_observatory_workflows.workflows.crossref_events_telescope.fetch_events") + def test_download_events(self, m_fetch_events): + m_fetch_events.return_value = ([], None) + request = EventRequest(Action.create, pendulum.datetime(2023, 5, 1), "test@test.com") + n_rows = 10 
+ + # No files + # Hasn't been run before + with CliRunner().isolated_filesystem() as download_folder: + download_events(request, download_folder, n_rows) + assert m_fetch_events.call_count == 1 + + # Data file and cursor file + # The data file was partly downloaded because cursor still exists + m_fetch_events.reset_mock() + with CliRunner().isolated_filesystem() as download_folder: + pathlib.Path(pathlib.Path(download_folder) / "created-2023-05-01.jsonl").touch() + pathlib.Path(pathlib.Path(download_folder) / "created-2023-05-01-cursor.txt").touch() + download_events(request, download_folder, n_rows) + assert m_fetch_events.call_count == 1 + + # Data file and no cursor file + # The data file was fully downloaded and cursor file removed + m_fetch_events.reset_mock() + with CliRunner().isolated_filesystem() as download_folder: + pathlib.Path(pathlib.Path(download_folder) / "created-2023-05-01.jsonl").touch() + download_events(request, download_folder, n_rows) + assert m_fetch_events.call_count == 0 + + def test_fetch_events(self): + dt = pendulum.datetime(2023, 5, 1) + mailto = "agent@observatory.academy" + + # Create events + with vcr.use_cassette(test_fixtures_folder("crossref_events", "test_fetch_events_create.yaml")): + request = EventRequest(Action.create, dt, mailto) + events1, next_cursor1 = fetch_events(request, n_rows=10) + self.assertEqual(10, len(events1)) + self.assertIsNotNone(next_cursor1) + events2, next_cursor2 = fetch_events(request, next_cursor1, n_rows=10) + self.assertEqual(10, len(events2)) + self.assertIsNotNone(next_cursor2) + + # Edit events + with vcr.use_cassette(test_fixtures_folder("crossref_events", "test_fetch_events_edit.yaml")): + request = EventRequest(Action.edit, dt, mailto) + events, next_cursor = fetch_events(request, n_rows=10) + self.assertEqual(0, len(events)) + self.assertIsNone(next_cursor) + + # Delete events + with vcr.use_cassette(test_fixtures_folder("crossref_events", "test_fetch_events_delete.yaml")): + request = 
EventRequest(Action.delete, dt, mailto) + events, next_cursor = fetch_events(request, n_rows=10) + self.assertEqual(0, len(events)) + self.assertIsNone(next_cursor) + def test_crossref_events_limiter(self): n_per_second = 10 diff --git a/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py b/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py index 48f6fadc..ce343098 100644 --- a/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_openalex_telescope.py @@ -707,7 +707,7 @@ def test_telescope(self): include_prior_dates=False, ) actual_entities = parse_release_msg(msg) - self.assertEqual(6, len(actual_entities)) + self.assertEqual(7, len(actual_entities)) self.assertEqual(expected_entities, actual_entities) ti = env.run_task(workflow.create_datasets.__name__) @@ -916,7 +916,7 @@ def test_telescope(self): include_prior_dates=False, ) actual_entities = parse_release_msg(msg) - self.assertEqual(6, len(actual_entities)) + self.assertEqual(7, len(actual_entities)) self.assertEqual(expected_entities, actual_entities) ti = env.run_task(workflow.create_datasets.__name__) @@ -977,6 +977,7 @@ def test_telescope(self): "publishers": 2, "sources": 3, "works": 3, + "funders": 2, } for entity in expected_entities: expected_rows = expected_row_index[entity.entity_name] @@ -1005,6 +1006,7 @@ def test_telescope(self): "publishers": 5, "sources": 4, "works": 4, + "funders": 5, } for entity in expected_entities: expected_rows = expected_row_index[entity.entity_name] diff --git a/academic_observatory_workflows/workflows/unpaywall_telescope.py b/academic_observatory_workflows/workflows/unpaywall_telescope.py index 9636dbc7..e0c4e247 100644 --- a/academic_observatory_workflows/workflows/unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/unpaywall_telescope.py @@ -76,6 +76,13 @@ class Changefile: def __init__(self, filename: str, changefile_date: 
pendulum.DateTime, changefile_release: ChangefileRelease = None): + """Holds the metadata about a single Unpaywall changefile. + + :param filename: the name of the changefile. + :param changefile_date: the date of the changefile. + :param changefile_release: the ChangefileRelease object. + """ + self.filename = filename self.changefile_date = changefile_date self.changefile_release = changefile_release @@ -210,7 +217,24 @@ def __init__( start_date: pendulum.DateTime = pendulum.datetime(2021, 7, 2), schedule_interval: str = "@daily", ): - """Unpaywall Data Feed telescope.""" + """The Unpaywall Data Feed Telescope. + + :param dag_id: the id of the DAG. + :param cloud_workspace: the cloud workspace settings. + :param bq_dataset_id: the BigQuery dataset id. + :param bq_table_name: the BigQuery table name. + :param api_dataset_id: the API dataset id. + :param schema_folder: the schema folder. + :param dataset_description: a description for the BigQuery dataset. + :param table_description: a description for the table. + :param primary_key: the primary key to use for merging changefiles. + :param snapshot_expiry_days: the number of days to keep snapshots. + :param http_header: the http header to use when making requests to Unpaywall. + :param unpaywall_conn_id: Unpaywall connection key. + :param observatory_api_conn_id: the Observatory API connection key. + :param start_date: the start date of the DAG. + :param schedule_interval: the schedule interval of the DAG. + """ super().__init__( dag_id=dag_id,