Support single-threaded reading of multiple Kafka partitions #2214

Merged: 1 commit merged into MaterializeInc:master from nacrooks:multiplepartitions on Mar 12, 2020

Conversation

@nacrooks (Contributor) commented Mar 4, 2020

This PR addresses issue #2169. It:

  1. adds support for Kafka sources that contain multiple partitions

  2. adds testdrive support for testing with multiple partitions (via the num_partition=* and partition=* options, where num_partition specifies the total number of partitions and partition specifies the partition to write to)

  3. adds testdrive support for adding partitions dynamically (via the "add_partition" keyword)

  4. adds explicit error messages if the semantics of the consistency topic do not correspond to what the system expects. Specifically:

  • timestamps cannot be 0
  • timestamps should be monotonically increasing
  • the consistency topic should contain a single partition
  5. The consistency topic is now of the form SourceName-PartitionCount-PartitionId-Timestamp-Offset. Timestamps are closed on each partition individually (see the sketch after this list). For instance:
Foo,2,0,1,4
Foo,2,1,1,3
Foo,2,0,2,5
Foo,2,1,2,3

says that the first 4 records of partition 0 and the first 3 records of partition 1 will be timestamped with 1. The fifth record of partition 0 will be timestamped with ts=2, while partition 1 generates no record with timestamp 2.

  6. This PR adds support for dynamically adding partitions.

  7. This PR adds support for empty streams (as well as testing). For example,
    Foo,1,0,1,0
    will "fast-forward" the empty stream to timestamp 1.

  8. This PR adds support for seeking the consumer to the appropriate position, preventing us from having to replay the full stream when a consumer resets (addressing issues #2225 and #2254).
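To make the consistency-topic semantics above concrete, here is a minimal, hypothetical sketch of parsing and interpreting such a record. The struct and function names are not taken from the PR; only the comma-separated field layout shown in the examples is.

// A minimal sketch (not the PR's actual code) of parsing a BYO consistency
// record of the form shown above; all names here are hypothetical.

#[derive(Debug)]
struct ConsistencyRecord {
    source: String,
    partition_count: i32,
    partition_id: i32,
    timestamp: u64,
    offset: i64,
}

fn parse_consistency_record(line: &str) -> Option<ConsistencyRecord> {
    let mut parts = line.split(',');
    Some(ConsistencyRecord {
        source: parts.next()?.to_string(),
        partition_count: parts.next()?.trim().parse().ok()?,
        partition_id: parts.next()?.trim().parse().ok()?,
        timestamp: parts.next()?.trim().parse().ok()?,
        offset: parts.next()?.trim().parse().ok()?,
    })
}

fn main() {
    // "Foo,2,0,1,4": partition 0 of source Foo (2 partitions total) closes
    // timestamp 1 at offset 4, i.e. its first 4 records get timestamp 1.
    let rec = parse_consistency_record("Foo,2,0,1,4").unwrap();
    assert_eq!((rec.partition_id, rec.timestamp, rec.offset), (0, 1, 4));

    // "Foo,1,0,1,0": offset 0 closes timestamp 1 on an empty partition,
    // which "fast-forwards" the empty stream to timestamp 1.
    let empty = parse_consistency_record("Foo,1,0,1,0").unwrap();
    assert_eq!(empty.offset, 0);
}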

Limitations

This PR does not allow for concurrent reading of multiple partitions (a single Kafka consumer reads from all partitions).

If one partition is waiting on a timestamp (and its message is buffered), that buffered message causes the other partitions to block as well.
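To make that blocking concrete, here is a rough, self-contained sketch (all names hypothetical, not the PR's code): a single consumer feeds every partition, so one message without a closed timestamp halts progress for all of them.

// Sketch only: illustrates the single-consumer limitation, not the PR's code.

struct Message {
    partition: i32,
    offset: i64,
}

struct Source {
    // At most one buffered message still waiting for its timestamp.
    buffered: Option<Message>,
    // Closed bindings: (partition, offset closed up to, timestamp).
    bindings: Vec<(i32, i64, u64)>,
}

impl Source {
    // Returns the timestamp for a message if its partition has closed one
    // covering this offset.
    fn lookup(&self, partition: i32, offset: i64) -> Option<u64> {
        self.bindings
            .iter()
            .find(|(p, o, _)| *p == partition && offset <= *o)
            .map(|(_, _, ts)| *ts)
    }

    // One step of the single-threaded read loop; `poll` stands in for the one
    // Kafka consumer that reads from *all* partitions.
    fn step(&mut self, poll: impl FnOnce() -> Option<Message>) -> Option<(Message, u64)> {
        let msg = self.buffered.take().or_else(poll)?;
        match self.lookup(msg.partition, msg.offset) {
            Some(ts) => Some((msg, ts)),
            None => {
                // No closed timestamp yet: buffer the message and stop here.
                // Every other partition is now blocked behind this one.
                self.buffered = Some(msg);
                None
            }
        }
    }
}

fn main() {
    let mut source = Source {
        buffered: None,
        bindings: vec![(0, 4, 1)], // partition 0 is closed through offset 4 at ts 1
    };
    // A message from partition 1 has no binding yet, so it gets buffered and the
    // step yields nothing -- even though partition 0 could make progress.
    assert!(source.step(|| Some(Message { partition: 1, offset: 1 })).is_none());
    assert!(source.buffered.is_some());
}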

It does not handle "full streams" (streams that hit the maximum u64 timestamp, MAX_TIMESTAMP) elegantly. It instead disallows that timestamp for BYO.

This PR does not address the underlying issue of why we see frequent resets in Kafka.

@nacrooks nacrooks requested review from benesch and JLDLaughlin Mar 4, 2020
@cuongdo (Member) commented Mar 4, 2020

This PR does not allow partitions to be added automatically. Any message that comes from a partition that was not present when the source was created will be dropped.

Do we let the user know messages have been dropped?

@nacrooks (Contributor, Author) commented Mar 4, 2020

Yes, with this message:


error!("Received a message from a partition that is not monitored. Partitions cannot\
                            currently be dynamically added. Partition: {}. Message is ignored ", partition);

The problem (according to @benesch) is that Kafka sometimes returns "out of date" information about the number of partitions: the initial query can report fewer partitions than the user created the topic with. So, in rare cases, the user can see this message without ever "adding" partitions.
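For context, here is a hedged sketch of querying the broker for the partition count with rdkafka (`fetch_metadata` is rdkafka's API; the helper itself is hypothetical). As noted above, the value it returns can lag behind the partitions the user actually created.

use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};

// Hypothetical helper: asks the broker how many partitions `topic` has right now.
// Because brokers can serve stale metadata, this count may be smaller than the
// number of partitions the user created, which is how the error above can fire
// without anyone having added partitions.
fn partition_count(consumer: &BaseConsumer, topic: &str) -> Result<usize, String> {
    let metadata = consumer
        .fetch_metadata(Some(topic), Duration::from_secs(5))
        .map_err(|e| e.to_string())?;
    metadata
        .topics()
        .iter()
        .find(|t| t.name() == topic)
        .map(|t| t.partitions().len())
        .ok_or_else(|| format!("topic {} not present in metadata", topic))
}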

Inline review threads on src/coord/timestamp.rs (resolved).
@benesch (Member) approved these changes Mar 5, 2020 and left a comment:

🎉 I didn't review the dataflow/timestamper changes in great detail, but everything seemed to make sense as I skimmed. Left you a more detailed review of the testdrive stuff.

Thanks for turning this around so quickly!

Inline review comments on src/testdrive/action/kafka.rs and src/coord/timestamp.rs (resolved).
@rjnn rjnn added this to the 0.2 milestone Mar 5, 2020
@nacrooks nacrooks force-pushed the nacrooks:multiplepartitions branch 2 times, most recently from db28d0a to 0a57824 Mar 10, 2020
@nacrooks nacrooks requested a review from frankmcsherry Mar 11, 2020
@nacrooks (Contributor, Author) commented Mar 11, 2020

This PR does not allow partitions to be added automatically. Any message that comes from a partition that was not present when the source was created will be dropped.

Do we let the user know messages have been dropped?

This is now outdated. We do support dynamically adding partitions.

@nacrooks nacrooks closed this Mar 11, 2020
@nacrooks nacrooks reopened this Mar 11, 2020
@nacrooks nacrooks force-pushed the nacrooks:multiplepartitions branch from d3aa8f7 to 3e24174 Mar 11, 2020
@nacrooks nacrooks self-assigned this Mar 11, 2020
Diff context under review:

        Some(ts) => *ts,
        None => 0,
    };
    if timestamp == 0

@ruchirK (Member) commented Mar 12, 2020
Consider separating this if statement into several if statements, each with its own associated error message. Separately, I don't think you need an else branch, because taking the conditional takes you out of the function (I might be misreading here; if so, apologies).

@ruchirK (Member) commented Mar 12, 2020

I like to avoid the else to avoid having another nested scope (this is partly scar tissue / ptsd from C where tabs are 8 spaces :) )
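For illustration, here is a minimal sketch of the early-return style suggested in this thread; the struct, field names, and error strings are hypothetical and not taken from the PR.

// Hypothetical validation of a BYO consistency update, written in the suggested
// style: one `if` per check, each with its own error message, and no `else`,
// because every failing check returns out of the function.
struct ConsistencyUpdate {
    partition_count: i32,
    timestamp: u64,
}

fn validate(update: &ConsistencyUpdate, last_closed_ts: u64) -> Result<(), String> {
    if update.timestamp == 0 {
        return Err("consistency timestamps cannot be 0".into());
    }
    if update.timestamp < last_closed_ts {
        return Err("consistency timestamps must be monotonically increasing".into());
    }
    if update.partition_count < 1 {
        return Err("the consistency topic must report at least one partition".into());
    }
    Ok(())
}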

@nacrooks nacrooks force-pushed the nacrooks:multiplepartitions branch from 64b17c9 to b7ad345 Mar 12, 2020
Fixing initialisation condition of sources

Fixing Clippy Error

Adding support for dynamically adding partitions

Adding support for dynamically adding Kafka partitions

Moving default offset to be 0 to support empty partitions

Adding test to support empty table

Rebasing Jessica's changes

Updating refresh rate for producers

Updating refresh rate for producers

setting offset

Readding seek call

Removing unnecessary logging

fixing clippy warnings

Fixing partition getting created before consistency topic

Fix clippy errors

Whitespace

Addressing UMax issue

Fixing lint
@nacrooks nacrooks force-pushed the nacrooks:multiplepartitions branch from b7ad345 to 0bbae2d Mar 12, 2020
@nacrooks nacrooks merged commit d81e082 into MaterializeInc:master Mar 12, 2020
14 checks passed
buildkite/tests Build #5903 passed (14 minutes, 7 seconds)
buildkite/tests/bath-lint-and-rustfmt Passed (21 seconds)
buildkite/tests/bulb-bulb-full-sql-logic-tests Passed (0 seconds)
buildkite/tests/bulb-short-sql-logic-tests Passed (1 minute, 18 seconds)
buildkite/tests/cargo-test Passed (48 seconds)
buildkite/tests/docker-build Passed (10 minutes, 40 seconds)
buildkite/tests/face-with-monocle-miri-test Passed (57 seconds)
buildkite/tests/metabase-demo Passed (44 seconds)
buildkite/tests/paperclip-clippy-and-doctests Passed (3 minutes, 55 seconds)
buildkite/tests/pipeline Passed (15 seconds)
buildkite/tests/racing-car-testdrive Passed (3 minutes, 4 seconds)
buildkite/tests/shower-streaming-demo Passed (27 seconds)
license/cla Contributor License Agreement is signed.
netlify/materializeinc/deploy-preview Deploy preview canceled.
benesch added a commit to benesch/materialize that referenced this pull request Mar 17, 2020
A Kafka producer locally caches metadata about what topics and
partitions are available. Attempting to produce to a partition that is
not present in the local cache will result in an UnknownPartition error.

The default topic metadata refresh interval is 5m, which means it could
take a full five minutes between when testdrive creates a new partition
and when it can produce to that partition. This is obviously untenable,
so the refresh interval was decreased to 10ms in MaterializeInc#2214.

Unfortunately, this had the side-effect of wedging any Kafka consumers.
Their metadata would always be out of date, and so they'd spend all
their time updating their metadata and never yield any records.

This patch restores topic.metadata.refresh.interval to its default,
fixing kafka-verify and restoring the test that used kafka-verify. Then,
to ensure that the Kafka producer has sufficiently fresh metadata, it
adjusts kafka-create-topic and kafka-add-partitions to forcibly refresh
the producer's metadata after the topic or partitions are created.

To prevent similar disasters in the future, there is no longer a Kafka
consumer that is shared across all the actions in a test file. Actions
that need a Kafka consumer (like kafka-verify) create their own on
demand. All metadata fetches will thus be forced through the Kafka
producer, where they will ensure the cache is up to date.

Fix MaterializeInc#2299.
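For reference, here is a sketch of the two pieces this commit message describes, using rdkafka (`ClientConfig`, `Producer::client`, and `fetch_metadata` are rdkafka APIs; the helper functions are hypothetical): leave `topic.metadata.refresh.interval.ms` at its default and force a metadata fetch through the producer right after creating topics or partitions.

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, Producer};

// Hypothetical producer setup: topic.metadata.refresh.interval.ms is left at
// librdkafka's default (5 minutes) rather than forced down to 10ms, which is
// what wedged the consumers as described above.
fn make_producer(brokers: &str) -> Result<BaseProducer, KafkaError> {
    ClientConfig::new().set("bootstrap.servers", brokers).create()
}

// Hypothetical helper: forcibly refresh the producer's local metadata cache for
// `topic`, e.g. right after kafka-create-topic or kafka-add-partitions, so a
// subsequent produce does not fail with UnknownPartition.
fn refresh_metadata(producer: &BaseProducer, topic: &str) -> Result<(), KafkaError> {
    producer
        .client()
        .fetch_metadata(Some(topic), Duration::from_secs(5))
        .map(|_| ())
}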