persist,sources: allow compaction on persisted sources #9981

Closed

Conversation

aljoscha
Contributor

@aljoscha aljoscha commented Jan 10, 2022

The idea behind this is:

  • keep track of what compaction the coordinator allows in RenderState
  • received AllowCompaction commands update that compaction frontier
  • pass an Rc<RefCell<...>> of that frontier to an operator that also
    looks at its input frontier and allows compaction up to the combination
    (aka "minimum") of the two

An alternative solution would be to invoke allow_compaction() upon
receipt of the AllowCompaction message. This would, however, require
keeping the involved persistence write handles in RenderState, and the
number and type of those handles will likely be different for different
combinations of physical source (think Kafka etc.) and envelopes.

With the chosen solution we only need to update the allowed frontier
centrally, and the rendering code that then allows compaction can be
tailored to each rendered source.
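
For illustration, a minimal sketch of that wiring; the u64 frontiers, the map key type, and the closure standing in for a persist write handle are all simplifications, not the actual types:

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Simplified stand-ins: the real code uses antichains of timestamps rather
// than plain u64 frontiers, and persist write handles rather than closures.
type SourceId = u64;
type Frontier = u64;

struct RenderState {
    // One allowed-compaction frontier per source, updated centrally whenever
    // the coordinator sends AllowCompaction.
    allowed_compaction_frontiers: HashMap<SourceId, Rc<RefCell<Frontier>>>,
}

impl RenderState {
    fn handle_allow_compaction(&mut self, source_id: SourceId, frontier: Frontier) {
        if let Some(allowed) = self.allowed_compaction_frontiers.get(&source_id) {
            let mut allowed = allowed.borrow_mut();
            *allowed = (*allowed).max(frontier);
        }
    }
}

// The rendered source holds a clone of the shared frontier and, whenever its
// input frontier advances, compacts only up to the minimum of what the
// coordinator allows and what the dataflow itself has progressed past.
fn maybe_allow_compaction(
    allowed: &Rc<RefCell<Frontier>>,
    input_frontier: Frontier,
    allow_compaction: impl FnOnce(Frontier),
) {
    let since = (*allowed.borrow()).min(input_frontier);
    allow_compaction(since);
}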

Fixes #9508

Tips/Notes for reviewer

I haven't added tests yet, but I wanted to get the basic idea out for review. If you agree with the approach, I'll add unit tests.

Also, merging this PR by itself would be incorrect, because we currently don't consider the since when restarting sources. This would surface when merging this PR together with #9659, which adds a check that persistence can serve the required as_of/since. The solution for this is #9656, which calculates an as_of/since when (re-)starting.

Checklist

I would add unit tests, and trust the combination of this PR, the aforementioned PRs that add as_of checks, and Philip's persistence tests to cover this change.

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • [N/A] This PR adds a release note for any user-facing behavior changes.


@danhhz
Contributor

danhhz commented Jan 10, 2022

Approach seems good to me. It will come as no surprise that I like that the bits for dealing with the different numbers of streams for each envelope are all kept in one place.

@philip-stoev philip-stoev self-requested a review January 10, 2022 17:27
@philip-stoev
Contributor

philip-stoev commented Jan 10, 2022

I would like to test this myself if that is OK with you. Please let me know when the time is right. In particular, I want to:

  • make sure we have tests with a larger and a smaller value for the --logical-compaction-window
  • make sure that this PR indeed is helpful in restraining the unbounded memory consumption that has been observed

EDIT: Please do add any unit tests you can think of!

@aljoscha
Contributor Author

@philip-stoev I think this is ready for testing from you now. I merged all the prerequisite PRs.

make sure that this PR indeed is helpful in restraining the unbounded memory consumption that has been observed

I'm not sure this PR will do that; I think it should help reduce on-disk size, if there is actually data to compact. cc @ruchirK: do you think enabling compaction will also potentially help in-memory structures?

@philip-stoev
Contributor

Item No 1. Recovery is substantially slower in this branch, even slower than with persistence disabled:

bin/mzcompose --find feature-benchmark run feature-benchmark  --other-tag=mzbuild-RN2OOBR65JW4MPYEXOR2QYZZLA7VI4XD --root-scenario=KafkaRecovery --this-options="--persistent-kafka-upsert-source" --other-options="--persistent-kafka-upsert-source"

produces:

NAME                      |    THIS     |    OTHER    |  Regression?  | 'THIS' is:
----------------------------------------------------------------------------------------------------
KafkaRecovery             |      10.749 |      28.222 |      no       | 2.6 times faster

In this case, OTHER is this branch and THIS is main. Also, the 2.6x figure does not do it justice; individual measurements of up to 90 seconds have been reported on this branch.

@ruchirK
Contributor

ruchirK commented Jan 12, 2022

Item No 1. Recovery is substantially slower in this branch, even slower than with persistence disabled:

That's a known issue, and the magnitude of the slowdown is also within the range of what's expected. I observed a 5x slowdown by hardcoding allow_compaction calls in the seal operator. #9602 addresses this by handling compaction requests outside of the main loop, so ingest/recovery doesn't block on compaction.
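
Not the actual #9602 change, but the shape of "handling compaction requests outside of the main loop" is roughly this (hypothetical names throughout):

use std::sync::mpsc;
use std::thread;

// A compaction request for one persisted stream.
struct CompactionRequest {
    stream: String,
    since: u64,
}

// The main ingest/recovery loop only pays for a channel send; the actual
// compaction work happens on a background thread.
fn spawn_compaction_worker() -> mpsc::Sender<CompactionRequest> {
    let (tx, rx) = mpsc::channel::<CompactionRequest>();
    thread::spawn(move || {
        for req in rx {
            apply_compaction(&req.stream, req.since);
        }
    });
    tx
}

// Placeholder for the real persist compaction call.
fn apply_compaction(stream: &str, since: u64) {
    println!("compacting {stream} up to {since}");
}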

@aljoscha
Contributor Author

Item No 1. Recovery is substantially slower in this branch, even slower than with persistence disabled:

That's a known issue, and the magnitude of the slowdown is also within the range of what's expected. I observed a 5x slowdown by hardcoding allow_compaction calls in the seal operator. #9602 addresses this by handling compaction requests outside of the main loop, so ingest/recovery doesn't block on compaction.

There was a bug in the PR that Philip was testing that could cause it to not actually restart from persisted data. But I'm also surprised that compaction wouldn't have an impact. A possible explanation is that the benchmark only measures restart time, and compaction is mostly an overhead to an already-running pipeline.

@ruchirK
Contributor

ruchirK commented Jan 12, 2022

re: in memory structures -- yeah compaction should help keep the size of the in-memory cache down

@aljoscha aljoscha force-pushed the is-9508-enable-compaction branch 2 times, most recently from 1f2e1e2 to 8c2c1f6 on January 12, 2022 14:43
@danhhz
Contributor

danhhz commented Jan 12, 2022

Wait @ruchirK why would compaction in the main thread affect recovery time so much? Do you think the listen command is getting stuck in the cmd queue or something?

@philip-stoev
Contributor

Item No 2. The major regression on recovery from item 1 is now gone, but there is a substantial performance regression during ingestion. Here are the numbers against the latest revision of this branch. THIS is this branch, OTHER is main:

$ docker volume rm feature-benchmark_mzdata
feature-benchmark_mzdata
NAME                      |    THIS     |    OTHER    |  Regression?  | 'THIS' is:
----------------------------------------------------------------------------------------------------
KafkaRaw                  |       1.039 |       1.119 |      no       | 7.1 pct   faster
KafkaUpsert               |       1.236 |       1.204 |      no       | 2.6 pct   slower
KafkaUpsertUnique         |       1.932 |       1.397 |    !!YES!!    | 38.3 pct   slower
KafkaRecovery             |      11.611 |      10.197 |    !!YES!!    | 13.9 pct   slower

Is this to be expected?

@aljoscha
Contributor Author

@philip-stoev That's what I would expect, yes. And, just making sure: KafkaRecovery only measures the time spent in recovery?

@ruchirK
Contributor

ruchirK commented Jan 12, 2022

Ah sorry, when I read recovery I was actually thinking about ingestion. My bad!

@aljoscha
Contributor Author

To any reviewers: I'm not yet super happy about the "subtract 1 from the seal frontier to determine the compaction frontier" bit. I'm not sure that would translate well to cases where frontiers are not Antichain<u64>. But I'm also not 100% sure that matters right now. 🤷‍♂️

@aljoscha
Contributor Author

I'll split the misc fixes that this spawned off into a separate PR.

@philip-stoev
Contributor

@philip-stoev That's what I would expect, yes. And, just making sure: KafkaRecovery only measures the time spent in recovery?

Yes, it only measures the recovery time, that is, the time between SELECT 1 returning 1 and SELECT COUNT(*) FROM source returning the expected complete record count.

@danhhz
Contributor

danhhz commented Jan 12, 2022

Ah sorry, when I read recovery I was actually thinking about ingestion. My bad!

👍 Just making sure you weren't doing a galaxy brain :)

there is a substantial performance regression during ingestion. ... Is this to be expected?

@philip-stoev Yeah, this regression should mostly or entirely go away with #9602. But this PR is necessary functionality and we need to merge it even if there is a perf regression. (I'm actually surprised that the regression on ingestion is only 40% without #9602.)

@danhhz
Contributor

danhhz commented Jan 12, 2022

I'm not yet super happy about the "subtract 1 from the seal frontier" to determine the compaction frontier bit.

@aljoscha I haven't looked in detail yet... does this go away if we merge a more limited form of your other PR to allow the compaction frontier to be == the seal frontier?

@aljoscha
Contributor Author

@dan Ah sorry, I should have mentioned this here: letting the compaction frontier go beyond the seal doesn't work: #9938 (comment). I do think I have a solution that I like, though.

@danhhz
Contributor

danhhz commented Jan 12, 2022

(at) dan is someone else! XD

I saw that comment. It was talking about the more general >= and I was attempting above to ask about the == case

@aljoscha
Contributor Author

Oh boy, sorry (at) dan!

I saw that comment. It was talking about the more general >= and I was attempting above to ask about the == case

It won't work. A seal for time t means that data with a timestamp < t is sealed, data with timestamp t is not sealed, meaning on restore we discard data with timestamp t. If we allowed compaction up to t (I'm writing t a lot...) that would mean we discard all data/timestamp bindings on restart. Which... doesn't seem too smart. 😅

((When I said beyond above I was using the definition that timely is also using (I hope I'm right here), which says any t' that is >= t is beyond t, because of how frontiers work.))
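
In other words, with plain u64 timestamps the relationship is just this (a sketch, not the real frontier code):

// A seal at t covers timestamps strictly less than t; data at exactly t is
// unsealed and gets discarded on restore. Compacting up to t would therefore
// also discard the unsealed data/bindings at t, so the allowed compaction
// frontier has to stay strictly below the seal frontier.
fn compaction_upper_bound(seal_frontier: u64) -> Option<u64> {
    // the "subtract 1 from the seal frontier" rule; None while nothing is sealed
    seal_frontier.checked_sub(1)
}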

@ruchirK
Contributor

ruchirK commented Jan 13, 2022

I'm going to review this PR after I've finished the release this afternoon! Sorry for the delay and feel free to merge without me if you need to!

@aljoscha
Contributor Author

The first two commits are from #9938 and #10029, respectively.

@aljoscha
Contributor Author

I rebased, now that the two prerequisite PRs have been merged.

Contributor

@ruchirK ruchirK left a comment

Just read through the first commit so far, but I want to make a comment about the whole commit:

I'm sorry to jump in at the 11th hour, but holding onto an antichain for the compaction feels like the wrong abstraction layer here. It seems like we would rather have something like a MultiWriteHandle (perhaps MultiCompactionHandle or MultiMaintenanceHandle, or something that's not tagged with specific key-value types), which every source could then create for itself, and then we could have only one instance of the allow_compaction operator that also takes in such a MultiCompactionHandle.

I'm also happy to merge something that's more like this PR and iterate :)

Edit: my secondary motivation is that I feel really burned by all the bugs I wrote in the dataflow timestamp generation code, and the allowed_compaction_frontiers hashtable feels a lot like that code.

@@ -138,6 +138,7 @@ pub fn serve(config: Config) -> Result<(Server, LocalClient), anyhow::Error> {
local_inputs: HashMap::new(),
ts_source_mapping: HashMap::new(),
ts_histories: HashMap::default(),
allowed_compaction_frontiers: HashMap::default(),
Contributor

We're inserting into this hashmap but never deleting from this hashmap afaict

Contributor Author

Good catch!

TimelyTimestamp::minimum(),
),
));
render_state.allowed_compaction_frontiers.insert(
Contributor

as written, this will overwrite the compaction frontier if someone creates two instances of the same source. Not sure if we want to check whether orig_id already exists in the hashmap / debug-assert that the return value of insert is None. Perhaps an alternate answer is "multiple instances of the same source are borked anyway, so nothing better to do for now", which is fine, but then maybe we should have a TODO?

Contributor Author

I'm adding an assert. But yes, there is an open issue about preventing multiple rendered instances of the same persistent source.
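
Roughly this shape, with simplified stand-in types (the real map holds shared frontier handles, not plain integers):

use std::collections::HashMap;

type SourceId = u64;
type Frontier = u64;

fn register_compaction_frontier(
    allowed_compaction_frontiers: &mut HashMap<SourceId, Frontier>,
    orig_id: SourceId,
    frontier: Frontier,
) {
    let previous = allowed_compaction_frontiers.insert(orig_id, frontier);
    // Two rendered instances of the same persistent source would silently
    // overwrite each other's frontier; fail loudly instead.
    assert!(
        previous.is_none(),
        "compaction frontier for source {orig_id} was already registered"
    );
}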

@danhhz
Contributor

danhhz commented Jan 14, 2022

I'm sorry to jump in at the 11th hour, but holding onto an antichain for the compaction feels like the wrong abstraction layer here. It seems like we would rather have something like a MultiWriteHandle (perhaps MultiCompactionHandle or MultiMaintenanceHandle, or something that's not tagged with specific key-value types), which every source could then create for itself, and then we could have only one instance of the allow_compaction operator that also takes in such a MultiCompactionHandle.

Can you elaborate on this? I'm not sure I understand the idea

@aljoscha
Contributor Author

I believe what Ruchir is suggesting is similar to something that we (I, at least) considered at some point in the past, before we went down the current route of doing sealing/compaction from an operator.

At least Dan and I discussed it in the context of how to coordinate seals from potentially multiple operators.

It would work similarly to the AntichainToken that coord uses. All the "stakeholders", that is, participating operators, or workers, or whatnot, get their clone of the AntichainToken and can advance it as they see fit. The combined frontier is tracked, and when it advances something can trigger.

In the context of this PR, we could hold one AntichainToken that is advanced based on messages from the coordinator, and one (or several) held by the rendered dataflows that advance them based on the seal frontier. When everyone advances enough, we would advance the allowed compaction frontier.

Is that what you had in mind?
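
For concreteness, a toy version of that "combined frontier only advances when every holder advances" behaviour might look like this (hypothetical types, not the actual coord AntichainToken):

use std::cell::RefCell;
use std::rc::Rc;

// Each stakeholder (coordinator messages, rendered dataflow(s), ...) holds one
// token and advances it independently; the combined frontier is the minimum
// over all holders, so it only moves once everyone has advanced.
struct SharedFrontier {
    holders: Vec<u64>,
}

struct FrontierToken {
    shared: Rc<RefCell<SharedFrontier>>,
    index: usize,
}

impl SharedFrontier {
    fn new_token(shared: &Rc<RefCell<SharedFrontier>>) -> FrontierToken {
        let index = {
            let mut s = shared.borrow_mut();
            s.holders.push(0);
            s.holders.len() - 1
        };
        FrontierToken { shared: Rc::clone(shared), index }
    }

    fn combined(&self) -> u64 {
        self.holders.iter().copied().min().unwrap_or(0)
    }
}

impl FrontierToken {
    fn advance_to(&self, time: u64) {
        let mut shared = self.shared.borrow_mut();
        let slot = &mut shared.holders[self.index];
        *slot = (*slot).max(time);
        // The real thing would then check the combined frontier and, if it
        // moved, advance the allowed compaction frontier.
    }
}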

Contributor

@ruchirK ruchirK left a comment

Ok, read everything! No further comments except for one question about the timestamp bindings thing, and a strong desire to move persisted sources away from the SQLite timestamp bindings code for our sanity :) (not in this PR, ofc!)

);
let lower = Antichain::from_elem(assigned_ts.0);
let upper = Antichain::new();
// NOTE: We use get_bindings_in_range() and not the seemingly better
Contributor

I don't fully understand what was happening before. As in, this bit in the commit message:

This could lead to the case where we thought that there were already
bindings that supersede the bindings restored from persistence, but
there were not, and we would therefore end up in a situation without
any bindings, meaning the Kafka source would re-read all its input
from the beginning again.

is a lot of moving parts.

It seems like:

  1. if bindings from sqlite supersede bindings from persist, we throw away the bindings from persist?
  2. somehow we only use the bindings from persist to decide what data we already have?
  3. based on 2, we use the new proposed bindings and end up re-reading from Kafka?

Does that sound right? I didn't yet reread the rest of the surrounding code, but I can do that later. If there's any way to add a test for this bug that would be amazing. No worries if not, though.

Contributor Author

Should we do a call on Monday? I can walk you through it. I think it's valuable to properly think it through; no need to rush.

Side note: there is already a test; Philip's persistence tests that use a Kafka source caught this. But it only surfaced once compaction was enabled, which I can also probably explain better in a call.

Contributor

I'd love that! I don't want to take away from your skunkworks, but I can do a call today if you'd rather move faster on this. I'm happy either way, totally up to you.

Contributor Author

Let's do Monday 👌

@ruchirK
Contributor

ruchirK commented Jan 14, 2022

No, Aljoscha, that's not quite what I had in mind. Let me noodle on this and get back to you guys in a bit. Just want to reiterate, though, that I don't mean for this to block merging!

@ruchirK
Contributor

ruchirK commented Jan 14, 2022

so here's a prototype of the changes I'm thinking of: 2fce819

this compiles but has not been further tested. The branch is here: https://github.com/ruchirK/materialize/tree/ruchir-compaction-edits

the intuition here is that this paragraph from your description above, Aljoscha:

An alternative solution would be to invoke allow_compaction() upon
receipt of the AllowCompaction message. This would, however, require
keeping the involved persistence write handles in RenderState, and the
number and type of those handles will likely be different for different
combinations of physical source (think Kafka etc.) and envelopes.

I think this paragraph has the right idea, and my commit introduces a new MultiCompactionHandle that lets you atomically compact a set of streams with varying key-value types. I still only call allow_compaction from worker 0 to sidestep the fact that we can't do this with multiple workers. I think the big wins here are that:

  1. this mirrors what we do for indexes now
  2. we don't need the allow_compaction operator at all

edit:

added a 3rd win: having the compactions be atomic is easier for me to reason about :)
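
The type-erasure idea, sketched with hypothetical types (not the actual persist API or the code on that branch):

// Erase the per-stream <K, V> types behind boxed closures so a single handle
// can compact a heterogeneous set of streams together. (The real thing would
// presumably batch these into one atomic request to the persist runtime
// rather than looping.)
struct MultiCompactionHandle {
    compact_fns: Vec<Box<dyn Fn(u64)>>,
}

impl MultiCompactionHandle {
    fn new() -> Self {
        MultiCompactionHandle { compact_fns: Vec::new() }
    }

    // Register one stream; the closure captures its typed write handle.
    fn add_stream(&mut self, allow_compaction: impl Fn(u64) + 'static) {
        self.compact_fns.push(Box::new(allow_compaction));
    }

    // Compact every registered stream to the same since frontier.
    fn allow_compaction(&self, since: u64) {
        for f in &self.compact_fns {
            f(since);
        }
    }
}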

@danhhz
Contributor

danhhz commented Jan 14, 2022

Looking at Ruchir's branch makes me realize that having <K, V> bounds on MultiWriteHandle is a wart (one related to this single public exposure of what is supposed to be an internal-only Id). I think we'd want to do a mini-refactor to fix that instead of introducing a separate MultiCompactionHandle

@aljoscha
Contributor Author

so here's a prototype of the changes I'm thinking of 2fce819

I think I'd like such a solution, and my very first try actually did something like that! The reason I didn't go further is that the compaction frontier coming in from coord (in the guise of AllowCompaction) is usually too far ahead: the coordinator determines the compaction frontier based on the uppers that it gets from sources, and those uppers are derived from the timestamp bindings (cf. [1]).

A rendered source will look something like this:

source operator --> send uppers based on bindings
|
v
decode
|
v
do envelope-y stuff
|
v
persist
|
v
seal
|
v
the rest...

So the frontier up to which we seal always lags behind the reported uppers by a bit. And we cannot compact up to the seal frontier right now, because we then wouldn't have a way of figuring out what data we have to filter out when restoring. Then again, this problem would go away if we had a simpler approach to keeping the different involved collections consistent.

I do think that sources reporting their upper based on the timestamp histories is not correct and results, for example, in things like reported frontiers for CDCv2 sources being incorrect (internal discussion: https://materializeinc.slack.com/archives/C01CFKM1QRF/p1642121571061000). Also, "table" sources don't report an upper (only their index does), and I don't know if some other source types report an upper by themselves.

The very correct way of fixing this would be to fix sources to report their upper differently, e.g. for persistent sources to report their upper based on how far they have sealed. Then we could use the AllowCompaction command from coord to do compaction. I didn't want to go down that route because it would have required many more changes. But I'm thinking that someone should do that, in the near future.

Without fixing how uppers are reported, if we want to compact right on receipt of AllowCompaction we would additionally have to keep track of the seal frontier somewhere and hold compaction back by that (minus some) as well.

[1]

for (id, history) in self.render_state.ts_histories.iter() {

@aljoscha
Contributor Author

A sketch of the mentioned fix for upper reporting would be:

  • don't report uppers based on timestamp histories
  • store an AntichainToken (or the thing backing it) in RenderState for each source
  • insert an operator at the "end" of each source dataflow that just advances its source's token based on the input frontier (see the sketch below)
  • report that frontier instead of the uppers for sources

Might even be that that's the fix for the CDCv2 problem... 🤷‍♂️
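
A rough shape for that, again with simplified stand-in types rather than the real AntichainToken/operator machinery:

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type SourceId = u64;

// Shared per-source upper living in RenderState, instead of deriving uppers
// from the timestamp histories.
type SharedUpper = Rc<RefCell<u64>>;

// Called from an operator at the "end" of the rendered source dataflow
// whenever its input frontier advances.
fn advance_source_upper(upper: &SharedUpper, input_frontier: u64) {
    let mut upper = upper.borrow_mut();
    *upper = (*upper).max(input_frontier);
}

// Called from the worker's reporting loop: report these uppers to the
// coordinator instead of the binding-derived ones.
fn collect_uppers(uppers: &HashMap<SourceId, SharedUpper>) -> Vec<(SourceId, u64)> {
    uppers.iter().map(|(id, upper)| (*id, *upper.borrow())).collect()
}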

@ruchirK
Contributor

ruchirK commented Jan 14, 2022

I think the cdcv2 fix is to move cdcv2 to a SimpleSource variant that can assign timestamps based on the inline progress information that comes in via the cdcv2 envelope. It can't just be cdcv2, because cdcv2 reads progress info and mints its own timestamps based on progress updates, but I think it's possible to split SimpleSource into SimpleSourceInventTimestamps and SimpleSource or something like that?

For persisted sources:

The reason I didn't go further is that the compaction frontier coming in from coord (in the guise of AllowCompaction) is usually too far ahead: the coordinator determines the compaction frontier based on the uppers that it gets from sources, and those uppers are derived from the timestamp bindings

this seems like a major bug that (I dunno about you guys, but) I did not think through till just now. If I'm understanding things right, the dataflow server can listen to the source's upper with a ProbeHandle inserted just after the await_frontier operator (or maybe just before), and we don't actually need to make a new operator for this.

I do think we should fix persisted source upper reporting, because if the upper is wrong / too far ahead then I don't have a good mental model to reason about how persisted sources behave under varying compaction windows anymore. Like, are we more likely to get strange answers with a compaction window of 1hr? I would even advocate moving persisted sources off of the whole "persist bindings to sqlite" bit entirely.

Looking at Ruchir's branch makes me realize that having <K, V> bounds on MultiWriteHandle is a wart (one related to this single public exposure of what is supposed to be an internal-only Id). I think we'd want to do a mini-refactor to fix that instead of introducing a separate MultiCompactionHandle

No major preferences here. I did the fastest thing to prototype.

Contributor

@philip-stoev philip-stoev left a comment

Ok so:

  1. In a separate PR, I made the persistence tests run with --logical-compaction off and with the default value. This way, coverage should be maintained as it was before this PR, while also enabling compaction to actually kick in.

  2. I manually tested the size of the on-disk data under various Kafka upsert situations (all unique keys, all updates to existing keys, deletes, etc.) and the disk usage appears to be as expected: compaction kicks in to prevent unbounded growth in case the upsert keys are not unique, and deletes cause the data to be removed from disk. 70 MB worth of UUID-named files remain even if all the keys are deleted, but those 70 MB appear static and are not being overwritten every second, so I guess this is acceptable.

@philip-stoev
Contributor

philip-stoev commented Jan 17, 2022

Item No 1 : I got this warning in the log after restarting MZ:

2022-01-17T09:33:50.061029Z ERROR persist::operators::stream: In allow_compaction(kafka-u1/29-upsert-state): invalid compaction less than trace since Antichain { elements: [1642412021000] }: Antichain { elements: [0] }
2022-01-17T09:33:50.061050Z ERROR persist::operators::stream: In allow_compaction(kafka-u1/29-timestamp-bindings): invalid compaction less than trace since Antichain { elements: [1642412021000] }: Antichain { elements: [0] }

Before restart, it was running this td file:

$ set count=10000000

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "f1", "type": "long"}
    ]
  }

$ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f2", "type":"long"}
        ]
    }

$ kafka-create-topic topic=kafka-upsert
$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${keyschema} schema=${schema} publish=true repeat=1
{"f1": ${kafka-ingest.iteration}} {"f2": 1}

> DROP SOURCE IF EXISTS s1;

> /* A */ CREATE MATERIALIZED SOURCE s1
  FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-kafka-upsert-${testdrive.seed}'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.schema-registry-url}'
  ENVELOPE UPSERT;

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${keyschema} schema=${schema} publish=true repeat=${count}
{"f1": ${kafka-ingest.iteration}} {"f2": 1}

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${keyschema} schema=${schema} publish=true repeat=${count}
{"f1": ${kafka-ingest.iteration}}

> SELECT COUNT(*) = 0 FROM s1;
true

@aljoscha
Contributor Author

Item No 1 : I got this warning in the log after restarting MZ:

That's expected for now; sorry, I should have mentioned that. I was planning to get rid of it before merging.

@aljoscha
Contributor Author

Note: this should NOT be merged before #10128

@aljoscha
Contributor Author

Closed in favour of #10258
