
Publish to S3 via a debounced queue #1022

Open
rsaksida wants to merge 3 commits into master from feature/s3-bulk-uploads
Conversation


@rsaksida rsaksida commented Apr 3, 2026

Changes S3 graph publishing from per-envelope immediate writes to a debounced, community-level batch sync.

Previously, each envelope save/delete directly uploaded or deleted a single S3 object from the envelope callback. This PR replaces that with an EnvelopeGraphSync tracking record, a delayed ActiveJob, a batch sync service, and a partial graph indexing Argo workflow submission. The result is fewer S3/indexing operations during bursts of publishing activity, a clear sync lock while the batch is being flushed, and a manifest-driven indexing handoff.

This PR is necessary to support https://github.com/CredentialEngine/ce-registry/issues/25

Changes

  • Added envelope_graph_syncs to track one pending S3 graph sync per envelope community.
  • Changed Envelope callbacks to record S3 sync activity after commit instead of directly uploading/deleting S3 objects.
  • Added SyncEnvelopeGraphsWithS3Job to debounce publishing activity before syncing to S3.
  • Added SyncPendingEnvelopeGraphsWithS3 to batch-upload/delete graph objects from S3 based on
    pending EnvelopeVersion records.
  • Added API publish locking for v1 and v2 publish endpoints while a community’s S3 sync is actively running.
  • Added stale sync lock clearing so a stuck sync can be released after ENVELOPE_GRAPH_SYNC_LOCK_TIMEOUT_SECONDS, defaulting to 600 seconds.
  • Added partial graph manifest generation at: /<community>/manifests/partial-graphs/<timestamp>.json.gz
  • Also updates: /<community>/manifests/partial-graphs/latest.json.gz
  • Added SubmitPartialGraphIndexWorkflow to submit the Argo partial indexing workflow after a manifest is written.
  • Changed envelope download workflow submission to use the fixed template name s3-graphs-zip instead of reading ARGO_WORKFLOWS_TEMPLATE_NAME.
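
The per-community tracker described above can be sketched in plain Ruby. This is an illustrative stand-in only: the real `EnvelopeGraphSync` is an ActiveRecord model, and the in-memory storage, `GraphSyncTracker` name, and `enqueue` hook here are assumptions for the sketch.

```ruby
# Illustrative stand-in for the per-community sync tracker in this PR.
# The real implementation persists to the envelope_graph_syncs table and
# enqueues SyncEnvelopeGraphsWithS3Job; both are stubbed out here.
class GraphSyncTracker
  DEBOUNCE_SECONDS = 60 # ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS default

  attr_reader :last_activity_at, :last_activity_version_id,
              :scheduled_for_at, :last_synced_version_id

  def initialize(previous_version_id, enqueue:)
    # last_synced_version_id starts at the previous version when the
    # sync record is first created, per the PR description.
    @last_synced_version_id = previous_version_id
    @enqueue = enqueue
  end

  # Mirrors EnvelopeGraphSync.record_activity!: update the activity fields,
  # and enqueue a delayed job only if none is currently pending.
  def record_activity!(version_id, now: Time.now)
    @last_activity_at = now
    @last_activity_version_id = version_id
    return if @scheduled_for_at # a job is already pending; just extend activity

    @scheduled_for_at = now + DEBOUNCE_SECONDS
    @enqueue.call(@scheduled_for_at)
  end
end
```

Repeated publishes inside the window advance `last_activity_version_id` but enqueue only one job, which is the debounce behavior the flow below relies on.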

New behavior

This prevents every individual publish from immediately causing its own S3 write and downstream indexing trigger. Instead, publishes within the debounce window are collapsed into a single batch. That batch syncs only the latest pending version per CTID, writes the affected graph files to S3, writes a gzipped manifest of uploaded S3 keys, and then triggers the partial indexing workflow using that manifest.

Deletes are also handled in the batch. Delete-only batches do not write a manifest or trigger Argo, because the manifest is currently an upload-input list for partial indexing.
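
The batch-collapsing rules above can be shown with a small, framework-free sketch. The version hashes here stand in for `EnvelopeVersion` rows; the `plan_batch` name and hash fields are illustrative assumptions, not the PR's actual API.

```ruby
# Collapse pending versions to the latest per CTID, then split into
# uploads (create/update) and deletes (destroy). Delete-only batches
# produce no manifest, matching the PR's behavior.
def plan_batch(versions, last_synced_id:, cutoff_id:)
  eligible = versions.select do |v|
    v[:ctid] && v[:id] > last_synced_id && v[:id] <= cutoff_id
  end
  latest_per_ctid = eligible.group_by { |v| v[:ctid] }
                            .transform_values { |vs| vs.max_by { |v| v[:id] } }

  # Skip any CTID that already has a newer version past the cutoff,
  # so a stale object is never uploaded or deleted.
  newer_ctids = versions.select { |v| v[:id] > cutoff_id }.map { |v| v[:ctid] }
  latest_per_ctid.reject! { |ctid, _| newer_ctids.include?(ctid) }

  uploads = latest_per_ctid.values.reject { |v| v[:event] == "destroy" }
  deletes = latest_per_ctid.values.select { |v| v[:event] == "destroy" }
  { uploads: uploads, deletes: deletes, write_manifest: uploads.any? }
end
```

For example, with versions 11 and 12 for the same CTID inside the window, only version 12 is uploaded; a CTID whose newest version lies past the cutoff is deferred to the next batch.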

Flow Up To Argo Workflow Trigger

  1. A publish or update creates/updates an Envelope.
  2. After the DB transaction commits, the Envelope callback calls EnvelopeGraphSync.record_activity! with the envelope community and latest PaperTrail envelope version id.
  3. A destroy does the same after commit, using the destroy version id.
  4. EnvelopeGraphSync.record_activity! creates or updates the per-community sync tracker.
  5. The sync tracker records:
    • last_activity_at
    • last_activity_version_id
    • scheduled_for_at
    • last_synced_version_id, initialized to the previous version when the sync record is first
      created
  6. If no sync job is currently pending, it enqueues SyncEnvelopeGraphsWithS3Job for now + ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS, defaulting to 60 seconds.
  7. Additional publish activity during the debounce window updates last_activity_at and advances last_activity_version_id, but does not enqueue another job.
  8. When SyncEnvelopeGraphsWithS3Job runs, it checks whether the debounce quiet period has elapsed.
  9. If new activity happened inside the debounce window, the job updates scheduled_for_at and re-enqueues itself for the new quiet-period end.
  10. Once the quiet period has elapsed, the job clears scheduled_for_at, marks the sync as syncing, and calls SyncPendingEnvelopeGraphsWithS3.
  11. While syncing is true, the v1 and v2 publish APIs reject new publish requests for that community with HTTP 503 and the message: Publishing is temporarily locked while S3 sync is in progress
  12. SyncPendingEnvelopeGraphsWithS3 finds pending EnvelopeVersion rows for that community:
    • only Envelope versions
    • only versions with a CTID
    • only versions after last_synced_version_id
    • only versions up to the captured cutoff version id
    • only the latest pending version per CTID
  13. If a CTID has a newer version after the cutoff, that CTID is skipped for this batch so a stale version is not uploaded or deleted.
  14. For create/update versions, the service uploads the envelope’s processed_resource JSON to: /<community>/<ctid>.json
  15. For destroy versions, the service deletes: /<community>/<ctid>.json
  16. Uploaded envelopes have their s3_url updated from the S3 object public URL.
  17. The service writes a gzipped manifest containing uploaded graph S3 keys only.
  18. The manifest is written both to a timestamped key and to latest.json.gz.
  19. If a manifest was written, SubmitPartialGraphIndexWorkflow submits the Argo workflow.
  20. The Argo workflow template is: update-index-graphs-input-file-elasticsearch
  21. The workflow is generated with a community-specific prefix like: <community>-partial-graph-index-
  22. The workflow parameters include:
    • task-image
    • index-name
    • input-bucket
    • input-file-key
    • source-bucket
    • prefix-path
    • aws-region
  23. After the batch completes, the sync is marked synced through the cutoff version id and the active sync lock is cleared.
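
Steps 8–9 (the quiet-period check) and 17–18 (the gzipped manifest) can be sketched as below. The timestamp format in the manifest key and the manifest body being a plain JSON array of S3 keys are assumptions of this sketch, not confirmed by the PR.

```ruby
require "zlib"
require "stringio"
require "json"

QUIET_SECONDS = 60 # ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS default

# Steps 8-9: has the quiet period elapsed, or must the job re-enqueue itself?
def quiet_period_elapsed?(last_activity_at, now: Time.now)
  now - last_activity_at >= QUIET_SECONDS
end

# Steps 17-18: gzip a manifest of uploaded S3 keys and compute both target
# keys (timestamped plus latest.json.gz). Timestamp format is assumed.
def build_manifest(community, uploaded_keys, now: Time.now)
  body = StringIO.new
  gz = Zlib::GzipWriter.new(body)
  gz.write(JSON.generate(uploaded_keys))
  gz.close
  stamp = now.utc.strftime("%Y%m%dT%H%M%SZ")
  {
    timestamped_key: "#{community}/manifests/partial-graphs/#{stamp}.json.gz",
    latest_key: "#{community}/manifests/partial-graphs/latest.json.gz",
    gzipped_body: body.string,
  }
end
```

Writing the same body to both keys lets the Argo workflow take a stable `input-file-key` (latest.json.gz) while the timestamped copy preserves batch history.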

Comment thread app/jobs/sync_envelope_graphs_with_s3_job.rb

rohit-joy commented Apr 7, 2026

@rsaksida Btw, I was expecting something like this, as mentioned in the Slack thread.

We need #3 and #4 from the screenshot to be reusable from Publisher, which will take over the activity of direct publishing to S3.

(screenshot)

- Debounces S3 publishing events
- Locks the API while S3 syncing is in progress
- Uploads manifest with all uploaded S3 files
- Triggers Argo workflow for indexing
@rsaksida rsaksida force-pushed the feature/s3-bulk-uploads branch from 22ff5f2 to e26a4a0 on April 7, 2026 19:04

rsaksida commented Apr 7, 2026

@rohit-joy

@rsaksida Btw, I was expecting something like this, as mentioned in the Slack thread.

We need #3 and #4 from the screenshot to be reusable from Publisher, which will take over the activity of direct publishing to S3.

What this PR accomplishes is basically uploading a set of graphs to S3.
After the graphs are uploaded, right now it calls the workflow for indexing a partial set (as requested in the issue).
We can change it to call any other workflow though (or multiple workflows).
Maybe there should be an Argo workflow that calls on other workflows in sequence?

@rsaksida rsaksida marked this pull request as ready for review April 7, 2026 19:23
@rohit-joy (Contributor)

Ok, I was commenting earlier when this PR was in Draft state.

What this PR accomplishes is basically uploading a set of graphs to S3.

@rsaksida Can we also have it call an Argo workflow to copy the partial set to the corresponding S3 buckets for Graphs/Resources/Metadata? Or is this already happening? I don't see this in the documentation above.

After the graphs are uploaded, right now it calls the workflow for indexing a partial set (as requested in the issue).

Great! Indexing to ES is expected with Argo workflow.

Maybe there should be an Argo workflow that calls on other workflows in sequence?

Not now. We will do that after these atomic workflows are proven reliable. Otherwise, that will simply add more complexity.

@rohit-joy rohit-joy requested a review from chuang-CE April 7, 2026 20:18
@rohit-joy (Contributor)

@rsaksida Please deploy in the TEST environment so that @chuang-CE can test and approve the PR.

@rohit-joy (Contributor)


@rsaksida Question for you above, in case you missed it.

rsaksida added commits:
  • When a sync fails, next sync should pick up the leftover work
  • Reuse manifest key when calling Argo
@rohit-joy (Contributor)

Questions about your latest commit:

  • Debounces S3 publishing events
  • Locks the API while S3 syncing is in progress
  • Uploads manifest with all uploaded S3 files
  • Triggers Argo workflow for indexing

@rsaksida Did you see my comment above? Can we do the copying to S3 through another Argo workflow too?

See screenshot here: #1022 (comment)

Ruby Registry continues to publish to PGSQL after processing the incoming JSON-LDs; no change there other than doing it in a batch. Ruby Registry also generates one or more ZIPs, depending on whether the background job separates the input JSON-LDs into Resource, Graph, and Metadata as well as by CRUD operation. All other work of copying to S3 and indexing is left to Argo Workflows.

I cannot stress enough how critical it is to separate these concerns, so that the new publisher can do direct publishing by calling the same Argo workflows without further reworking or hand-holding of the Ruby Registry code base.

@rohit-joy
Copy link
Copy Markdown
Contributor

@chuang-CE Can you post here also what is blocking your sign off?

