Skip to content

Commit

Permalink
Manifest data caching
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Sep 18, 2023
1 parent e94baf6 commit 11b6e0b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
15 changes: 9 additions & 6 deletions academic_observatory_workflows/workflows/orcid_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ def __init__(self, download_dir: str, transform_dir: str, batch_str: str):

os.makedirs(self.download_batch_dir, exist_ok=True)

@cached_property
def _manifest_data(self) -> List[Dict]:
with open(self.manifest_file, "r") as f:
return list(csv.DictReader(f))

@property
def existing_records(self) -> List[str]:
"""List of existing ORCID records on disk for this ORCID directory."""
Expand All @@ -115,16 +120,12 @@ def missing_records(self) -> List[str]:
@cached_property
def expected_records(self) -> List[str]:
"""List of expected ORCID records for this ORCID directory. Derived from the manifest file"""
with open(self.manifest_file, "r") as f:
reader = csv.DictReader(f)
return [os.path.basename(row["blob_name"]) for row in reader]
return [os.path.basename(row["blob_name"]) for row in self._manifest_data]

@cached_property
def blob_uris(self) -> List[str]:
"""List of blob URIs from the manifest this ORCID directory."""
with open(self.manifest_file, "r") as f:
reader = csv.DictReader(f)
return [gcs_blob_uri(bucket_name=row["bucket_name"], blob_name=row["blob_name"]) for row in reader]
return [gcs_blob_uri(bucket_name=row["bucket_name"], blob_name=row["blob_name"]) for row in self._manifest_data]


class OrcidRelease(ChangefileRelease):
Expand Down Expand Up @@ -436,6 +437,8 @@ def download(self, release: OrcidRelease, **kwargs):
if not orcid_batch.missing_records:
logging.info(f"All files present for {orcid_batch.batch_str}. Skipping download.")
continue
print(orcid_batch.batch_str)
print(orcid_batch._manifest_data)

logging.info(f"Downloading files for ORCID directory: {orcid_batch.batch_str}")
with open(orcid_batch.download_log_file, "w") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ def test_telescope(self):
self.assertTrue(os.path.exists(first_release.master_manifest_file))
for batch in first_release.orcid_batches():
self.assertTrue(os.path.exists(batch.manifest_file))
with open(first_release.master_manifest_file, "r") as f:
content = list(csv.DictReader(f))
self.assertEqual(len(content), len(OrcidTestRecords.first_run_records))

# Download the files from the transfer bucket
# s5cmd fails for any reason:
Expand Down Expand Up @@ -448,6 +451,7 @@ def test_telescope(self):
bucket_name=workflow.orcid_bucket, file_paths=file_paths, blob_names=blob_names
)
self.assertTrue(success)

# Create snapshot
ti = env.run_task(workflow.bq_create_main_table_snapshot.__name__)
self.assertEqual(State.SUCCESS, ti.state)
Expand All @@ -462,6 +466,9 @@ def test_telescope(self):
ti = env.run_task(workflow.create_manifests.__name__)
self.assertEqual(State.SUCCESS, ti.state)
self.assertTrue(os.path.exists(second_release.master_manifest_file))
with open(second_release.master_manifest_file, "r") as f:
content = list(csv.DictReader(f))
self.assertEqual(len(content), len(OrcidTestRecords.second_run_records))
for batch in second_release.orcid_batches():
self.assertTrue(os.path.exists(batch.manifest_file))

Expand Down

0 comments on commit 11b6e0b

Please sign in to comment.