Skip to content

Conversation

ghukill
Copy link
Contributor

@ghukill ghukill commented Mar 31, 2025

Purpose and background context

This PR parallelizes the retrieveal and/or generation of checksums for files. This is achieved by parallel threads performing AWS operations.

An AIP was encountered in Dev, s3://cdps-storage-dev-222053980223-aipstore5b/5b33/1bf3/eb1f/4017/bbe8/c24a/9f60/f4cd/2014_039_002-5b331bf3-eb1f-4017-bbe8-c24a9f60f4cd/, that had over 8k files in the /data folder. Performing that work sequentially was timing out 15 minute lambdas. Performing that work in parallel -- with anywhere from 256 to 512 threads -- is finishing in around 6-7 minutes (maybe faster in deployed form). While ~7 minutes is a very long time for an HTTP request, it nonetheless completes. When performing these AIP validations at scale, it's unlikely that many will have that many files, but this will allow them to complete.

Additionally, per commit 1cd363e, the Athena SQL query was optimized to reduce data scanned.

Here is a screenshot from Cloudwatch showing a lambda invocation for this 8k+ file AIP validation. Totaly time is around 5 minutes, and only about 146mb 169MB used:

Screenshot 2025-03-31 at 11 44 52 AM

How can a reviewer manually see the effects of these changes?

1- Set AWS admin Dev credentials.

2- Set env vars:

WORKSPACE=dev
SENTRY_DSN=None
VERBOSE_LOGGING=true

AWS_ATHENA_DATABASE=cdps-storage-dev-222053980223-aip-database
AWS_ATHENA_WORK_GROUP=default-222053980223-us-east-1

WARNING_ONLY_LOGGERS=asyncio,botocore,urllib3,s3transfer,boto3

CHALLENGE_SECRET=aip_validating_since_2025

# tested up to 512, which works and is faster
CHECKSUM_NUM_WORKERS=256

3- Perform validation on an AIP with 26 files:

from lambdas.aip import *
from lambdas.config import configure_logger

configure_logger(logging.getLogger(), verbose=False)

aip = AIP("s3://cdps-storage-dev-222053980223-aipstore4b/4b06/4dbd/1492/4ff7/a853/ab28/e9c2/fc74/Level04-preservation-sampleD-4b064dbd-1492-4ff7-a853-ab28e9c2fc74")

result = aip.validate()

4- Perform validation on an AIP with 8k+ files (_OPTIONAL: takes about 6-7 minutes to complete):

from lambdas.aip import *
from lambdas.config import configure_logger

configure_logger(logging.getLogger(), verbose=False)

aip = AIP("s3://cdps-storage-dev-222053980223-aipstore5b/5b33/1bf3/eb1f/4017/bbe8/c24a/9f60/f4cd/2014_039_002-5b331bf3-eb1f-4017-bbe8-c24a9f60f4cd/")

result = aip.validate()

Includes new or updated dependencies?

NO

Changes expectations for external applications?

YES: lambda will not time out, Athena queries fastesr

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed and verified
  • New dependencies are appropriate or there were no changes

ghukill added 2 commits March 31, 2025 08:51
Why these changes are being introduced:

It was noticed that an Athena query was scanning 10mb+ of data
for a query that felt like it could have scanned less.

How this addresses that need:
* Use 'union all' instead of 'union'
* Use window function 'row_number()' instead of a self join

Side effects of this change:
* Less data scanned per query, less money

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IR-193
Why these changes are being introduced:

An AIP was encountered that had 8k+ files in the data folder.
This required a VERY long time to generate checksums sequentially
eventually timing out the lambda.

Originally this work was performed in parallel, but it was
simplified during the initial first pass.

How this addresses that need:
* Massively parallelize AWS operations to retrieve and/or generate
a checksum for a file.  S3 is designed for parallelization and the
local resources requirement is small for all the parallel threads.

Side effects of this change:
* Increased system resources: when using 256 threads looking at
about 500mb memory usage.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IR-193
@ghukill ghukill marked this pull request as ready for review March 31, 2025 13:46
@ghukill ghukill requested a review from a team as a code owner March 31, 2025 13:46

file_checksums = {}
s3_inventory_indexed = self.s3_inventory.set_index("key")
file_checksums_lock = Lock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To those not familiar, the threading.Lock is a neat thing that allows safe access to an object even with multiple threads running. We use it on line 239 before updating the dictionary.

Why these changes are being introduced:

The updated AIP inventory Athena query has a potentially
non-deterministic quality where S3 inventory rows may not
have originated from the most recent S3 inventory pull
(which is partitioned by date).  Though the data may
have been accurate, it felt a little sloppy.

How this addresses that need:

Instead of a window function, we focus on retrieving only
the most recent data pull via the most recent 'dt' partition.
Additionally, if we know a) it is the most recent data pull
and b) it is 'is_latest' then we don't need to sort by
last_modified_date anymore.

Confirmed that query time remains about the same, but data
scanned is reduced, suggesting that use of partitions is more
effective here.

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IR-193
@ghukill
Copy link
Contributor Author

ghukill commented Apr 1, 2025

@jonavellecuerdo, a new commit 4f566d3 has been added that layers in the findings from our Athena discussions.

Thanks again for asking -- out of band of PR comments -- about the query! That question revealed the bug which this commit hopefully addresses.

@coveralls
Copy link

coveralls commented Apr 1, 2025

Pull Request Test Coverage Report for Build 14225739445

Details

  • 35 of 39 (89.74%) changed or added relevant lines in 4 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-0.5%) to 94.03%

Changes Missing Coverage Covered Lines Changed/Added Lines %
lambdas/config.py 6 7 85.71%
lambdas/aip.py 26 29 89.66%
Totals Coverage Status
Change from base Build 14114742022: -0.5%
Covered Lines: 315
Relevant Lines: 335

💛 - Coveralls

ghukill added 2 commits April 2, 2025 08:29
Why these changes are being introduced:

An edge case was discovered where, if the query was run
during an S3 inventory data load, the value of 'max(dt)'
could be the value from a Glue table that had already been
updated, but others had not.  In this way, it would look
like -- for that dt partition -- that the files didn't exist.

How this addresses that need:

This simplifies the query by filtering each glue table
immediately to the most recent dt partition.

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/IR-193
@ghukill
Copy link
Contributor Author

ghukill commented Apr 2, 2025

@jonavellecuerdo - another commit has been added that updates the Athena SQL query for most recent S3 inventory data. Ready for re-review at your convenience!

Copy link

@jonavellecuerdo jonavellecuerdo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ghukill! Overall, I think it's looking good, but I had a couple of questions for you. In particular, I am curious if the code to process batches of files in an AIP in parallel could be simplified using itertools.batched. 🤔

@ghukill ghukill merged commit db36133 into main Apr 2, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants