Filters pushdown from view to source #3196

Open
bruwozniak opened this issue May 29, 2020 · 16 comments
Labels
C-bug Category: something is broken

Comments

@bruwozniak

Hi, first of all I'm quite impressed with your project, keep up the great work.

I have a use case where I would like to analyze some data in a fairly large (~400 GB) Kafka topic, looking for records that match specific criteria.

For example: I ran CREATE SOURCE [...]; and then CREATE MATERIALIZED VIEW xyz AS SELECT * FROM source WHERE id='abcd1234';. At that point Materialize starts reading the topic and OOMs after a few moments at 10 GB of heap, presumably because there isn't enough RAM to fit all of the source data in memory.
But of course, technically you would only need to keep the one row relevant to the view.
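
Spelled out (with the source definition still elided), the two statements were:

CREATE SOURCE source FROM [...];

CREATE MATERIALIZED VIEW xyz AS
SELECT * FROM source WHERE id = 'abcd1234';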

Can this behavior be optimized in the future?

@bruwozniak bruwozniak added the C-feature Category: new feature or request label May 29, 2020
@frankmcsherry
Contributor

Hi! Thanks for the nice words.

We already do push down filters, so there is possibly something else going on. If you type

explain plan for SELECT * FROM source WHERE id='abcd1234'

you should get a rendering of the plan, which should certainly have a filter in place before the materialization.

Depending on when you most recently grabbed the binary, some work did recently unblock the flow of data from sources (which should have been noticeable only in cases of large batch sizes). Can you tell us which version you are using?

Thanks!

@rjnn
Contributor

rjnn commented May 29, 2020

Also, what OS are you using? On my Mac OS X laptop, it comfortably churns through hundreds of gigabytes of input data while swapping out to disk; you should not have hit an OOM error at 10 GB of usage.

@bruwozniak
Author

bruwozniak commented Jun 3, 2020

I'm using the Docker image on Kubernetes, version v0.2.3-rc1.
Here's the output of the plan:

materialize=> explain plan for SELECT * FROM source WHERE id='abcd1234';
             Optimized Plan
-----------------------------------------
 %0 =                                    +
 | Get materialize.public.source (u1)    +
 | Filter (#0 = "abcd1234")              +

(1 row)

It looks like the filter is applied after materializing?
EDIT: tried v0.3.0; the output is the same. I created the view and Kubernetes killed it for reaching the 10 GB limit. The Kafka topic has 60 partitions. I should note that with a smaller topic (1 partition, 4 GB of data) this works as expected, in the sense that it does not reach the limit before materializing the view.

@bruwozniak
Author

@frankmcsherry do you think the plan looks as expected?
If so, what could be causing such high memory consumption?

@frankmcsherry
Contributor

frankmcsherry commented Jun 4, 2020

Hi, sorry for the delay.

The plan looks totally normal, and the filtering will happen before the materialization. The materialize.public. prefix is just the namespace of the source.

It's a bit hard to know without additional information about your set-up. Can you describe the source and the data it contains? You could also run

CREATE MATERIALIZED VIEW matching_count AS SELECT count(*) FROM source WHERE id = 'abcd1234';

which should maintain a small footprint and let you check how many records are passing the filter (the view name here is arbitrary).
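
Then, to see how many records have passed the filter so far, query that placeholder view (re-running as ingestion progresses):

SELECT * FROM matching_count;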

Filtering not being applied is definitely not expected, and it hasn't been an issue in other tests. I'd recommend grabbing the most recent release you can and double-checking; there has been some churn around Kafka reading, and I can't say with high confidence that we haven't had an issue that could have caused this and has since been fixed.

@wangandi
Contributor

wangandi commented Jun 4, 2020

@bruwozniak What is the source's envelope?

Filters are pushed down for ENVELOPE DEBEZIUM and ENVELOPE NONE, but filter pushdown has not been implemented yet for ENVELOPE UPSERT (it is on our roadmap!). If you are using ENVELOPE UPSERT, the size of what is kept in memory will be proportional to the number of unique active keys in the Kafka topic.
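
For reference, the envelope is part of the CREATE SOURCE statement. A rough sketch, with a placeholder broker, topic, and schema path (the exact syntax may differ slightly between versions):

-- ENVELOPE NONE is the default when no envelope is specified;
-- an upsert source would say ENVELOPE UPSERT in the same position.
CREATE SOURCE append_only_source FROM
KAFKA BROKER 'broker:9092' TOPIC 'my_topic'
FORMAT PROTOBUF MESSAGE 'MyMessage'
USING SCHEMA FILE '/path/to/message_descriptor'
ENVELOPE NONE;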

@bruwozniak
Author

bruwozniak commented Jun 5, 2020

@wangandi I don't specify the envelope when creating the source, so presumably it is ENVELOPE NONE, as per the docs.

@frankmcsherry the data is a Kafka (2.3.0) topic with 60 well-balanced partitions and Protobuf data.
The messages are small, but there are many of them (as I mentioned, around 0.5 TB). I should mention that we do not set Kafka keys when producing (only values).
I bumped Materialize to v0.3.0 but it has the same behaviour.
I tried to create your materialized view, but the issue is the same: it seems to try to load all the data into memory.
Here's my CREATE SOURCE:

CREATE SOURCE my_source FROM
KAFKA BROKER 'broker:9092' TOPIC 'my_topic'
FORMAT PROTOBUF MESSAGE 'MyMessage'
USING SCHEMA FILE '/etc/materialize/schema/message_descriptor';

This works fine when the source topic (same Protobuf format) has just 1 partition and 1 GB of data, presumably because it can all fit into memory before the filter.

@frankmcsherry
Contributor

Thanks for the additional info! I'm going to switch this from a feature request to a bug, as this is supposed to work (and seems to in our tests, but clearly we aren't testing something).

@frankmcsherry frankmcsherry added C-bug Category: something is broken and removed C-feature Category: new feature or request labels Jun 5, 2020
@frankmcsherry
Contributor

Also, in the interest of keeping folks sane on our end: when you create an issue and select "bug", it asks for various bits of information; would you be willing to go through that list and grab those things (e.g. version, how you installed it, and also bits of your logs)?

@bruwozniak
Author

What version of Materialize are you using?

$ materialized -v
materialized v0.3.0 (c9408416ed580b6faf12c605c4f230e3deafa407)

How did you install Materialize?

  • Docker image (on Kubernetes)

Main part of k8s deployment manifest:

      - image: materialize/materialized:v0.3.0
        name: materialize
        args:
          - --threads
          - '3'
        resources:
          requests:
            memory: 4Gi
          limits:
            memory: 10Gi

What was the issue?

See the thread above.

Is the issue reproducible? If so, please provide reproduction instructions.

Steps to reproduce:

  • Define a Protobuf message with at least one string field
  • Start a Kafka cluster (we have 4 brokers) and create a topic with 60 partitions
  • Produce a lot (at least several dozen GB) of message data into the topic; the messages should not be keyed
  • Start Materialize and create a source pointing to the topic
  • Create a materialized view from said source
  • An OOM should follow

Please attach any applicable log files.

materialize Jun 05 11:58:05.339  INFO coord::catalog: create source materialize.public.test_source (u1)
materialize Jun 05 11:58:23.139  INFO coord::catalog: create view materialize.public.test_view (u2)
materialize Jun 05 11:58:23.139  INFO coord::catalog: create index materialize.public.test_view_primary_idx (u3)
materialize Jun 05 11:58:23.142  INFO dataflow::source::kafka: Refreshing Source Metadata for Source kafka-u1/u3 Partition Count: 1
materialize Jun 05 11:58:23.165  INFO coord::timestamp: Timestamping Source u1/u3 with Real Time Consistency. Max Timestamp Batch 0
materialize Jun 05 11:58:23.221  INFO dataflow::source::kafka: Refreshing Source Metadata for Source kafka-u1/u3 Partition Count: 35
materialize Jun 05 11:58:23.697  INFO dataflow::source::kafka: Refreshing Source Metadata for Source kafka-u1/u3 Partition Count: 60

@frankmcsherry
Contributor

Thanks very much!

@wangandi
Contributor

wangandi commented Jun 8, 2020

I did a quick test of memory usage on the same ~1 GB dataset, with a materialized view that filtered it down to 56 records:

  • With the data all in 1 partition: From a baseline memory usage of 506 MB, memory usage gradually increased to 739 MB
  • With the data spread across 30 partitions: From a baseline memory usage of 416 MB, memory spiked up to 3.47 GB before decreasing back down to 750 MB.

In both cases, checking mz_perf_arrangements_records revealed that the final arrangement contained only the 56 records.

Based on this, I suspect that the filter is there and doing its job, but increasing the number of partitions seems to result in a lot of extra memory allocation at the outset.

This works fine when the source topic (same protobuf format) has just 1 partition and 1GB of data, presumably because they can all fit into memory before filter.

@bruwozniak In the 1 partition, 1 GB of data case, what does the memory usage graph look like? We have instructions on obtaining performance metrics here: https://materialize.io/docs/monitoring/, though it would be sufficient if you knew the peak memory usage and the steady-state memory usage after making the view.
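
If it helps, the arrangement sizes can also be checked directly with something like this (the table is the one mentioned above; the exact columns may vary between versions):

SELECT * FROM mz_perf_arrangements_records;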

@bruwozniak
Author

@wangandi I think the usage is similar to what you observed (peak below 4 GB), and as I see, you can reproduce this yourself too. But this is not the problematic scenario. Maybe you could try with 10 GB of data spread across 60 partitions and see what the memory profile looks like?

@wangandi
Contributor

wangandi commented Jun 9, 2020

@bruwozniak
I'm asking about the 1 partition 1 GB of data case because you had mentioned this about the case:

This works fine when the source topic (same protobuf format) has just 1 partition and 1GB of data, presumably because they can all fit into memory before filter.

Thus, I was wondering, in the 1 partition 1 GB case, did you see a significant memory spike that made you believe the entire dataset was being loaded into memory before the filter was applied? Because in my quick test on 1 partition 1 GB, I only observed a gradual, monotonic increase in memory, and the total amount of increase was only 20% of the size of the dataset.

If you did see a significant memory spike in the 1 partition 1 GB case, that would indicate that there is an additional factor (such as size of average record, protobuf decoding) besides # of partitions/size of topic that we should be investigating as a cause of memory spiking.

@bruwozniak
Author

Here's the memory usage for the 1 partition / 1 GB case, idle -> creating the materialized view:
[screenshot: memory usage graph]

That does not solve the issue of 60 partitions and many GBs though.

@bruwozniak
Author

@wangandi I see that you opened a feature issue related to this, but I still believe this is a performance bug. What I am trying to use Materialize for, with the production topic, likely won't fit into, e.g., 64 GB of server RAM either. So it's not just about being able to run heavy workloads on a laptop.
