
Sinks should accept differential batches #2808

Closed
frankmcsherry opened this issue Apr 28, 2020 · 11 comments
Labels
A-dataflow Area: dataflow C-bug Category: something is broken T-memory Theme: memory

Comments

frankmcsherry (Contributor) commented Apr 28, 2020

Our sinks compact the data that arrives at them, which means they already have access to batch-structured data. In addition, when we sink materialized inputs we already have the batch structure. Sinks currently ask for Collection-structured data, which requires flattening batches down to owned records and can impose a substantial resource cost on the sink.

We should rewrite sinks so that they accept a stream of differential batches rather than a Collection. The sink then iterates through the contents of each batch rather than through individual update tuples. The intended result is a reduction in memory use when batches already exist, and a reduction in latency and total work when sinking data from an existing arrangement.
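
Roughly, that means walking each batch with its cursor instead of consuming owned update tuples. A minimal sketch, assuming differential's `BatchReader`/`Cursor` traits; the `emit` callback is a hypothetical stand-in for whatever encoding and writing the sink actually does:

```rust
use differential_dataflow::trace::{BatchReader, Cursor};

/// Walk one batch and hand each (key, val, time, diff) update to the
/// sink's callback, without flattening the batch into owned records.
fn sink_batch<K, V, T, R, B, F>(batch: &B, mut emit: F)
where
    B: BatchReader<K, V, T, R>,
    F: FnMut(&K, &V, &T, &R),
{
    let mut cursor = batch.cursor();
    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            let key = cursor.key(batch);
            let val = cursor.val(batch);
            // Each (key, val) pair carries a list of (time, diff) updates.
            cursor.map_times(batch, |time, diff| emit(key, val, time, diff));
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }
}
```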

cc @ruchirK

@frankmcsherry frankmcsherry added C-bug Category: something is broken A-dataflow Area: dataflow labels Apr 28, 2020
@ruchirK ruchirK self-assigned this Apr 29, 2020
ruchirK (Contributor) commented Apr 29, 2020

I took a first whack at this and ran into issues that will need a bit of a rethink. I'm summarizing my thoughts and questions here for future context.

Currently, we handle the problem of sinks writing each update "exactly once" across restarts by writing to a new instance of the underlying storage (Kafka topic / file / etc.) every time we restart Materialize. This leads to a fairly reasonable expectation that two Kafka topics written for the same sink should present updates in a consistent, deterministic order. The product thesis is that it would be really weird, from a user's point of view, if Materialize presented a change log of updates at some time T, and then another, new log at time T' after a restart, and the original log contained sequences of updates that were not present in the new log.

I think it's worth stepping back to ask:

  1. Does a sink correspond to a change log for users?
  2. Is this a reasonable expectation for us to have?

Our strategy for achieving this was to use an Exchange parallelization contract to route all data for a given sink to a single timely worker, which would be responsible for writing it down to Kafka / the underlying storage. Unfortunately, this does not actually preserve the change log in the way we want, because the writer thread can receive data from its peers interleaved in a nondeterministic way (for records that correspond to the same timestamp, I think, but I am not sure).

TODO for me: test this out empirically to see how nondeterministic the output can get, because we may have to set expectations with prospects using sinks sooner rather than later.

Somewhat orthogonally, this issue asks that sinks receive and write down structured batches of data rather than streams of records, for two reasons: we can reuse the batches we already have and avoid the extra computation of converting them into a stream, and batches have nice properties that make it easier to move to a world where Materialize tracks how far it has written to a sink and does not have to rewrite everything from scratch to brand-new storage (Kafka topic / file / etc.) after every restart.
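
One of those nice properties, sketched under the assumption of differential's `BatchReader` trait: every batch carries a `Description` whose lower and upper frontiers bound the times it contains, so a sink could durably record the upper frontier it has written through and resume from there after a restart instead of rewriting a brand-new topic. Persisting that frontier is left as a comment here; only the `description()` accessor is real API.

```rust
use differential_dataflow::trace::BatchReader;

/// Inspect the time bounds of a batch after writing it. The description's
/// lower/upper frontiers bound every (time, diff) update in the batch.
fn note_batch_bounds<K, V, T, R, B>(batch: &B)
where
    B: BatchReader<K, V, T, R>,
    T: std::fmt::Debug,
{
    // A real sink would persist these bounds (e.g. the upper frontier)
    // alongside the written data; here we only print them.
    println!(
        "wrote batch covering times {:?} .. {:?}",
        batch.description().lower(),
        batch.description().upper()
    );
}
```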

Unfortunately, having sinks receive and write down structured batches is incompatible with our current approach of having all threads send their data to one writer, because our batches are currently only safe for single-threaded access. There are a couple of different engineering approaches we could consider, but before that I think we need to answer two product questions:

  1. Does deterministic regeneration of sinks still matter even if we no longer rewrite to a new topic / file after every restart? Rephrased for simplicity: do sinks have to emit the same data in the same order every time or not?
  2. Does being able to write to a single partition Kafka topic for our sink (regardless of how many threads Materialize is using) matter to us?

I hope this is a clear summary; please tell me if that's not the case.

cc: @rjnn @frankmcsherry @benesch

frankmcsherry (Contributor, Author) commented Apr 29, 2020

Btw, I think we can get deterministic output at one worker, but with a more sophisticated sink operator. It would need to stash updates and consult progress information to know when it had received all the updates. If we want to do that we can, but it would be some new code to write.
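
To make that concrete, a minimal sketch of such an operator, assuming timely's `unary_frontier` and an `Exchange` pact; the update type and the actual Kafka/file write are placeholders. It stashes updates by timestamp and writes a timestamp out only once the input frontier has passed it, sorting within the timestamp so the emitted order is deterministic:

```rust
use std::collections::HashMap;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};

// (encoded row, time, diff); the exact shape is an assumption.
type Update = (Vec<u8>, u64, i64);

fn deterministic_single_writer<G>(stream: &Stream<G, Update>)
where
    G: Scope<Timestamp = u64>,
{
    // Route every update for this sink to one worker.
    let pact = Exchange::new(|_update: &Update| 0u64);

    let _done: Stream<G, ()> =
        stream.unary_frontier(pact, "DeterministicSink", |_cap, _info| {
            let mut stash: HashMap<u64, Vec<Update>> = HashMap::new();
            move |input, _output| {
                // Stash incoming updates under their timestamp.
                input.for_each(|cap, data| {
                    stash
                        .entry(*cap.time())
                        .or_insert_with(Vec::new)
                        .extend(data.replace(Vec::new()));
                });
                // A timestamp is complete once the input frontier has passed
                // it; only then do we hold every update for that time.
                let complete: Vec<u64> = stash
                    .keys()
                    .filter(|time| !input.frontier().less_equal(time))
                    .cloned()
                    .collect();
                for time in complete {
                    let mut updates = stash.remove(&time).unwrap();
                    // Sort within the timestamp for a deterministic order.
                    updates.sort();
                    for (_row, _time, _diff) in updates {
                        // Write the update to Kafka / file / etc. here.
                    }
                }
            }
        });
}
```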

ruchirK (Contributor) commented Apr 30, 2020

The implementation I'm thinking of for that looks pretty close to timely's aggregate operator -- is that roughly what you were thinking?

frankmcsherry (Contributor, Author) commented

Just to check, the thing you are "thinking to do" is a way to get deterministic output rather than a way to accept differential batches, is that right?

If so, that should be fine for now, but it will be a bit off of peak performance. It means running logic for each distinct timestamp, rather than retiring whole batches at a time, which was one of the steps up that differential made from timely. I think at the moment it is not a big deal, but our sources (and perhaps soon our sinks) are the only operators with these limitations.

Imo the right answer is still parallel batch writing across workers. If the single-worker thing is here just to get determinism, we should feel free to relax that and just get determinism with multiple workers instead (perhaps the single-worker thing is here to make something else easy, like set-ups where we aren't allowed to use partitioned topics, maybe?).

ruchirK (Contributor) commented Apr 30, 2020

Yes that's right.

Re: parallel writing and determinism, I think there are two product questions:

  1. Do we want to:
     a. guarantee deterministic writes,
     b. expose a WITH option for determinism in the CREATE SINK command, or
     c. not guarantee deterministic writes?
  2. Do we want to:
     a. force sink output to go to Kafka topics with N_WORKER partitions,
     b. expose a WITH option in the CREATE SINK command for n_partitions, or
     c. force sink output to go to single-partition topics?

Our actual state is 1.c 2.c but we aspire to 1.a 2.c at the moment.

My understanding is that to get the performance benefits of writing down batches (not having to flatten the data out into a collection), we need each thread to write independently, and to do that in a deterministic way we need 2.a?

On the other hand, I think if we don't care about determinism, then we can achieve parallel batch writing with either a single-partition topic or a multi-partition topic?

I'm not sure if this is an accurate description of the design space.

cc @benesch and @rjnn if you all have additional thoughts

ruchirK (Contributor) commented Apr 30, 2020

Also, Frank, just so I'm clear: when you say "it will be a bit off from peak performance", do you mean "slightly off from peak performance" or "far off from peak performance"? I read it as the latter but wanted to double-check.

frankmcsherry (Contributor, Author) commented

It would be far off in differential, where we can do 1 MHz updates. I think we are running at about 100 Hz at the moment in Materialize, so no problems at the moment.

ruchirK (Contributor) commented Apr 30, 2020

@nacrooks brought up a point that I hadn't considered: if we write to multiple partitions, it's very hard (not quite impossible, but afaik no one does this) to read from the partitions in a deterministic way. So even if we try really hard to write things down deterministically, I'm not sure there will be any benefit.

ruchirK (Contributor) commented Apr 30, 2020

Strawman proposal: we stick with nondeterministic writes until a client asks us for deterministic writes, and we stay with a single-partition Kafka topic for now. In parallel, we see whether we can make writing down batches work with multiple threads writing to a single-partition Kafka topic in a way that is nondeterministic but still correct with respect to timestamps (I'm not entirely sure that's possible).

benesch (Member) commented May 1, 2020 via email

frankmcsherry (Contributor, Author) commented

Just returning to this with some thoughts:

I do think we'll want at least one code path that is "write to as many parts as there are workers", as this allows a more efficient, batch-driven implementation (one that avoids at least one copy of the data to sink it, and has fewer back-pressure issues as it reads out of shared batches at its own speed).
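
As a sketch of what that path could look like, assuming timely's `Operator::sink` with a `Pipeline` pact (no exchange at all): each worker writes the batches it already owns to a partition chosen by its own worker index. The batch type and the actual partition writer are placeholders, not existing Materialize code.

```rust
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};

/// Each worker sinks its locally produced batches without any exchange,
/// writing to "its" partition (partition id = its worker index).
fn sink_batches_per_worker<G, B>(batches: &Stream<G, B>, worker_index: usize)
where
    G: Scope,
    B: timely::Data,
{
    batches.sink(Pipeline, "BatchSink", move |input| {
        while let Some((_time, data)) = input.next() {
            for batch in data.iter() {
                // A real implementation would write `batch` to partition
                // `worker_index` of the sink's topic here.
                let _ = (worker_index, batch);
            }
        }
    });
}
```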

Do we want to:
a. guarantee deterministic writes
b. expose a WITH option for determinism in the CREATE SINK command
c. not guarantee deterministic writes?

We do want our output to be semantically deterministic, but I'm not too bothered if it is non-deterministically ordered, as long as that order doesn't affect the semantics. We might want to make it ordered so that folks can pick out duplicates more easily (e.g., ordered by (timestamp, data)).

I'm not sure we gain too much from non-determinism here other than a simpler sink operator (one that writes data as it arrives, rather than buffering it). If that's right, I think deterministic would be great.

Do we want to:
a. force sink output to go to kafka topics w/ N_WORKER partitions
b. expose a WITH option in the CREATE SINK command for n_partitions
c. force sink output to go to single partition topics?

I'm personally a fan of N_WORKER partitions by default, with WITH options to dial it around.
