Skip to content

Commit

Permalink
Finish Unpaywall unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed Apr 28, 2023
1 parent a6f0b53 commit 99960d2
Show file tree
Hide file tree
Showing 14 changed files with 340 additions and 560 deletions.
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
52 changes: 27 additions & 25 deletions academic_observatory_workflows/workflows/openalex_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ def __init__(
self.add_setup_task(self.check_dependencies)
self.add_setup_task(self.fetch_releases)
self.add_task(self.create_datasets)

# Create snapshots of main tables in case we mess up
# This is done before updating the tables to make sure that the snapshots haven't expired before the tables
# are updated
self.add_task(self.bq_create_main_table_snapshots)

# Transfer, download and transform data
self.add_task(self.aws_to_gcs_transfer)

# Download concepts, institutions and works which need to be pre-processed
Expand All @@ -337,11 +344,6 @@ def __init__(
)
self.add_task(self.transform)

# Create snapshots of main tables in case we messed up
# This is done before updating the tables to make sure that the snapshots haven't expired before the tables
# are updated
self.add_task(self.bq_create_main_table_snapshots)

# Upsert records
self.add_task(self.upload_upsert_files)
self.add_task(self.bq_load_upsert_tables)
Expand Down Expand Up @@ -491,6 +493,26 @@ def create_datasets(self, release: OpenAlexRelease, **kwargs) -> None:
description=self.dataset_description,
)

def bq_create_main_table_snapshots(self, release: OpenAlexRelease, **kwargs):
"""Create a snapshot of each main table. The purpose of this table is to be able to rollback the table
if something goes wrong. The snapshot expires after self.snapshot_expiry_days."""

if release.is_first_run:
logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run")
return

for entity in release.entities:
expiry_date = pendulum.now().add(days=self.snapshot_expiry_days)
logging.info(
f"bq_create_main_table_snapshots: creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}"
)
success = bq_snapshot(
src_table_id=entity.bq_main_table_id, dst_table_id=entity.bq_snapshot_table_id, expiry_date=expiry_date
)
assert (
success
), f"bq_create_main_table_snapshots: error creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}"

def aws_to_gcs_transfer(self, release: OpenAlexRelease, **kwargs):
"""Transfer files from AWS bucket to Google Cloud bucket"""

Expand Down Expand Up @@ -587,26 +609,6 @@ def upload_upsert_files(self, release: OpenAlexRelease, **kwargs):
success = gcs_upload_files(bucket_name=self.cloud_workspace.transform_bucket, file_paths=file_paths)
set_task_state(success, self.upload_upsert_files.__name__, release)

def bq_create_main_table_snapshots(self, release: OpenAlexRelease, **kwargs):
"""Create a snapshot of each main table. The purpose of this table is to be able to rollback the table
if something goes wrong. The snapshot expires after self.snapshot_expiry_days."""

if release.is_first_run:
logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run")
return

for entity in release.entities:
expiry_date = pendulum.now().add(days=self.snapshot_expiry_days)
logging.info(
f"bq_create_main_table_snapshots: creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}"
)
success = bq_snapshot(
src_table_id=entity.bq_main_table_id, dst_table_id=entity.bq_snapshot_table_id, expiry_date=expiry_date
)
assert (
success
), f"bq_create_main_table_snapshots: error creating backup snapshot for {entity.bq_main_table_id} as {entity.bq_snapshot_table_id} expiring on {expiry_date}"

def bq_load_upsert_tables(self, release: OpenAlexRelease, **kwargs):
"""Load the upsert table for each entity."""

Expand Down
Loading

0 comments on commit 99960d2

Please sign in to comment.