test/persist: add test that compaction is happening as expected #10541

Merged 1 commit on Feb 9, 2022
164 changes: 164 additions & 0 deletions test/persistence/compaction/compaction.td
@@ -0,0 +1,164 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Smoke tests that verify compaction occurs as expected.
#

$ set-sql-timeout duration=30s

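# Track the most recent value reported for each metric: for every
# distinct metric name, the LATERAL subquery picks the value with the
# latest scrape time.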
> CREATE MATERIALIZED VIEW most_recent_mz_metrics AS
SELECT * FROM
(SELECT DISTINCT metric FROM mz_metrics) grp,
LATERAL (
SELECT value FROM mz_metrics
WHERE metric = grp.metric
ORDER BY time DESC LIMIT 1)

# At the beginning there are no trace batches.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
0

> CREATE TABLE compaction (f1 INTEGER);

> INSERT INTO compaction VALUES (1);

# The insert should make it into the trace and form one trace batch.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
1

# Read after each write to force subsequent writes to happen at distinct timestamps.
> SELECT * FROM compaction
f1
----
1

> INSERT INTO compaction VALUES (2);

> SELECT * FROM compaction
f1
----
1
2

# The previous insert should get compacted into one batch with the one before it.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
1

> INSERT INTO compaction VALUES (3);

> SELECT * FROM compaction
f1
----
1
2
3

# The most recent insert forms a new batch because it is at a lower compaction level.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
2

> INSERT INTO compaction VALUES (4);

> SELECT * FROM compaction
f1
----
1
2
3
4

# The previous insert should compact together with the previous batch, which then compacts recursively
# up, leaving just one batch.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
1

> INSERT INTO compaction VALUES (5);

> SELECT * FROM compaction
f1
----
1
2
3
4
5

# The previous insert forms a new batch.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
2

> INSERT INTO compaction VALUES (6);

> SELECT * FROM compaction
f1
----
1
2
3
4
5
6

# The previous insert gets merged together with the one just before it.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
2

> INSERT INTO compaction VALUES (7);

> SELECT * FROM compaction
f1
----
1
2
3
4
5
6
7

# The previous insert forms a new batch.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
3

> INSERT INTO compaction VALUES (8);

> SELECT * FROM compaction
f1
----
1
2
3
4
5
6
7
8

# All of the previously inserted batches get compacted together into one batch.
> SELECT value FROM most_recent_mz_metrics where metric = 'mz_persist_trace_blob_count'
value
-----
1
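The asserted blob counts follow a binary-counter pattern: after n single-row inserts, the trace holds popcount(n) batches, because two adjacent batches of equal size compact into one batch at the next level, recursively. Below is a minimal Python sketch of that discipline, assuming persist's trace merges like a differential-dataflow-style spine; trace_batch_count is a hypothetical helper for illustration, not part of persist.

# Each insert arrives as a size-1 batch; equal-sized adjacent batches
# compact into a single batch of twice the size, recursively, so after
# n inserts the trace holds popcount(n) batches.
def trace_batch_count(num_inserts: int) -> int:
    levels = []  # batch sizes currently in the trace, smallest last
    for _ in range(num_inserts):
        batch = 1  # each insert starts as a level-0 batch
        while levels and levels[-1] == batch:
            batch += levels.pop()  # equal-sized neighbors merge
        levels.append(batch)
    return len(levels)

# Reproduces the metric values asserted above for inserts 1 through 8.
assert [trace_batch_count(n) for n in range(1, 9)] == [1, 1, 2, 1, 2, 2, 3, 1]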
19 changes: 19 additions & 0 deletions test/persistence/mzcompose.py
@@ -31,6 +31,10 @@
options=f"{mz_options} --disable-user-indexes",
)

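# Scrape metrics every 1s (rather than the default interval) so the
# compaction test can observe mz_persist_trace_blob_count promptly.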
mz_fast_metrics = Materialized(
options=f"{mz_options} --metrics-scraping-interval=1s",
)

prerequisites = ["zookeeper", "kafka", "schema-registry"]

SERVICES = [
@@ -54,6 +58,7 @@ def workflow_default(c: Composition) -> None:
workflow_failpoints(c)

workflow_disable_user_indexes(c)
workflow_compaction(c)


def workflow_kafka_sources(c: Composition) -> None:
@@ -149,3 +154,17 @@ def workflow_disable_user_indexes(c: Composition) -> None:
c.rm("materialized", "testdrive-svc", destroy_volumes=True)

c.rm_volumes("mzdata")


def workflow_compaction(c: Composition) -> None:
with c.override(mz_fast_metrics):
c.up("materialized")
c.wait_for_materialized()

c.run("testdrive-svc", "compaction/compaction.td")

c.kill("materialized")

c.rm("materialized", "testdrive-svc", destroy_volumes=True)

c.rm_volumes("mzdata")