persist: compaction does not work correctly for certain (non uncommon) ingestion patterns #15093

Open
philip-stoev opened this issue Sep 30, 2022 · 19 comments
Labels
A-stability Theme: stability A-storage Area: storage C-bug Category: something is broken

Comments

@philip-stoev
Contributor

philip-stoev commented Sep 30, 2022

What version of Materialize are you using?

1a7ceca86c5624e20100803f19f10bab8b49111e

How did you install Materialize?

Docker image

What is the issue?

If one performs repeated upsert updates over 100 MB worth of data, the total amount of data stored in S3 continues to grow and never goes down, regardless of how much time it is given. Beyond the general inefficiency, this would be a problem for GDPR compliance.

This is also true if the source is left idle for a long time.

Relevant log output

No response

@philip-stoev philip-stoev added C-bug Category: something is broken C-triage labels Sep 30, 2022
@philip-stoev
Contributor Author

To reproduce, clone the gh15093 branch from https://github.com/philip-stoev/materialize:

cd test/gh15093
./mzcompose run default

Let it complete, then observe that the S3 usage goes from:

pstoev@Ubuntu-2004-focal-64-minimal:~/philip-stoev-bounded-memory-usage/test/bounded-memory$ docker exec gh15093-localstack-1 awslocal s3api list-objects --bucket persist --output json --query "[sum(Contents[].Size), length(Contents[])]"
[
    20798549804,
    2267
]

to

pstoev@Ubuntu-2004-focal-64-minimal:~/philip-stoev-bounded-memory-usage/test/bounded-memory$ docker exec gh15093-localstack-1 awslocal s3api list-objects --bucket persist --output json --query "[sum(Contents[].Size), length(Contents[])]"
[
    25518173047,
    3377
]

to

[
    29734964854,
    3383
]

and never comes back down; I gave it 1 hour.

@philip-stoev
Contributor Author

The actual size of the data:

materialize=> select sum(length(f1)) from bounded_memory_source;
    sum    
-----------
 101000003
(1 row)

So it really is about 100 MB.

@philip-stoev philip-stoev changed the title persistence: S3 keys from upsert sources are not cleaned up persist: S3 keys from upsert sources are not cleaned up Sep 30, 2022
@philip-stoev
Contributor Author

I tried a simpler workload that simply inserts some keys and then deletes them all. This is also not cleaned up. No extra mzcompose is required; just put this in test/testdrive/ and run it with the test/testdrive mzcompose.

storaged will spin at 100% CPU for some time before settling down. The blob storage, however, will settle at 2.7 GB for 10M messages, even though the source is now empty:

materialize=> select * from bounded_memory_view;
 count 
-------
     0
(1 row)

If I understand correctly, the old data may get removed eventually, but this is contingent on receiving even more messages from the source. That, however, is not a given -- the source may not receive any more messages in the future, yet it still needs to compact and clean up the existing ones for GDPR compliance.

$ set key-schema={
    "type": "string"
  }

$ set value-schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f1", "type":"string"}
        ]
    }

$ kafka-create-topic topic=bounded-memory

$ kafka-ingest format=avro key-format=avro topic=bounded-memory schema=${value-schema} key-schema=${key-schema} repeat=10000000
"${kafka-ingest.iteration}" {"f1": "A1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"}

> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${testdrive.schema-registry-url}';

> CREATE CONNECTION kafka_conn
  FOR KAFKA BROKER '${testdrive.kafka-addr}';

> CREATE SOURCE bounded_memory_source
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bounded-memory-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW bounded_memory_view AS SELECT COUNT(*) FROM bounded_memory_source;

$ kafka-ingest format=avro key-format=avro topic=bounded-memory schema=${value-schema} key-schema=${key-schema} repeat=10000000
"${kafka-ingest.iteration}"

@philip-stoev philip-stoev changed the title persist: S3 keys from upsert sources are not cleaned up persist: old blobs from upsert sources are not cleaned up Oct 3, 2022
@philip-stoev philip-stoev changed the title persist: old blobs from upsert sources are not cleaned up persist: old record versions from upsert sources are not cleaned up Oct 3, 2022
@nmeagan11

CC'ing @hlburak due to the compliance issue.

@aljoscha
Contributor

I believe I found the issue!

I created a model of Philip's problematic ingestion behavior using pure differential-dataflow to see how Spine behaves: https://github.com/aljoscha/differential-dataflow/tree/compaction-repro. I also created another model that uses plain persist WriteHandle/ReadHandle to isolate it from the rest of our source ingestion pipeline: https://github.com/aljoscha/materialize/tree/gh15093.

The ingestion pattern, modeled after continually ingesting upserts for the same num_keys keys, is the following for both models (a sketch follows the list):

  • ingest num_keys retractions (updates with diff = -1) for the additions that were emitted in the previous round (except for first round)
  • ingest num_keys additions (updates with diff = +1), the addition data is a tuple of (key, round)
  • for both models, we allow compaction up to the write frontier, that is, we allow the maximal amount of compaction
  • below, num_keys == 1000
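
A minimal Rust sketch of the update stream this pattern produces; the types, names, and numbers are illustrative, not taken from the linked branches:

// Each round retracts the previous round's additions and re-adds every key,
// tagged with the new round.
type Update = ((u64, u64), u64, i64); // ((key, round), time, diff)

fn round_updates(round: u64, num_keys: u64) -> Vec<Update> {
    let mut updates = Vec::new();
    for key in 0..num_keys {
        if round > 0 {
            // Retract the addition that was emitted in the previous round.
            updates.push(((key, round - 1), round, -1));
        }
        // Add the new value for this key, tagged with the current round.
        updates.push(((key, round), round, 1));
    }
    updates
}

fn main() {
    let num_keys = 1000;
    for round in 0..5 {
        let updates = round_updates(round, num_keys);
        // In the models, each round is appended as one batch, and the
        // compaction frontier (since) is then allowed to advance to the write
        // frontier, permitting maximal consolidation.
        println!("round {round}: ingesting {} updates", updates.len());
    }
}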

Results for Spine

As expected, DD/Spine handles this just fine! We keep around num_keys * 3 updates at steady state: a) the additions from the previous round in one batch, and b) the retractions/additions from the current round in a second batch. For example, after round 5 our Spine looks like this:

batch: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [5] }, since: Antichain { elements: [5] } }, entries: 1000
batch: Description { lower: Antichain { elements: [5] }, upper: Antichain { elements: [6] }, since: Antichain { elements: [0] } }, entries: 2000
num_batches: 2, num_entries: 3000
consolidated updates.len: 1000

The updates would consolidate down to 1000 (that is, num_keys), but compaction hasn't happened yet. In each round, when inserting new updates, Spine first merges/consolidates the previous two batches and then adds the new additions/retractions as a separate batch.

This is important for later: Spine eagerly does work (merging/consolidating) before adding a new batch of updates, proportional to the number of updates that are ingested.

Results for persist

With persist, we do almost no compaction/consolidation as we ingest new data, so the number of updates kept in the shard keeps growing even though the updates, when consolidated, would reduce down to just num_keys == 1000. For example, after round 5, we have:

batch Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [5] }, since: Antichain { elements: [11] } }, len: 7000 
batch Description { lower: Antichain { elements: [5] }, upper: Antichain { elements: [6] }, since: Antichain { elements: [0] } }, len: 2000 
num_batches: 2, num_entries: 9000
consolidated updates.len: 1000 <-- added by me for clarity

Similar to Spine, the new batch has both the additions for the current round and the retractions for the previous round, so num_keys * 2, but the "older" batch has a lot more updates than it should have!

Diagnosis

The reason for this lies in how persist does compaction work asynchronously. This differs from how Spine does it eagerly, blocking ingestion if necessary (see above). Persist notices when batches need merging/compaction (same as DD/Spine), but instead of doing that work eagerly (blockingly), it spins up an asynchronous task and adds new updates right away. The internal data structure that keeps track of batches is updated to represent the merging of batches, but the updates in those batches themselves are not yet eagerly merged. That's why you will see the same "shape" of batches for both models, but the updates contained within differ.

Asynchronous compaction works like this, roughly:

  1. we notice that batches need to be merged
  2. record the merged batches in our Spine-like data structure
  3. set off an async task that merges the updates; the task contains a description of what "shape" of batch the result of compaction can replace, when successful
  4. [...]
  5. async compaction task succeeds!
  • try and see if the result of compaction can replace any of the batches we currently have. If yes, do it! If not, discard the result.
  • ☝️ this discarding is very important for how the bug works!
  6. continue other persist business ...

The bug: in our model, where we constantly churn on just two batches and keep merging them together, the compaction results can almost never be applied, and we keep doing compaction work that we then throw away.
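
To make the discard step concrete, here is a hedged Rust sketch of the apply-or-discard check; Description, Batch, and try_apply are simplified stand-ins for persist's internal state handling, not its actual types or API:

#[derive(Clone, Copy, Debug, PartialEq)]
struct Description {
    lower: u64,
    upper: u64,
}

struct Batch {
    desc: Description,
    len: usize,
}

struct CompactionResult {
    desc: Description, // the span of batches the work was requested for
    merged: Batch,
}

/// Swap the compacted batch in only if a contiguous run of the *current*
/// batches still spans exactly `res.desc`; otherwise discard the finished
/// work. With the two-batch churn above, the boundaries keep changing
/// underneath the async task, so the result is almost always discarded.
fn try_apply(batches: &mut Vec<Batch>, res: CompactionResult) -> bool {
    let mut matched = None;
    for start in 0..batches.len() {
        for end in start..batches.len() {
            if batches[start].desc.lower == res.desc.lower
                && batches[end].desc.upper == res.desc.upper
            {
                matched = Some((start, end));
            }
        }
    }
    match matched {
        Some((start, end)) => {
            batches.splice(start..=end, std::iter::once(res.merged));
            true
        }
        None => false,
    }
}

fn main() {
    // The result was computed for [0, 5), but the batches have since been
    // merged into a single [0, 6) batch, so the finished work is thrown away.
    let mut batches = vec![Batch { desc: Description { lower: 0, upper: 6 }, len: 9000 }];
    let res = CompactionResult {
        desc: Description { lower: 0, upper: 5 },
        merged: Batch { desc: Description { lower: 0, upper: 5 }, len: 1000 },
    };
    assert!(!try_apply(&mut batches, res));
    assert_eq!(batches[0].len, 9000);
}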

A non-fix

I changed our code to do compaction work eagerly, instead of spawning an async task. That leads to the persist model behaving the same as the DD/Spine model in terms of which batches we keep and how many updates they contain. I think that confirms the bug.


@aljoscha aljoscha changed the title persist: old record versions from upsert sources are not cleaned up persist: compaction does not work correctly for certain (non uncommon) ingestion patterns Oct 10, 2022
@aljoscha
Contributor

@philip-stoev I changed the title to reflect the findings more explicitly, I hope that's okay.

@philip-stoev
Contributor Author

@aljoscha No problem. Now that I know the operation of the entire thing is very sensitive to the shape of the workload, once you have a fix out I will test with many more workload shapes than I initially envisioned.

@aljoscha
Contributor

It should not be sensitive to the workload. Once we fix that, all workloads should just work! 🤞

@aljoscha
Contributor

@danhhz and I talked briefly about this, a sketch for an immediate fix is:

  • applying the compaction result based on the batch boundaries (think lower and upper) is too conservative because the batch boundaries can change while compaction is going on
  • instead, we need to be a bit smarter and look into batches when trying to apply a compaction result. Then we can replace some of the parts of a batch (if they all match the compaction request) with a compacted result (see the sketch below)
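
A rough sketch of what that finer-grained apply could look like, with Part and Batch as simplified stand-ins; this illustrates the idea, not the planned implementation:

#[derive(Clone, Debug, PartialEq)]
struct Part {
    lower: u64,
    upper: u64,
}

struct Batch {
    parts: Vec<Part>,
}

/// Replace the contiguous run of parts covering [req.lower, req.upper) with
/// the single compacted part, if such a run still exists inside the batch.
fn apply_within_batch(batch: &mut Batch, req: &Part, compacted: &Part) -> bool {
    for start in 0..batch.parts.len() {
        for end in start..batch.parts.len() {
            let run = &batch.parts[start..=end];
            let covers =
                run.first().unwrap().lower == req.lower && run.last().unwrap().upper == req.upper;
            // The run has to be contiguous for the swap to be sound.
            let contiguous = run.windows(2).all(|w| w[0].upper == w[1].lower);
            if covers && contiguous {
                batch.parts.splice(start..=end, std::iter::once(compacted.clone()));
                return true;
            }
        }
    }
    false
}

fn main() {
    // The whole batch now spans [0, 6), so an exact-boundary check for a
    // [0, 5) compaction result would fail, but its first two parts still
    // cover [0, 5) exactly, so the result can be applied at the part level.
    let mut batch = Batch {
        parts: vec![
            Part { lower: 0, upper: 3 },
            Part { lower: 3, upper: 5 },
            Part { lower: 5, upper: 6 },
        ],
    };
    let req = Part { lower: 0, upper: 5 };
    assert!(apply_within_batch(&mut batch, &req, &req));
    assert_eq!(batch.parts.len(), 2);
}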

@pH14
Contributor

pH14 commented Oct 17, 2022

#15375 and #15356 will land in this release and should improve compaction performance for upserts, though we don't have a benchmark to specifically measure this yet.

In other general compaction news, we've also merged in:

This still leaves us with several categories of work to continue on. The biggest ones that come to mind are: better handling of empty batches, fetching blobs from S3 in parallel (and eventually all blobs, not just the initial stage), and more comprehensive/structured timeouts.

@danhhz
Contributor

danhhz commented Oct 21, 2022

another compaction perf fix #15575

@danhhz danhhz mentioned this issue Oct 24, 2022
@aljoscha aljoscha added the A-stability Theme: stability label Oct 25, 2022
@pH14
Contributor

pH14 commented Oct 27, 2022

#15575 has been merged, is on prod, and has massively reduced the time we're spending per compaction:
[screenshot: per-compaction time metrics, 2022-10-27]

This will help compaction keep pace with writes, and greatly improves the odds that our compaction results merge cleanly into state.

#15732 has additionally been merged but is not yet released. This one greatly improves efficacy as well, identifying more opportunities to compact and improving the odds of compaction applying successfully.


With these changes, I tried running Philip's original gh15093 test. After running, we found:

> docker exec gh15093-localstack-1 awslocal s3api list-objects --bucket persist --output json --query "[sum(Contents[].Size), length(Contents[])]"
[
    27528286539,
    799
]

And then 20 min later:

docker exec gh15093-localstack-1 awslocal s3api list-objects --bucket persist --output json --query "[sum(Contents[].Size), length(Contents[])]"
[
    27419162211,
    550
]

We're no longer leaking bytes!

The data didn't consolidate out particularly much, but eh... problem for another day (tomorrow? 😄). The amount we can logically compact is a function of the shard downgrading its since, so that's no longer just the domain of persist.
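
For illustration, a small self-contained Rust sketch of why consolidation depends on the since (this is not persist's consolidation code): updates only cancel once their timestamps can be advanced to a common since.

use std::collections::BTreeMap;

type Data = (&'static str, &'static str); // (key, value)
type Update = (Data, u64, i64); // (data, time, diff)

fn consolidate_at(since: u64, updates: &[Update]) -> Vec<Update> {
    let mut sums: BTreeMap<(Data, u64), i64> = BTreeMap::new();
    for &(data, time, diff) in updates {
        // Advancing the timestamp to the since is what lets an insert at t=1
        // and its retraction at t=5 land on the same (data, time) and cancel.
        let advanced = time.max(since);
        *sums.entry((data, advanced)).or_insert(0) += diff;
    }
    sums.into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}

fn main() {
    // An upsert-style insert followed by its retraction at a later time.
    let updates = vec![(("k1", "v1"), 1, 1), (("k1", "v1"), 5, -1)];
    // While the since is held back at 0, nothing cancels...
    assert_eq!(consolidate_at(0, &updates).len(), 2);
    // ...but once the since reaches 5, the pair consolidates away entirely.
    assert_eq!(consolidate_at(5, &updates).len(), 0);
}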

@philip-stoev
Contributor Author

This currently does not compact:

            > CREATE TABLE obj (f1 TEXT)
            > INSERT INTO obj SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
            > DELETE FROM obj;

I gave it more than 1 hour, but the reported consumption remains at 2 MB:

materialize=> select size_bytes FROM mz_storage_usage WHERE collection_timestamp = ( SELECT MAX(collection_timestamp) FROM mz_storage_usage ) AND object_id = ( SELECT id FROM mz_objects WHERE name = 'obj' );
 size_bytes 
------------
 2174370

philip-stoev added a commit to philip-stoev/materialize that referenced this issue Dec 1, 2022
Create various database objects and check the storage usage
reported for them in the mz_storage_usage table.

Scenarios involving upsert and deletions have been disabled
due to MaterializeInc#15093

Relates to MaterializeInc/cloud#3737
@uce uce unassigned aljoscha and pH14 Dec 16, 2022
@danhhz
Contributor

danhhz commented Dec 18, 2022

@aljoscha do you still have a copy of https://github.com/aljoscha/materialize/tree/gh15093 sitting around somewhere?

@philip-stoev
Contributor Author

I guess you meant https://github.com/philip-stoev/materialize/tree/gh15093?

You can also use the test case from #15093 (comment) as well as the disabled parts of test/storage-usage/mzcompose.py, currently in main.

@danhhz
Contributor

danhhz commented Dec 20, 2022

nope! turns out paul had a copy of aljoscha's branch, which had a tighter repro of the same badness you were seeing: https://github.com/MaterializeInc/materialize/compare/main...pH14:materialize:gh15093-aljoscha?expand=1

danhhz added a commit to danhhz/materialize that referenced this issue Dec 21, 2022
We were accidentally serializing all compaction work in the
PersistCompactionWorker task. The intent was to limit concurrency of the
actual work using a semaphore, enqueue requests in a channel leading
into that semaphore, and drop requests when the channel fills up (i.e.
both the semaphore and the channel are full). This commit changes the
impl to match that intent.

Touches MaterializeInc#15093
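
For context, a hedged sketch of the concurrency pattern the commit message describes, using tokio primitives; the names and numbers are illustrative and this is not the actual persist code:

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

#[derive(Debug)]
struct CompactReq {
    id: u64,
}

async fn run_compaction(req: CompactReq) {
    // Stand-in for the real compaction work.
    println!("compacting request {}", req.id);
}

#[tokio::main]
async fn main() {
    let concurrency_limit = 2;
    let queue_depth = 4;
    let (tx, mut rx) = mpsc::channel::<CompactReq>(queue_depth);
    let semaphore = Arc::new(Semaphore::new(concurrency_limit));

    // Worker task: pulls requests off the queue and spawns them, but only as
    // fast as the semaphore allows. Crucially, it does not await the work
    // itself, so requests run concurrently instead of being serialized.
    let sem = Arc::clone(&semaphore);
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
            tokio::spawn(async move {
                run_compaction(req).await;
                drop(permit);
            });
        }
    });

    for id in 0..10 {
        // If the channel is full (queue_depth pending requests on top of the
        // ones currently running), drop the request rather than blocking.
        if tx.try_send(CompactReq { id }).is_err() {
            println!("dropping compaction request {id}");
        }
    }
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
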
@danhhz
Contributor

danhhz commented Dec 21, 2022

okay, aljoscha's sim is pretty happy on top of #16784, so moving on to philip's simpler testdrive repro #15093 (comment). i ran it but lopped one zero off the end to start (seems to strike a nice balance of quick iteration and fidelity to the original bugs)

once things settle into a state where the source is just appending empty batches, this is what our spine looks like

f1331534(m[0][1671657198000]665771+m[1671657198000][1671657218001]665771) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671657218001][1671657323001]0

ignore the format, it's something i've invented that only exists locally. the way to read this is that we're continually hitting the empty batch optimization and squashing stuff into sm[1671657218001][1671657323001]0. meanwhile, there's a giant merge fueling, with two fully compacted batches, but looking for 1331534 units of fuel. this pretty much confirms there's nothing we can do in the absence of something like ye olde --differential-idle-merge-effort, which is what I suspected. i have a pretty good idea for how to quickly hack that into persist, so gonna do that and see how it goes

@danhhz
Copy link
Contributor

danhhz commented Dec 21, 2022

so close! here you see the idle merge effort fueling the big batch and then it compacts! but when it compacts, we end up with 2,000,000 updates instead of everything consolidating out. I suspect the since isn't advanced far enough to allow them to consolidate

  f1000000(m[0][1671659071000]1094787+f[1671659071000][1671659088001]2/905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659106001]0
  f900000(m[0][1671659071000]1094787+f[1671659071000][1671659088001]2/905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659107001]0
  f800000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659108001]0
  f700000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659109001]0
  f600000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659110001]0
  f500000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659111001]0
  f400000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659112001]0
  f300000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659113001]0
  f200000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659114001]0
  f100000(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659115001]0
  f0(m[0][1671659071000]1094787+m[1671659071000][1671659088001]905213) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659116001]0
storage-u3: 2022-12-21T21:45:16.267407Z  INFO mz_persist_client::internal::machine:   req sc5f04af6 [0][1671659088001] 2/2000000 281781140
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659117001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659118001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659119001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659120001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659121001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659122001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659123001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659124001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659125001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659126001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659127001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659128001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659129001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659130001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659131001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659132001]0
  sf[0][1671659088001]2/2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659133001]0
storage-u3: 2022-12-21T21:45:32.447263Z  INFO mz_persist_client::internal::machine:   res sc5f04af6 [0][1671659088001] AppliedExact 281781129B
  sm[0][1671659088001]2000000 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ sm[1671659088001][1671659134001]0

danhhz added a commit to danhhz/materialize that referenced this issue Feb 16, 2023
In differential dataflow, idle_merge_effort configures an arrangement to
introduce extra fuel on every operator invocation. This causes data to
continue compacting, even when new updates are not being introduced.

Persist doesn't have anything that directly corresponds to operator
invocations, but we do the best we can to match the spirit of
idle_merge_effort. Specifically, we introduce the additional fuel on
each compare_and_append call. In practice, compare_and_append gets
called on a regular cadence to advance the frontier, so even in no
updates are being added, we'll eventually compact things down. This does
mean, however, that tuning the constant involved will likely be quite
different.

Touches MaterializeInc#15093
Touches MaterializeInc#16607
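
A hedged, self-contained sketch of the fuel idea described in the commit message; the types, names, and constants are illustrative rather than persist's actual implementation:

// Every compare_and_append call donates a fixed amount of fuel to any
// in-progress merge, so batches keep compacting even when only empty batches
// (frontier advancements) are being appended.

struct FuelingMerge {
    remaining_work: u64,
}

struct Shard {
    merge: Option<FuelingMerge>,
    appends: u64,
}

impl Shard {
    /// Called on every append, including empty ones that only advance the
    /// frontier. `idle_merge_effort` is the per-call fuel budget.
    fn compare_and_append(&mut self, idle_merge_effort: u64) {
        self.appends += 1;
        if let Some(merge) = &mut self.merge {
            merge.remaining_work = merge.remaining_work.saturating_sub(idle_merge_effort);
            if merge.remaining_work == 0 {
                // The merge has been fully fueled; issue the compaction request.
                println!("merge complete after {} appends", self.appends);
                self.merge = None;
            }
        }
    }
}

fn main() {
    let mut shard = Shard {
        merge: Some(FuelingMerge { remaining_work: 1_331_534 }),
        appends: 0,
    };
    // Even with no new data, the periodic empty appends eventually finish the merge.
    while shard.merge.is_some() {
        shard.compare_and_append(100_000);
    }
}
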
@danhhz danhhz removed their assignment Mar 23, 2023