
Update datasets
jdddog committed Jun 5, 2023
1 parent 4082616 commit 1f51408
Showing 9 changed files with 41 additions and 46 deletions.
@@ -165,11 +165,11 @@ def __init__(
dag_id: str,
cloud_workspace: CloudWorkspace,
events_start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 17),
- bq_dataset_id: str = "crossref",
+ bq_dataset_id: str = "crossref_events",
bq_table_name: str = "crossref_events",
api_dataset_id: str = "crossref_events",
schema_folder: str = os.path.join(default_schema_folder(), "crossref_events"),
- dataset_description: str = "Datasets created by Crossref: https://www.crossref.org/",
+ dataset_description: str = "The Crossref Events dataset: https://www.eventdata.crossref.org/guide/",
table_description: str = "The Crossref Events dataset: https://www.eventdata.crossref.org/guide/",
snapshot_expiry_days: int = 31,
n_rows: int = 1000,
@@ -76,11 +76,11 @@ def __init__(
*,
dag_id: str,
cloud_workspace: CloudWorkspace,
- bq_dataset_id: str = "crossref",
+ bq_dataset_id: str = "crossref_fundref",
bq_table_name: str = "crossref_fundref",
api_dataset_id: str = "crossref_fundref",
schema_folder: str = os.path.join(default_schema_folder(), "crossref_fundref"),
- dataset_description: str = "Datasets created by Crossref: https://www.crossref.org/",
+ dataset_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
table_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2014, 2, 23),
@@ -18,7 +18,6 @@
from __future__ import annotations

import functools
- import gzip
import json
import logging
import os
@@ -76,7 +75,7 @@ def __init__(self, *, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime
self.download_file_name = "crossref_metadata.json.tar.gz"
self.download_file_path = os.path.join(self.download_folder, self.download_file_name)
self.extract_files_regex = r".*\.json$"
- self.transform_files_regex = r".*\.jsonl.gz$"
+ self.transform_files_regex = r".*\.jsonl$"


class CrossrefMetadataTelescope(Workflow):
@@ -91,16 +90,16 @@ def __init__(
*,
dag_id: str,
cloud_workspace: CloudWorkspace,
- bq_dataset_id: str = "crossref",
+ bq_dataset_id: str = "crossref_metadata",
bq_table_name: str = "crossref_metadata",
api_dataset_id: str = "crossref_metadata",
schema_folder: str = os.path.join(default_schema_folder(), "crossref_metadata"),
- dataset_description: str = "Datasets created by Crossref: https://www.crossref.org/",
+ dataset_description: str = "The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/",
table_description: str = "The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/",
crossref_metadata_conn_id: str = "crossref_metadata",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
max_processes: int = os.cpu_count(),
- batch_size: int = 200,
+ batch_size: int = 20,
start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7),
schedule_interval: str = "0 0 7 * *",
catchup: bool = True,
@@ -241,10 +240,11 @@ def upload_downloaded(self, release: CrossrefMetadataRelease, **kwargs):

def transform(self, release: CrossrefMetadataRelease, **kwargs):
"""Task to transform the CrossrefMetadataRelease release for a given month.
- Each extracted file is transformed. This is done in parallel using the ThreadPoolExecutor."""
+ Each extracted file is transformed."""

logging.info(f"Transform input folder: {release.extract_folder}, output folder: {release.transform_folder}")
clean_dir(release.transform_folder)
+ finished = 0

# List files and sort so that they are processed in ascending order
input_file_paths = natsorted(list_files(release.extract_folder, release.extract_files_regex))
@@ -256,21 +256,16 @@ def transform(self, release: CrossrefMetadataRelease, **kwargs):

# Create tasks for each file
for input_file in chunk:
- future = executor.submit(transform_file, input_file)
+ output_file = os.path.join(release.transform_folder, os.path.basename(input_file) + "l")
+ future = executor.submit(transform_file, input_file, output_file)
futures.append(future)

- # Write data from batch into a single jsonl.gz file
- # The output file will be a json lines gzip file, hence adding the 'l.gz' to the file extension
- file_path = os.path.join(release.transform_folder, f"crossref_metadata_{i:012}.jsonl.gz")
- with gzip.open(file_path, "wb") as gzip_file:
- with jsonlines.Writer(gzip_file) as writer:
- # Write data to the jsonlines.Writer as it becomes available
- for future in as_completed(futures):
- data = future.result()
- writer.write_all(data)
-
- if i % 1000 == 0:
- logging.info(f"Transformed {i + 1} files")
+ # Wait for completed tasks
+ for future in as_completed(futures):
+ future.result()
+ finished += 1
+ if finished % 1000 == 0:
+ logging.info(f"Transformed {finished} files")

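For reference, a minimal standalone sketch of the per-file pattern this hunk moves to: one `.jsonl` output per extracted `.json` file rather than batched `jsonl.gz` archives. The helper names (`transform_one`, `transform_all`), the glob pattern, and the use of a process pool are illustrative assumptions, not the project's actual `transform_file`/`get_chunks` helpers:

```python
import glob
import json
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import jsonlines


def transform_one(input_file: str, output_file: str) -> None:
    # Read one extracted JSON file and write its items out as JSON Lines
    with open(input_file, mode="r") as f:
        data = json.load(f)
    with jsonlines.open(output_file, mode="w", compact=True) as writer:
        for item in data["items"]:
            writer.write(item)  # the real workflow applies transform_item(item) here


def transform_all(extract_folder: str, transform_folder: str, max_workers: int = os.cpu_count()) -> None:
    # Submit one task per input file and count completions, mirroring the new transform task
    input_files = sorted(glob.glob(os.path.join(extract_folder, "*.json")))
    finished = 0
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for input_file in input_files:
            # .json -> .jsonl, keeping the original file name
            output_file = os.path.join(transform_folder, os.path.basename(input_file) + "l")
            futures.append(executor.submit(transform_one, input_file, output_file))
        for future in as_completed(futures):
            future.result()  # re-raise any worker exception
            finished += 1
            if finished % 1000 == 0:
                print(f"Transformed {finished} files")
```

Per-file outputs keep memory use flat and let the later BigQuery load pick everything up with a single wildcard URI.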
def upload_transformed(self, release: CrossrefMetadataRelease, **kwargs) -> None:
"""Upload the transformed data to Cloud Storage."""
@@ -294,7 +289,7 @@ def bq_load(self, release: CrossrefMetadataRelease, **kwargs):
# subfolders: https://cloud.google.com/bigquery/docs/batch-loading-data#load-wildcards
uri = gcs_blob_uri(
self.cloud_workspace.transform_bucket,
- f"{gcs_blob_name_from_path(release.transform_folder)}/*.jsonl.gz",
+ f"{gcs_blob_name_from_path(release.transform_folder)}/*.jsonl",
)
table_id = bq_sharded_table_id(
self.cloud_workspace.output_project_id, self.bq_dataset_id, self.bq_table_name, release.snapshot_date
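For context, the `*.jsonl` wildcard above works because BigQuery accepts wildcard Cloud Storage URIs, so all per-file outputs load in one job. A hedged sketch using the plain google-cloud-bigquery client; the bucket, prefix, and table ID below are placeholders, not values produced by `gcs_blob_uri` or `bq_sharded_table_id`:

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
# One load job picks up every transformed shard under the prefix
uri = "gs://example-transform-bucket/crossref_metadata/2023-06-07/*.jsonl"
table_id = "example-project.crossref_metadata.crossref_metadata20230607"
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # block until the load finishes
print(f"Loaded {client.get_table(table_id).num_rows} rows into {table_id}")
```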
@@ -363,24 +358,23 @@ def check_release_exists(month: pendulum.DateTime, api_key: str) -> bool:
return False


- def transform_file(input_file_path: str):
+ def transform_file(input_file_path: str, output_file_path: str):
"""Transform a single Crossref Metadata json file.
The json file is converted to a jsonl file and field names are transformed so they are accepted by BigQuery.
:param input_file_path: the path of the file to transform.
+ :param output_file_path: where to save the transformed file.
:return: None.
"""

# Open json
- with open(input_file_path, mode="r") as input_file:
- input_data = json.load(input_file)
+ with open(input_file_path, mode="r") as in_file:
+ input_data = json.load(in_file)

- # Transform data
- output_data = []
- for item in input_data["items"]:
- output_data.append(transform_item(item))
-
- return output_data
+ # Transform and write
+ with jsonlines.open(output_file_path, mode="w", compact=True) as out_file:
+ for item in input_data["items"]:
+ out_file.write(transform_item(item))


def transform_item(item):
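As a usage note on the reworked `transform_file`, each transformed item is written as one compact JSON object per line. A small hedged example with made-up paths and a toy record (assumes `transform_file` from this module is in scope; the record is not real Crossref data):

```python
import json

import jsonlines

# Toy input mimicking the raw structure: a dict with an "items" list
with open("/tmp/chunk_000001.json", mode="w") as f:
    json.dump({"items": [{"DOI": "10.5555/12345678", "type": "journal-article"}]}, f)

transform_file("/tmp/chunk_000001.json", "/tmp/chunk_000001.jsonl")

# Read the JSON Lines output back: one transformed item per line
with jsonlines.open("/tmp/chunk_000001.jsonl") as reader:
    for row in reader:
        print(row)
```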
8 changes: 4 additions & 4 deletions academic_observatory_workflows/workflows/doi_workflow.py
@@ -102,14 +102,14 @@ class Aggregation:
def make_dataset_transforms(
input_project_id: str,
output_project_id: str,
- dataset_id_crossref_events: str = "crossref",
- dataset_id_crossref_metadata: str = "crossref",
- dataset_id_crossref_fundref: str = "crossref",
+ dataset_id_crossref_events: str = "crossref_events",
+ dataset_id_crossref_metadata: str = "crossref_metadata",
+ dataset_id_crossref_fundref: str = "crossref_fundref",
dataset_id_ror: str = "ror",
dataset_id_mag: str = "mag",
dataset_id_orcid: str = "orcid",
dataset_id_open_citations: str = "open_citations",
- dataset_id_unpaywall: str = "our_research",
+ dataset_id_unpaywall: str = "unpaywall",
dataset_id_openalex: str = "openalex",
dataset_id_settings: str = "settings",
dataset_id_observatory: str = "observatory",
@@ -656,7 +656,7 @@ def bq_load_upsert_tables(self, release: OpenAlexRelease, **kwargs):
schema_file_path=entity.schema_file_path,
source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
- ignore_unknown_values=False,
+ ignore_unknown_values=True,
)
assert (
success
@@ -699,7 +699,7 @@ def bq_load_delete_tables(self, release: OpenAlexRelease, **kwargs):
source_format=SourceFormat.CSV,
csv_skip_leading_rows=1,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
- ignore_unknown_values=False,
+ ignore_unknown_values=True,
)
assert (
success
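Both flips above correspond to BigQuery's `LoadJobConfig.ignore_unknown_values`, which tells a load job to skip JSON keys (or trailing CSV columns) that are not in the table schema instead of failing. A hedged sketch with placeholder URI and table values:

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    ignore_unknown_values=True,  # drop fields missing from the schema rather than erroring
)
load_job = client.load_table_from_uri(
    "gs://example-bucket/openalex/works/*.jsonl",
    "example-project.openalex.works_upsert",
    job_config=job_config,
)
load_job.result()  # block until the load finishes
```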
@@ -101,7 +101,7 @@ def __init__(
scopus_conn_ids: List[str],
view: str = "STANDARD",
earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1),
- bq_dataset_id: str = "elsevier",
+ bq_dataset_id: str = "scopus",
bq_table_name: str = "scopus",
api_dataset_id: str = "scopus",
schema_folder: str = os.path.join(default_schema_folder(), "scopus"),
@@ -173,10 +173,9 @@ def test_telescope(self):
ti = env.run_task(workflow.transform.__name__)
self.assertEqual(State.SUCCESS, ti.state)
file_paths = list_files(release.transform_folder, release.transform_files_regex)
- self.assertEqual(1, len(file_paths))
+ self.assertEqual(5, len(file_paths))
for file_path in file_paths:
self.assertTrue(os.path.isfile(file_path))
- self.assertTrue(is_gzip(file_path))

# Test that transformed files uploaded
ti = env.run_task(workflow.upload_transformed.__name__)
@@ -311,7 +310,9 @@ def test_transform_file(self):
"issn_type": [{"value": "0003-987X", "type": "print"}],
}
]
- actual_results = transform_file(input_file_path)
+ output_file_path = os.path.join(t, "output.jsonl")
+ transform_file(input_file_path, output_file_path)
+ actual_results = load_jsonl(output_file_path)
self.assertEqual(expected_results, actual_results)

def test_transform_item(self):
@@ -203,11 +203,11 @@ def __init__(
*,
dag_id: str,
cloud_workspace: CloudWorkspace,
- bq_dataset_id: str = "our_research",
+ bq_dataset_id: str = "unpaywall",
bq_table_name: str = "unpaywall",
api_dataset_id: str = "unpaywall",
schema_folder: str = os.path.join(default_schema_folder(), "unpaywall"),
- dataset_description: str = "Our Research datasets: http://ourresearch.org/",
+ dataset_description: str = "Unpaywall Data Feed: https://unpaywall.org/products/data-feed",
table_description: str = "Unpaywall Data Feed: https://unpaywall.org/products/data-feed",
primary_key: str = "doi",
snapshot_expiry_days: int = 7,
@@ -99,7 +99,7 @@ def __init__(
institution_ids: List[str],
wos_conn_id: str,
earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1),
- bq_dataset_id: str = "clarivate",
+ bq_dataset_id: str = "web_of_science",
bq_table_name: str = "web_of_science",
api_dataset_id: str = "web_of_science",
schema_folder: str = os.path.join(default_schema_folder(), "web_of_science"),
