# Unbounded memory usage with LIMIT (TopK) on non-Debezium Kafka sources #5013
Thanks @pikulmar for the report! cc @frankmcsherry: shouldn't #3987 have fixed this? It might also have to do with the sources that fail to propagate monotonicity information, in which case also cc @elindsey.
In principle yes, though in practice there are some quirks. The memory reduction relies on compaction, so a few things need to be true:
Also, there are other quirks; for example, it spikes higher than I'd like myself.
I can confirm that I was able to reproduce this error on my laptop with a fairly small topic, and I was able to make it go away with a small logical compaction window. It's still a bit mysterious to me why the compaction window needs to be small, given that the retraction delay is 10 seconds. When I left things on the default settings for a few minutes, everything stayed uncompacted. Also @pikulmar, if you have other memory-related issues you should check out Materialize's dataflow graph memory utilization viewing page.
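(That page is served by materialized itself; in the setup described later in this thread it is available at http://localhost:6875/memory.)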
DD has the "issue" that if your data achieve a compacted physical representation, one batch for the entire collection, then it doesn't have another chance to do physical compaction, and doesn't grok the circumstances under which doing a speculative compaction would improve anything. So, if the compaction window is 1 min, and we read the data in 7 s and within 20 s total have received and retracted the feedback, we have 40 s to do physical compaction before we even think of cancelling anything (the fed-back retractions are not at the same time; they are 10 s later on). If within that 40 s we arrive at a single physical batch, we are "stuck" until we receive another input. Just one more record would unstick it.
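In timeline form: the data are fully read at ~7 s; the feedback is received and retracted by ~20 s (the retractions trail their records by 10 s); and the 1 min compaction window means logical cancellation cannot begin before 60 s, leaving 60 − 20 = 40 s in which physical merging can reach a single batch and wedge itself.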
Thank you all for the quick feedback. Meanwhile, I found the time to follow up on this story. Summary below:

## Debugging Materialize issue #5013

### Test 1: JSON parsing followed by Top-K

Initialize the test environment:

```console
docker-compose --force-recreate up
```

with the following `docker-compose.yml`:

```yaml
# docker-compose.yml
# Based on
# https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml
version: '2'
services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper
  kafka2:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper
  kafka3:
    image: 'bitnami/kafka:2'
    environment:
      - KAFKA_CFG_BROKER_ID=3
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    depends_on:
      - zookeeper
  materialize:
    image: 'materialize/materialized:unstable-81f832b0818789c5e74183c208398431f14f079d'
    # environment:
    #   # Enabled only for selected tests, as explained in the text.
    #   - DIFFERENTIAL_EAGER_MERGE=1000
    ports:
      - 6875:6875
    depends_on:
      - kafka1
      - kafka2
      - kafka3
```

Test data are generated as follows:

```console
$ docker run --rm -it --network=materialize-monotonicity-test_default confluentinc/cp-kafkacat /bin/bash
$ openssl rand -base64 100000000 | kafkacat -P -b kafka1:9092 -t test -p 0 -k key
$ kafkacat -C -b kafka1:9092 -t test -e -o beginning | wc
% Reached end of topic test [0] at offset 2083334: exiting
2083334 2083334 135416670
$ kafkacat -C -b kafka1:9092 -t test -e -o beginning -J | kafkacat -P -b kafka1:9092 -t test-json -p 0 -k key
% Reached end of topic test [0] at offset 2083334: exiting
$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | wc
% Reached end of topic test-json [0] at offset 2083334: exiting
2083334 2083334 361388966
$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | tail -n 1
% Reached end of topic test-json [0] at offset 2083334: exiting
{"topic":"test","partition":0,"offset":2083333,"tstype":"create","ts":1608192273785,"key":"key","payload":"wcp7XWQ4DyDo0cJdEZctjQ=="} Thus, the Next, run DROP SOURCE IF EXISTS data_stream CASCADE;
CREATE SOURCE IF NOT EXISTS data_stream
FROM KAFKA BROKER 'kafka1:9092'
TOPIC 'test-json'
WITH (
ignore_source_keys=true
)
FORMAT BYTES;
DROP VIEW IF EXISTS test_view CASCADE;
CREATE MATERIALIZED VIEW test_view AS
SELECT
CONVERT_FROM(s.data, 'utf-8')::JSONB AS testcol
FROM data_stream AS s
ORDER BY s.data
LIMIT 1; Memory usage:
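As a quick sanity check, the view output itself is only ever a single row:

```sql
-- Query the materialized view; only the smallest row by s.data appears in the output.
SELECT * FROM test_view;
```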
Memory usage:

Following https://materialize.com/docs/ops/diagnosing-using-sql/#materialize-is-using-lots-of-memory-what-gives and running

```sql
select mdo.id, mdo.name, sum(mas.records) as records, sum(mas.batches) as batches
from mz_arrangement_sizes as mas,
     mz_dataflow_operators as mdo
where
    mas.operator = mdo.id and
    mas.worker = mdo.worker
group by mdo.id, mdo.name
order by sum(mas.records) desc;
```

results in:
So there is some compaction happening, although it might not be perfect in this particular case.

Following your suggestions, we tried adding one message to the input topic:

```console
$ kafkacat -C -b kafka1:9092 -t test -e -o beginning -J | head -n 1 | kafkacat -P -b kafka1:9092 -t test-json -p 0 -k key
% ERROR: Output write error: Broken pipe
$ kafkacat -C -b kafka1:9092 -t test-json -e -o beginning | wc
% Reached end of topic test-json [0] at offset 2083335: exiting
2083335 2083335 361389134
```

However, that did not change the overall memory usage.
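To focus the introspection on the TopK operator specifically, the diagnosis query above can be restricted by operator name (an illustrative refinement using the same `mz_*` relations):

```sql
-- Arrangement sizes for TopK operators only (operator names such as
-- 'Arrange: TopK' appear in mz_dataflow_operators, as seen below).
select mdo.id, mdo.name, sum(mas.records) as records, sum(mas.batches) as batches
from mz_arrangement_sizes as mas,
     mz_dataflow_operators as mdo
where
    mas.operator = mdo.id and
    mas.worker = mdo.worker and
    mdo.name like '%TopK%'
group by mdo.id, mdo.name
order by sum(mas.records) desc;
```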
#### Variant: use `DIFFERENTIAL_EAGER_MERGE=1000`

Memory usage:

|            | Virtual memory | Resident set |
| ---------- | -------------- | ------------ |
| peak       | 14 GiB         | 12 GiB       |
| after load | 14 GiB         | 0.7 GiB      |
Arrangement sizes:

```
 id  | name    | records | batches
-----+---------+---------+---------
 328 | Arrange |     215 |       4
 ...
```
This is clearly much better.
### Test 2: JSON parsing followed by `jsonb_each` and Top-K

Since a comment on the GitHub issue suggested that the propagation of monotonicity information might be missing, we revisited our original query, which was more complex than the one reported in the GitHub issue and in fact made use of set-returning functions from the `jsonb_*` family. Let's therefore have a look at an example making use of `jsonb_each`:
```sql
DROP SOURCE IF EXISTS data_stream CASCADE;
CREATE SOURCE IF NOT EXISTS data_stream
FROM KAFKA BROKER 'kafka1:9092'
TOPIC 'test-json'
WITH (
    ignore_source_keys=true
)
FORMAT BYTES;

DROP VIEW IF EXISTS test_view CASCADE;
CREATE MATERIALIZED VIEW test_view AS
SELECT
    each.key,
    each.value
FROM
    data_stream s,
    jsonb_each(CONVERT_FROM(s.data, 'utf-8')::JSONB) each
ORDER BY each.value
LIMIT 1;
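(The `jsonb_each` call sits between the source and the TopK as a table function; if monotonicity is not propagated through it, the TopK is rendered with the general, non-monotonic implementation, which arranges its full input.)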
Memory usage:

|            | Virtual memory | Resident set |
| ---------- | -------------- | ------------ |
| peak       | 13 GiB         | 11 GiB       |
| after load | 13 GiB         | 10 GiB       |
Arrangements:

```
 id  | name          | records | batches
-----+---------------+---------+---------
 556 | Arrange: TopK | 4168514 |      12
 ...
```
Note that the number of records is suspiciously close to twice the number of messages in the data stream.
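(2 × 2,083,335 = 4,166,670, which suggests that each input record is being retained together with a not-yet-compacted retraction.)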
Setting `DIFFERENTIAL_EAGER_MERGE=1000` does not reduce memory usage significantly:
|            | Virtual memory | Resident set |
| ---------- | -------------- | ------------ |
| peak       | 20 GiB         | 16 GiB       |
| after load | 20 GiB         | 10 GiB       |
Nor does it reduce the arrangement size:
```
 id  | name          | records | batches
-----+---------------+---------+---------
 556 | Arrange: TopK | 4168493 |       4
 ...
```
The graph returned by http://localhost:6875/memory can be found here.
### Observations

- `DIFFERENTIAL_EAGER_MERGE=1000` improves memory usage for queries resembling the one reported originally in this GitHub issue. Perhaps it would be helpful for future users to document this option more prominently (it is currently a bit hidden, at https://materialize.com/taking-materialize-for-a-spin-on-nyc-taxi-data/). Also, what is the meaning of the parameter value (`1000`)? Is there some kind of trade-off associated with this setting?
- Virtual memory size appears to only ever go up, even if the resident set size decreases. Since Materialize uses system memory (document memory usage and best practices #2437 (comment)), this is unexpected. Also, peak memory usage surges quite high; perhaps it would be good to have some configurable capping for this in the future.
- On a related note (off topic), Kubernetes is a de-facto standard these days and does not easily support swap (Kubelet needs to allow configuration of container memory-swap kubernetes/kubernetes#7294). It might therefore be better if Materialize used, e.g., memory-mapped files rather than over-committing memory (cf. document memory usage and best practices #2437 (comment)).
- Back to the topic: our original issue appears to be that at least some of the set-returning `jsonb_*` functions do not propagate monotonicity information. At the same time, the ability to work with structured JSON data is one of the key features setting Materialize apart from, e.g., ksqlDB for our application. We therefore hope that this issue can be corrected with moderate effort by someone familiar with the Materialize codebase.
Thanks @pikulmar for the very detailed and extremely helpful report. I'll let @frankmcsherry respond as to the exact semantics of differential eager merge, but I think I see the issue with the monotonicity propagation for table-valued functions and should have a fix for that very soon. I can verify from the linked graph that we are not rendering a monotonic TopK in the view with `jsonb_each`.
Thanks for the detailed comments! Quick thoughts:
Thanks a lot for the super-quick and effective response, @frankmcsherry! I finally managed to have a cursory read through https://people.csail.mit.edu/malte/pub/drafts/2019-kpg.pdf. Although I am not an expert in that domain, I enjoyed the read and was also happy to see that we have apparently both spent some time at ETH Zurich. A few comments on your comments:
Thanks for the feedback. The allocator is probably jemalloc (it depends a bit on the system; I think it varies from Linux to OSX), and I'll pass a pointer to these comments back to folks a bit more familiar. Traditionally, the allocator and the system end up fighting a bit when we want to aggressively return memory to the system: we end up with disproportionate kernel CPU as memory is too often returned to and allocated from the kernel. Anyhow, it's something we can look into, and probably something that can be piloted with jemalloc env vars.
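(If it is indeed jemalloc, the knob would presumably be the `MALLOC_CONF` environment variable, whose `dirty_decay_ms` and `background_thread` options govern how eagerly freed pages are returned to the OS.)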
We have an issue open about this, but a different answer is that on cold start we should possibly not assign distinct timestamps. The reason for the excessive memory is that the answers change so much, because on load we are assigning many distinct timestamps. Instead, we should be able to assign all data in the initial load the same timestamp, which should result in a peak memory utilization that is the same as at the conclusion. Lemme find that issue / WIP PR and link them.
Yes, that sounds like a promising approach indeed. While we are at it, I hope it is OK to ask two related questions:
This issue is almost certainly fixed (over the course of a few units of work). There is a lean implementation of TopK for monotonic inputs.
**What version of Materialize are you using?**

**How did you install Materialize?**

**What was the issue?**
Using `TopK` (SQL `LIMIT`) appears to be essential for defining bounded views on unbounded Kafka sources with an append-only envelope (`ENVELOPE NONE`). In practice, the memory usage is unexpectedly high, which suggests that `TopK` might not be exploiting the append-only nature of the Kafka source. Thus, the issue can be rephrased as "TopK push-down to append-only Kafka source" and is related to #3196, #777, #652, and #4216.

**Is the issue reproducible? If so, please provide reproduction instructions.**
Yes. Given a Kafka topic `mytopic` with around 1M messages (~500 MiB total data volume), run a source and view of the shape sketched below.
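A minimal sketch of such a source and view (following the pattern of Test 1 in this thread; broker address and object names are illustrative):

```sql
-- Append-only Kafka source: messages are never deleted or retracted.
CREATE SOURCE mytopic_stream
FROM KAFKA BROKER 'kafka1:9092'
TOPIC 'mytopic'
FORMAT BYTES
ENVELOPE NONE;

-- TopK via LIMIT: only a single row ever needs to be retained.
CREATE MATERIALIZED VIEW bounded_view AS
SELECT s.data
FROM mytopic_stream AS s
ORDER BY s.data
LIMIT 1;
```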
The increase in memory usage after submitting the above query and waiting a few minutes is roughly 2 GiB, indicating that all data read from Kafka are retained in memory. However, only a single (!) row of data should be kept in memory after accounting for the fact that Kafka messages cannot be deleted (cf. append-only source).

**Please attach any applicable log files.**