
KAFKA-3443 [Kafka Streams] added support for subscribing to topics via regex #1410

Closed
wants to merge 7 commits

Conversation

@bbejeck (Contributor) commented May 20, 2016

No description provided.

@bbejeck bbejeck changed the title KAFKA-3443 added support for subscribing to topics via regex KAFKA-3443 [Kafka Streams] added support for subscribing to topics via regex May 20, 2016
* Create a {@link KStream} instance from the specified Pattern.
* The default deserializers specified in the config are used.
* <p>
* If multiple topics are specified there are nor ordering guaranteed for records from different topics.
Contributor:

nit: "If multiple topics are matched by the specified pattern, the created stream will read data from all of them, and there is no ordering guarantee between records from different topics." Ditto below.

@guozhangwang (Contributor):

Thanks for your patch, @bbejeck. One general comment:

In KafkaConsumer we restrict users to exclusively use either topic or pattern subscription. But in Kafka Streams it makes less sense to require this. It would be better to allow both, for example:

stream1 = builder.stream(..., "topic1");
stream2 = builder.stream(..., pattern2);
stream3 = builder.stream(..., pattern3);

Then the streams library should be able to subscribe to topic1 | [pattern2] | [pattern3] (the syntax may not be correct, but that is the idea) as a single pattern subscription in the underlying consumer, and also maintain a finer-grained mapping from topic/pattern to the de-/serializers used when fetching records and putting them into the buffer.
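A minimal sketch of that idea, assuming we simply quote the literal topic names and OR everything into a single java.util.regex.Pattern (the class and helper names here are illustrative, not the PR's actual implementation):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CombinedSubscription {

    // Build one consumer-level subscription pattern out of literal topic
    // names plus user-supplied patterns, as suggested above.
    static Pattern buildSubscriptionPattern(List<String> topics, List<Pattern> patterns) {
        String combined = Stream.concat(
                topics.stream().map(Pattern::quote),       // escape literal names
                patterns.stream().map(Pattern::pattern))   // keep user regexes as-is
            .collect(Collectors.joining("|"));
        return Pattern.compile(combined);
    }

    public static void main(String[] args) {
        Pattern subscription = buildSubscriptionPattern(
            Arrays.asList("topic1"),
            Arrays.asList(Pattern.compile("pattern2"), Pattern.compile("pattern3")));
        System.out.println(subscription); // prints \Qtopic1\E|pattern2|pattern3
    }
}
```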

@bbejeck (Contributor Author) commented May 21, 2016

Then the streams library should be able to subscribe to topic1 | [pattern2] | [pattern3]....

@guozhangwang
Agreed, changes are underway. Thanks for the feedback.

@bbejeck (Contributor Author) commented May 29, 2016

@guozhangwang

Changes made per comments.

@bbejeck (Contributor Author) commented May 29, 2016

I don't think the changes from this PR are responsible for the build failure.

@bbejeck (Contributor Author) commented May 30, 2016

Again, I don't think the pushed changes are responsible for the failure. Tests for the streams project run fine locally; I'm not sure what the error is from.

@guozhangwang (Contributor):

Hmm, this is quite interesting...

pure virtual method called
terminate called without an active exception
:streams:test FAILED

We have seen this failure before, which I believe is due to RocksDB instances not being closed in time. I thought it had been resolved in #1258; since then I have not seen it again.

@bbejeck I have not looked into the patch in detail, but can you think of any way the changes could affect how we close and dispose of RocksDB objects?

@bbejeck (Contributor Author) commented Jun 1, 2016

Well, that's certainly possible, but the tests run fine locally. I didn't think the changes would have caused the failures because the net effect is mostly the same as the original PR; I just took a different approach. I'll update and re-run the tests.

EDIT: I just updated/rebased and all the streams tests pass locally. I'm running on Mac OS X 10.10.5. I'll continue to look and see if I can find something.

@guozhangwang (Contributor):

@bbejeck I re-ran the unit tests with your patch and they passed locally as well, so I think this may be due to some corner cases still unresolved from #1258. I will investigate further separately by enhancing some of the logging.

this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
this.subscriptionUpdates = subscriptionUpdates;
Contributor:

It seems we always pass the same subscriptionUpdates object to all the SourceNodeFactory instances. Could we instead overload getTopics with getTopics(subscriptionUpdates.getUpdates()), and trigger one or the other based on whether the factory contains a pattern?
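A rough sketch of the suggested overload (the class shape and field names are assumptions for illustration, not the actual SourceNodeFactory internals):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative fragment: a source either has a fixed topic list or a pattern,
// and the overload resolves pattern sources against the subscription updates.
class SourceNodeFactory {
    private final List<String> topics;  // null when this source uses a pattern
    private final Pattern pattern;      // null when this source uses fixed topics

    SourceNodeFactory(List<String> topics, Pattern pattern) {
        this.topics = topics;
        this.pattern = pattern;
    }

    List<String> getTopics() {
        return topics; // fixed-topic source: nothing to match
    }

    // Suggested overload: match the pattern against the topics seen so far
    // in the consumer's subscription updates.
    List<String> getTopics(Collection<String> subscribedTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : subscribedTopics) {
            if (pattern.matcher(topic).matches())
                matched.add(topic);
        }
        return matched;
    }
}
```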

KAFKA-3735: Dispose all RocksObjects upon completeness

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Roger Hoover, Eno Thereska, Ismael Juma

Closes apache#1411 from guozhangwang/K3735-dispose-rocksobject

MINOR: Specify keyalg RSA for SSL key generation

Author: Sriharsha Chintalapani <harsha@hortonworks.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1416 from harshach/ssl-doc-fix

KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=10000, compression_type=snappy and a snappy buffer size of 32k can be executed in a heap of 192m in a local environment (768m is needed without this change).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes apache#1418 from ijuma/kafka-3747-close-record-batch-when-append-fails

MINOR: Fix documentation table of contents and `BLOCK_ON_BUFFER_FULL_DOC`

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes apache#1423 from ijuma/minor-doc-fixes

Minor: Fix ps command example in docs

The ps/grep command has been updated; the previous "ps | grep server-1.properties" command showed nothing.

Author: Satendra Kumar <satendra@knoldus.com>

Reviewers: Gwen Shapira

Closes apache#1386 from satendrakumar06/patch-1

KAFKA-3683; Add file descriptor recommendation to ops guide

Adding sizing recommendations for file descriptors to the ops guide.

Author: Dustin Cote <dustin@confluent.io>
Author: Dustin Cote <dustin@dustins-mbp.attlocal.net>

Reviewers: Gwen Shapira

Closes apache#1353 from cotedm/KAFKA-3683 and squashes the following commits:

8120318 [Dustin Cote] Adding file descriptor sizing recommendations
0908aa9 [Dustin Cote] Merge https://github.com/apache/kafka into trunk
32315e4 [Dustin Cote] Merge branch 'trunk' of https://github.com/cotedm/kafka into trunk
13309ed [Dustin Cote] Update links for new consumer API
4dcffc1 [Dustin Cote] Update links for new consumer API

MINOR: Add virtual env to Kafka system test README.md

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Gwen Shapira

Closes apache#1346 from Ishiihara/add-venv

MINOR: Removed 1/2 of the hardcoded sleeps in Streams

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes apache#1422 from enothereska/minor-integration-timeout2

KAFKA-3732: Add an auto accept option to kafka-acls.sh

Added a new argument to AclCommand: --yes. When set, automatically answer yes to prompts

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Gwen Shapira

Closes apache#1406 from mimaison/KAFKA-3732

KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation

Kafka has two configurable compression codecs: the one used by the client (source codec) and the one finally used when storing into the log (target codec). The target codec defaults to KafkaConfig.compressionType and can be dynamically configured through zookeeper.

The GroupCoordinator appends group membership information into the __consumer_offsets topic by:
1. making a message with group membership information
2. making a MessageSet with the single message compressed with the source codec
3. doing a log.append on the MessageSet

Without this patch, KafkaConfig.offsetsTopicCompressionCodec doesn't get propagated to OffsetConfig instantiation, so GroupMetadataManager uses a source codec of NoCompressionCodec when making the MessageSet. Let's say we have enough group information such that the message formed exceeds KafkaConfig.messageMaxBytes before compression but would fall below the threshold after compression using our source codec. Even if we had dynamically configured __consumer_offsets with our favorite compression codec, the log.append will throw RecordTooLargeException during analyzeAndValidateMessageSet since the message was unexpectedly uncompressed instead of having been compressed with the source codec defined by KafkaConfig.offsetsTopicCompressionCodec.

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes apache#1394 from onurkaraman/KAFKA-3718

Setting broker state as running after publishing to ZK

junrao

Currently, the broker state is set to running before it registers itself in ZooKeeper.  This is too early in the broker lifecycle.  If clients use the broker state as an indicator that the broker is ready to accept requests, they will get errors.  This change is to delay setting the broker state to running until it's registered in ZK.

Author: Roger Hoover <roger.hoover@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes apache#1426 from theduderog/broker-running-after-zk

MINOR: Use `--force` instead of `--yes` in `AclCommand`

To be consistent with `ConfigCommand` and `TopicCommand`.

No release includes this option yet, so we can simply change it.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Mickael Maison, Grant Henke

Closes apache#1430 from ijuma/use-force-instead-of-yes-in-acl-command and squashes the following commits:

bdf3a57 [Ismael Juma] Update `AclCommandTest`
78b8467 [Ismael Juma] Change variable name to `forceOpt`
0bb27af [Ismael Juma] Use `--force` instead of `--yes` in `AclCommand`

MINOR: Fix wrong comments

Author: Yukun Guo <gyk.net@gmail.com>

Reviewers: Gwen Shapira

Closes apache#1198 from gyk/fix-comment

KAFKA-3723: Cannot change size of schema cache for JSON converter

Author: Christian Posta <christian.posta@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1401 from christian-posta/ceposta-connect-class-cast-error

KAFKA-3710: MemoryOffsetBackingStore shutdown

ExecutorService needs to be shutdown on close, lest a zombie thread
prevent clean shutdown.

ewencp

Author: Peter Davis <peter.davis@expeditors.com>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1383 from davispw/KAFKA-3710

MINOR: Delete unused code in FileStreamSourceTask

Author: leisore <leisore@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1433 from leisore/master

KAFKA-3749; fix "BOOSTRAP_SERVERS_DOC" typo

Author: manuzhang <owenzhang1990@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes apache#1420 from manuzhang/KAFKA-3749

MINOR: Fix tracing in KafkaApis.handle()

requestObj() returns null for the o.a.k.c.requests objects so use header() for these.

Once all the requests have been replaced by o.a.k.c.requests objects, we should be able to clean that up, but in the meantime it's useful to trace both.

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1435 from mimaison/kafkaapis_trace

MINOR: Fix a couple of scaladoc typos

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1440 from vahidhashemian/typo06/fix_typos_in_code_comments

KAFKA-3682; ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full

Limited the number of attempts to the number of map slots after the internal positionOf() goes into linear search mode. Added a unit test.

Co-developed with mimaison

Author: edoardo <ecomar@uk.ibm.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes apache#1352 from edoardocomar/KAFKA-3682

KAFKA-3678: Removed sleep from streams integration tests

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes apache#1439 from enothereska/KAFKA-3678-timeouts1

KAFKA-3767; Add missing license to connect-test.properties

This addresses https://issues.apache.org/jira/browse/KAFKA-3767.

Author: Sasaki Toru <sasakitoa@nttdata.co.jp>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1443 from sasakitoa/test_failure_no_license

KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead

This patch differentiates between when a consumer group is rebalancing and when it is dead, and reports the appropriate error message.

Author: Ishita Mandhan <imandha@us.ibm.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes apache#1429 from imandhan/KAFKA-3158

KAFKA-3765; Kafka Code style corrections

Removed explicit returns, unneeded parentheses, and unused imports; corrected variable names. Used isEmpty/nonEmpty instead of size checks, and head and flatMap instead of map-flatten.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#1442 from rekhajoshm/KAFKA-3765

MINOR: Remove synchronized as the tasks are executed sequentially

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1441 from Ishiihara/remove-synchronized

MINOR: Avoid trace logging computation in `checkEnoughReplicasReachOffset`

`numAcks` is only used in the `trace` logging statement so it should be a `def` instead of a `val`. Also took the chance to improve the code and documentation a little.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#1449 from ijuma/minor-avoid-trace-logging-computation-in-partition
@bbejeck (Contributor Author) commented Jun 2, 2016

@guozhangwang
Made changes per review comments. Looks like my rebase/squash was unsuccessful; let me know if you want me to close this PR and open a new one.

@guozhangwang (Contributor):

@bbejeck Could you rebase? It seems some other commits were squashed into this PR.

@@ -504,9 +593,18 @@ private void connectProcessorAndStateStore(String processorName, String stateSto
public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();


Contributor:

nit: add some comments explaining why we need to update nodeToSourceTopics here.

* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSource(String name, Pattern topicPattern) {
Contributor:

One general question: I think it is possible for a single topic to match more than one pattern, for example (please ignore the syntax flaws):

stream1 = builder.stream("a*");
stream2 = builder.stream("ab*");

And it is still allowed in the current setting, right? In this case, in TopologyBuilder.build one topic could be mapped to multiple source nodes (so when a record for that topic is fetched during processing, it should logically be passed to multiple source nodes). How is that handled in your patch?

Contributor Author (@bbejeck, Jun 4, 2016):

One general question: I think it is possible that a single topic matches more than one pattern, for example (please ignore the syntax flaws):

@guozhangwang
For this case I added a check that the topic has not already been added as a source topic (TopologyBuilder line 141). In the worst case, where a coarse-grained regex matches all the topics of a finer-grained regex, the source node with the finer-grained regex would not be added to the ProcessorTopology, but all topics would still be processed by the source node with the coarse-grained regex. Is this a valid condition?

EDIT: It turns out this was not a good approach. Instead, patterns are now tracked by topic while constructing the topology, and if a topic maps to more than one Pattern a TopologyBuilderException is thrown.
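A minimal sketch of that overlap check (class, field, and method names are illustrative; only the topic-to-pattern bookkeeping and the TopologyBuilderException mirror what the EDIT describes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.streams.errors.TopologyBuilderException;

// Illustrative fragment: remember which pattern claimed each topic, and fail
// fast if a second, different pattern matches the same topic.
class PatternOverlapCheck {
    private final Map<String, Pattern> topicToPattern = new HashMap<>();

    void registerMatch(String topic, Pattern pattern) {
        Pattern existing = topicToPattern.get(topic);
        if (existing != null && !existing.pattern().equals(pattern.pattern())) {
            throw new TopologyBuilderException("Topic " + topic
                + " is already matched by pattern " + existing
                + " and cannot also be matched by " + pattern);
        }
        topicToPattern.put(topic, pattern);
    }
}
```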

Contributor:

Sounds good, thanks.

@guozhangwang (Contributor):

This new patch looks very promising. Thanks @bbejeck! I left some more comments.

Could we add some integration tests (see the examples in streams/test/integration) to validate that, with three source streams (two with patterns and one with a concrete topic list), 1) the consumer subscription reflects all the matched topics, and that when 2) new topics matching a pattern are added, or 3) existing topics matching a pattern are deleted, the changes are correctly reflected during the rebalance, i.e. by the stream partition assignor?
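For reference, the source setup such a test would exercise might look like the following (topic names are illustrative; the Pattern overload of stream() is the one this PR adds):

```java
import java.util.regex.Pattern;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RegexSourcesExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Two pattern-based sources plus one concrete topic list, matching
        // the three-source integration scenario described above.
        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
        KStream<String, String> namedTopicsStream = builder.stream("topic-Y", "topic-Z");
    }
}
```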

@bbejeck (Contributor Author) commented Jun 2, 2016

@guozhangwang
Thanks for the feedback. I'll get to work on this soon.

@bbejeck (Contributor Author) commented Jun 4, 2016

Could you rebase? It seems some other commits were squashed into this PR.

@guozhangwang
In my fourth commit I neglected to do a push -f after I had rebased the branch. Are you OK with removing the fourth commit only?

@bbejeck (Contributor Author) commented Jun 4, 2016

@guozhangwang
Made all changes per comments.

@bbejeck (Contributor Author) commented Jun 4, 2016

Could we add some integration tests

@guozhangwang
With respect to the integration tests, I just need to migrate the ones I have locally to this branch.
I have questions regarding adding/deleting topics. Is the behavior expected to be dynamic? When I add a topic that matches the pattern, I need to restart the stream to get the new topic picked up.
And deleting a topic while the stream is running causes the program to crash with a NullPointerException at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257). Starting the stream again resets everything and the topic is dropped successfully.

EDIT: I'm looking into fixing these issues now to allow for dynamic behavior

EDIT 2: Fixed the NullPointerException when deleting a topic that was part of the KStream (the NPE was occurring whether the topic was named or matched by regex). The partition assignment is removed for the current topics and a new "generation" is created, but the StreamPartitionAssignor does not kick in to re-assign the partitions.

Log from KafkaServer after topic delete:
[2016-06-04 21:18:36,556] INFO Deleted log for partition [topic-A,0] in /tmp/kafka-logs/topic-A-0. (kafka.log.LogManager)
[2016-06-04 21:18:36,723] INFO [GroupCoordinator 0]: Preparing to restabilize group regex_id with old generation 4 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:18:36,723] INFO [GroupCoordinator 0]: Stabilized group regex_id generation 5 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:18:36,725] INFO [GroupCoordinator 0]: Assignment received from leader for group regex_id for generation 5 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:23:36,851] INFO [GroupCoordinator 0]: Preparing to restabilize group regex_id with old generation 5 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:23:36,851] INFO [GroupCoordinator 0]: Stabilized group regex_id generation 6 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:23:36,853] INFO [GroupCoordinator 0]: Assignment received from leader for group regex_id for generation 6 (kafka.coordinator.GroupCoordinator)
[2016-06-04 21:26:04,998] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

Log from KStream after topic delete:
[2016-06-04 21:18:36,721] DEBUG Updated cluster metadata version 4 to Cluster(nodes = [oddball.home:9092 (id: 0 rack: null)], partitions = [Partition(topic = topic-1, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = topic-Z, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = topic-2, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = topic-B, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = topic-C, partition = 0, leader = 0, replicas = [0,], isr = [0,]]) (org.apache.kafka.clients.Metadata:181)
[2016-06-04 21:18:36,721] INFO Revoking previously assigned partitions [topic-Z-0, topic-1-0, topic-2-0, topic-B-0, topic-C-0] for group regex_id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2016-06-04 21:18:36,721] INFO Removing a task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:614)

I'll continue to work this...

@bbejeck (Contributor Author) commented Jun 6, 2016

@guozhangwang
Added an integration test for multiple patterns and a concrete topic list, and handling for "overlapping" patterns.

@bbejeck (Contributor Author) commented Jun 6, 2016

I'm aware of the conflict and will resolve.

@bbejeck force-pushed the KAFKA-3443_support_regex_topics branch from a41d335 to cdac4df on June 7, 2016 02:52
@bbejeck (Contributor Author) commented Jun 7, 2016

Closing this pull request and opening a new one.

@bbejeck bbejeck closed this Jun 7, 2016
asfgit pushed a commit that referenced this pull request Jun 16, 2016
This PR is the follow-on to the closed PR #1410.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1477 from bbejeck/KAFKA-3443_streams_support_for_regex_sources
efeg pushed a commit to efeg/kafka that referenced this pull request May 29, 2024
@bbejeck bbejeck deleted the KAFKA-3443_support_regex_topics branch July 10, 2024 12:55
mumrah pushed a commit to mumrah/kafka that referenced this pull request Aug 14, 2024
…he#1410)

* As a result of using `DefaultStatePersister`, when the first poll request is made by a share consumer there is some latency to create the internal topic SGS and load the coordinator.
* This messes up the timing info specified in the poll requests. However, this should not be a problem in production.
* To counter this, this PR adds a `warmup` method which creates a test topic and produces a single record into it. Then a share consumer polls the topic and as a side effect causes creation of SGS and coordinator shard load.
* This helps in assertions on subsequent polls.