Conversation

@johnny-schmidt (Contributor)
What

Implements the spec outlined here.

Lays groundwork for the CDK to use the new Speed state protocol. TLDR:

  • in sockets, state and messages arrive in parallel on multiple threads
  • to maintain coherence, source will put a string partition_id on both the state message and its associated records (called internally CheckpointId)
  • source will also mark the state message with a global index (id for bw-compatibility w/ the platform, called internally CheckpointIndex); the index specifies the order in which the state should be returned/persisted
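As a rough illustration of the tagging scheme above (these message shapes are hypothetical simplifications, not the CDK's actual types):

```kotlin
// Hypothetical message shapes illustrating the Speed tagging scheme;
// the real CDK types carry much more than this.
data class RecordMessage(
    val data: String,
    val partitionId: String // ties the record to its state message (CheckpointId)
)

data class StateMessage(
    val partitionId: String, // CheckpointId: matches the associated records
    val id: Int              // CheckpointIndex: global order for release/persistence
)
```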

Previous STDIO workflow:

  • destination keeps a monotonic index (called CheckpointId currently), incremented as new state arrives
  • records are marked with the index
  • the index both identifies and orders the records
  • CheckpointManager keeps state by CheckpointId and polls the StreamManager to find out if enough records have been persisted to flush
  • CheckpointManager enforces that state arrive in order, which guarantees that it is released in order
  • StreamManager counts persisted records by CheckpointId as they are processed
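A minimal sketch of that counting scheme (hypothetical names; the real StreamManager tracks ranges and more):

```kotlin
// Hypothetical sketch of the old flow: persisted records are counted per
// checkpoint id, and the CheckpointManager polls allPersisted() to decide
// whether a checkpoint can be flushed.
class PersistenceCounter {
    private val expected = mutableMapOf<Int, Long>()  // records seen per id
    private val persisted = mutableMapOf<Int, Long>() // records flushed per id

    fun recordSeen(id: Int) { expected.merge(id, 1L, Long::plus) }
    fun recordPersisted(id: Int) { persisted.merge(id, 1L, Long::plus) }

    // True once every record marked with this id has been persisted.
    fun allPersisted(id: Int): Boolean =
        (persisted[id] ?: 0L) >= (expected[id] ?: Long.MAX_VALUE)
}
```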

To prepare the STDIO workflow for Speed:

  • CheckpointManager now keeps state by CheckpointKey which composes CheckpointId and CheckpointIndex; the index is used for ordering, the id is used to poll the StreamManager to determine whether all the associated records have been flushed
  • StreamManager keeps persisted counts by CheckpointId only (it knows nothing about the Index)
  • In STDIO mode, we still keep the monotonic index for CheckpointIndex, and we generate fake ids by CheckpointIndex::value.toString()
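Sketched in code (a simplification; the actual CDK classes may differ in shape):

```kotlin
// Hypothetical sketch of the composed key described above.
data class CheckpointId(val value: String)  // identity: which records belong to the state
data class CheckpointIndex(val value: Int)  // ordering: when the state may be released
data class CheckpointKey(
    val checkpointIndex: CheckpointIndex,
    val checkpointId: CheckpointId
)

// In STDIO mode the id is faked from the monotonic index, so identity and
// ordering stay consistent with the old behavior.
fun stdioKey(index: Int): CheckpointKey =
    CheckpointKey(CheckpointIndex(index), CheckpointId(index.toString()))
```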

Some callouts:
The source will try to release states in order, but long-term we can't rely on that, and using multiple threads means we can't guarantee the destination will process the state in order. Implications:

  • CheckpointManager now stores state in TreeMaps sorted by index instead of LinkedQueues
  • We no longer require that state messages arrive in order
  • Before releasing state message w/ index N, we must verify that all state with index 1..N-1 has been sent
  • Because the CheckpointManager is now enforcing order, the StreamManager doesn't need to check persistence for every record up to now, just the checkpoint id in question (and it can't anyway, because it knows nothing about indexes)
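The release rule above can be sketched as follows (hypothetical names; the real CheckpointManager does considerably more):

```kotlin
import java.util.TreeMap

// Hypothetical sketch of in-order release: pending states are kept sorted by
// index, and the head is flushed only when (a) every earlier index has already
// been sent and (b) all records for its checkpoint id have been persisted.
class OrderedReleaser(private val isPersisted: (String) -> Boolean) {
    private val pending = TreeMap<Int, String>() // index -> checkpointId
    private var lastReleased = 0                 // indexes start at 1

    fun add(index: Int, checkpointId: String) { pending[index] = checkpointId }

    // Releases as many head states as possible, in index order; stops at the
    // first gap or at a head whose records aren't fully persisted yet.
    fun tryRelease(): List<String> {
        val released = mutableListOf<String>()
        while (true) {
            val head = pending.firstEntry() ?: break
            if (head.key != lastReleased + 1) break // gap: some index < N not yet sent
            if (!isPersisted(head.value)) break     // records not yet flushed
            released += head.value
            pending.remove(head.key)
            lastReleased = head.key
        }
        return released
    }
}
```

Note that arrival order no longer matters: states can be added out of order, and `tryRelease` still emits them strictly by index.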

Additional changes:

  • There was some unnecessary complexity in the StreamManager for tracking metrics by task, plus some elaborate debug logging that wasn't worth the maintenance cost (and will be mooted by planned Speed changes anyway), so I removed both to make the flow clearer.
  • I also simplified Global state management a bit -- there's no need to maintain index/id by stream (an artifact of when we used to aggregate ranges), so I collapsed it to one key per state message, with data-sufficiency checks per record. (It's now pretty close to being generalizable if anyone ever wants to break the CheckpointManagers up into separate Global and Stream classes.)

@johnny-schmidt johnny-schmidt requested a review from a team as a code owner May 23, 2025 17:44
@vercel bot commented May 23, 2025

airbyte-docs: ✅ Ready (updated May 27, 2025 11:18pm UTC)

Contributor

with with

Contributor

Should some logging be added to indicate that the checkpoint was indeed removed?

Contributor Author

This log line `log.info { "Flushing global checkpoint with key ${head.key}" }` means we definitely got there. (I was careful to make sure all the possible code paths when trying to flush log something.)

Contributor

I think I meant logging that globalCheckpoints.remove was done.
sendStateMessage could fail, in which case we would never run remove?

```kotlin
streamCheckpoints.poll()
sendStateMessage(nextMessage, nextCheckpointKey, listOf(stream.descriptor))
// don't remove until after we've successfully sent
streamCheckpoints.remove(nextCheckpointKey)
```
@frifriSF59 (Contributor) commented May 28, 2025

nit: can we rename nextCheckpointKey to something like currentCheckpointKey?
I feel like it's confusing to call remove on "next".

@johnny-schmidt johnny-schmidt merged commit 6b91766 into master May 28, 2025
37 checks passed
@johnny-schmidt johnny-schmidt deleted the jschmidt/load-cdk/speed-state-handling-prep branch May 28, 2025 20:27