
Conversation

@ParkMyCar
Contributor

This PR changes the CdcStream::into_stream method to use a streaming query, so we no longer buffer an entire change set into memory; instead everything is streamed in constant memory, with backpressure, from the network to our caller.
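The buffering-vs-streaming distinction can be sketched with plain iterators (a std-only illustration of the memory behavior, not the real CdcStream or SQL client API; the function names here are made up):

```rust
/// Buffered: materializes every row before the caller sees any of them,
/// so peak memory is proportional to the whole change set.
fn fetch_buffered(rows: impl Iterator<Item = String>) -> Vec<String> {
    rows.collect()
}

/// Streaming: forwards rows lazily, one at a time. Only one row is
/// resident at once, and the producer advances only as fast as the
/// caller consumes it, i.e. backpressure in constant memory.
fn fetch_streaming(rows: impl Iterator<Item = String>) -> impl Iterator<Item = String> {
    rows
}
```

The actual change applies the same idea at the SQL layer: rows are decoded off the wire as the caller polls, rather than collected up front.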

It also removes the mark_complete field on CdcEvent::Data; after chatting with Petros, we'll handle cleaning up data in a slightly different way.

Motivation

Fixes a TODO(sql_server1)

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

Contributor

@martykulma martykulma left a comment


lgtm!!

pub fn query_streaming<'q>(
&mut self,
query: impl Into<Cow<'q, str>>,
pub fn query_streaming<'c, 'p, 'q, Q>(
Contributor


was lifetime 'p intended for params? It doesn't appear to be needed.
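For context on the point being made: a lifetime parameter that is declared on a function but never used by any argument or return type can simply be dropped. A toy before/after (these signatures are illustrative, not the real ones):

```rust
struct Conn;

impl Conn {
    // Before: 'p is declared but nothing in the signature uses it.
    fn query_streaming_before<'p, 'q>(&mut self, query: std::borrow::Cow<'q, str>) -> usize {
        query.len()
    }

    // After: only the lifetime that is actually used remains.
    fn query_streaming_after<'q>(&mut self, query: std::borrow::Cow<'q, str>) -> usize {
        query.len()
    }
}
```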

Contributor Author


good catch! removed

// TODO(sql_server3): Can we maybe re-use this BTreeMap or these Vec
// allocations? Something to be careful of is shrinking the allocations
// if/when they grow too large, e.g. from a large spike of changes.
let mut events: BTreeMap<Lsn, Vec<Operation>> = BTreeMap::default();
Contributor

@martykulma martykulma Apr 10, 2025


Alternatively, could this keep track of a last_lsn and use a vec for accumulating the operations since we know the stream is ordered by lsn? It's much more bookkeeping, not sure it's worth the effort if chunk_size stays small.
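A minimal sketch of that suggestion, exploiting the LSN ordering to replace the BTreeMap with a last_lsn plus one reusable Vec (names and the grouping helper are hypothetical, not the PR's code):

```rust
type Lsn = u64;

#[derive(Debug, PartialEq)]
struct Operation(&'static str);

/// Groups an LSN-ordered stream of operations into (Lsn, Vec<Operation>)
/// batches, tracking only `last_lsn` and one accumulator Vec instead of
/// a BTreeMap<Lsn, Vec<Operation>>.
fn group_by_lsn(rows: impl IntoIterator<Item = (Lsn, Operation)>) -> Vec<(Lsn, Vec<Operation>)> {
    let mut out = Vec::new();
    let mut last_lsn: Option<Lsn> = None;
    let mut pending: Vec<Operation> = Vec::new();

    for (lsn, op) in rows {
        if let Some(prev) = last_lsn {
            if prev != lsn {
                // LSN advanced: flush everything accumulated for `prev`.
                out.push((prev, std::mem::take(&mut pending)));
            }
        }
        last_lsn = Some(lsn);
        pending.push(op);
    }
    // Flush the final batch, if any rows were seen at all.
    if let Some(prev) = last_lsn {
        out.push((prev, pending));
    }
    out
}
```

The trade-off is as noted: more bookkeeping (explicit flush points, the final-batch case) in exchange for avoiding the map, which only pays off if batches get large.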

Contributor Author


good idea! Added this as a possible todo

@ParkMyCar ParkMyCar enabled auto-merge (squash) April 11, 2025 20:22
@ParkMyCar ParkMyCar merged commit 91dd92d into MaterializeInc:main Apr 11, 2025
82 checks passed
@ParkMyCar ParkMyCar deleted the sql_server/stream-changes branch April 12, 2025 15:50