configurable kafka partitioner #2284

Merged
merged 6 commits into from Aug 24, 2016

Conversation

@urso
Collaborator

commented Aug 16, 2016

  • Store kafka message wrapper in outputs.Data.Values for re-use on retry.
  • Add support for configuring the partitioner strategy
  • Add configurable support for ignoring unavailable partitions
  • Keep partitioning mostly stable in the error case (try to schedule events to the same partition if they are returned to libbeat on error)
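
The retry behaviour described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `data` and `message` types, the `msgKey` key, and the `getMessage` helper are all hypothetical stand-ins for `outputs.Data` and the kafka message wrapper. The idea is only that the message, including its chosen partition, is cached next to the event, so a retry reuses it instead of partitioning again.

```go
package main

import "fmt"

// message is a hypothetical stand-in for the kafka message wrapper.
type message struct {
	partition int32
	payload   string
}

// data is a hypothetical stand-in for outputs.Data: an event plus a
// free-form values map that travels with the event across retries.
type data struct {
	event  string
	values map[string]interface{}
}

// msgKey is an assumed key name, chosen for this sketch only.
const msgKey = "kafka_message"

// getMessage returns the cached message if the event was seen before.
// Otherwise it picks a partition once and caches the resulting message.
func getMessage(d *data, pick func() int32) *message {
	if m, ok := d.values[msgKey].(*message); ok {
		return m // retry: reuse the original partition selection
	}
	m := &message{partition: pick(), payload: d.event}
	if d.values == nil {
		d.values = map[string]interface{}{}
	}
	d.values[msgKey] = m
	return m
}

func main() {
	next := int32(0)
	pick := func() int32 { next++; return next }

	d := &data{event: "hello"}
	first := getMessage(d, pick)
	retry := getMessage(d, pick) // simulated retry after an error
	fmt.Println(first.partition == retry.partition)
}
```

Because the partition is picked only on first use, an event handed back to libbeat after an error goes to the same partition when it is published again.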
@urso

Collaborator Author

commented Aug 17, 2016

Requires #2295 to fix test build

@urso urso force-pushed the urso:enh/kafka-partitioner branch 2 times, most recently from bb9c463 to 8fa7888 Aug 18, 2016

@urso urso changed the title [WIP] configurable kafka partitioner configurable kafka partitioner Aug 18, 2016

@urso urso added review and removed in progress labels Aug 18, 2016

@urso

Collaborator Author

commented Aug 18, 2016

Will update docs in another PR, as a number of other settings have been added/changed for kafka output.

var testOptions = outputs.Options{}
func TestKafkaPublish(t *testing.T) {
single := modetest.SingleEvent
// multi := modetest.MultiEvent

@ruflin

ruflin Aug 22, 2016

Collaborator

What is the idea with these 2 commented out lines?

@urso

urso Aug 22, 2016

Author Collaborator

They can be removed, as they are not used. Values of type []modetest.EventInfo hold a number of events plus a setting for whether or not to bulk publish those events. They are used to drive sending events through the output plugin.

The test uses randMulti.

@urso

urso Aug 22, 2016

Author Collaborator

removed

}

func initPartitionStrategy(
partition map[string]*common.Config,

@ruflin

ruflin Aug 22, 2016

Collaborator

Do we need a map here, given that only 0 or 1 partitioners are allowed?

@urso

urso Aug 22, 2016

Author Collaborator

Partitioners are configured by name. The name in map[string] is used for lookup into the partitioners. We use this pattern in a few places where plugins are given by name. Alternative solution: capture *common.Config, get the list of keys, check that its length is 1, and extract the child config by key.
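
A rough sketch of the name-based lookup pattern described above, under stated assumptions: `partitionerConfig` stands in for `*common.Config`, and `registry`, `selectPartitioner`, and the error messages are hypothetical names for this illustration. The factories return only the strategy name here; real ones would build a partitioner. Falling back to `hash` when nothing is configured follows the statement elsewhere in this thread that hash is the default strategy.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionerConfig is a hypothetical stand-in for *common.Config.
type partitionerConfig map[string]interface{}

// partitionerFactory is hypothetical: it returns just the strategy name,
// where a real factory would return a partitioner implementation.
type partitionerFactory func(cfg partitionerConfig) (string, error)

// registry maps strategy names to factories. The names are the ones
// mentioned in this conversation.
var registry = map[string]partitionerFactory{
	"hash":        func(partitionerConfig) (string, error) { return "hash", nil },
	"random":      func(partitionerConfig) (string, error) { return "random", nil },
	"round_robin": func(partitionerConfig) (string, error) { return "round_robin", nil },
}

// selectPartitioner accepts zero or one named entry. With zero entries it
// falls back to the hash strategy; with more than one it reports an error.
func selectPartitioner(partition map[string]partitionerConfig) (string, error) {
	if len(partition) == 0 {
		return registry["hash"](nil)
	}
	if len(partition) > 1 {
		return "", errors.New("too many partitioners configured")
	}
	// Exactly one entry: its key is the name used for the registry lookup.
	for name, cfg := range partition {
		factory, ok := registry[name]
		if !ok {
			return "", fmt.Errorf("unknown partitioner %q", name)
		}
		return factory(cfg)
	}
	return "", errors.New("unreachable")
}

func main() {
	name, err := selectPartitioner(map[string]partitionerConfig{
		"round_robin": {"group_events": 5},
	})
	fmt.Println(name, err)
}
```

The map keeps the config shape uniform with other name-keyed plugins; the length check enforces the "0 or 1" rule at load time.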

}
}

func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner {

@ruflin

ruflin Aug 22, 2016

Collaborator

Could this have an effect on the output performance? If yes, should we add a note to the docs?

@urso

urso Aug 22, 2016

Author Collaborator

Yes. No docs yet.
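
To give a rough idea of where the cost comes from: a field-based hash partitioner has to look up and hash every configured field for every event. The sketch below is an assumption-laden illustration, not the PR's `makeFieldsHashPartitioner`. It assumes FNV-1a hashing and a flat `map[string]string` event, and `hashFields` is a hypothetical helper. What to do when a field is missing (for example drop the event or fall back to another strategy) is left to the caller.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashFields hashes the values of the given fields (FNV-1a, an assumption)
// and maps the hash onto one of numPartitions partitions. It returns
// ok=false if any field is missing. numPartitions must be greater than 0.
func hashFields(event map[string]string, fields []string, numPartitions int32) (int32, bool) {
	h := fnv.New32a()
	for _, f := range fields {
		v, ok := event[f]
		if !ok {
			return -1, false // caller decides: drop or fall back
		}
		h.Write([]byte(v)) // per-event work: one lookup and one hash update per field
	}
	return int32(h.Sum32() % uint32(numPartitions)), true
}

func main() {
	event := map[string]string{"host": "web-1", "type": "log"}
	p, ok := hashFields(event, []string{"host", "type"}, 4)
	fmt.Println(p, ok)
}
```

The work grows with the number of configured fields and the size of their values, which is why a note in the docs may be worthwhile.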

@ruflin

Collaborator

commented Aug 22, 2016

LGTM. What about the updates to the config file? Will this also follow in a separate PR?

@urso

Collaborator Author

commented Aug 22, 2016

config files can be done in this PR.

@urso urso force-pushed the urso:enh/kafka-partitioner branch from 8523bf6 to 86068dc Aug 22, 2016

@urso

Collaborator Author

commented Aug 22, 2016

@ruflin added configs

@@ -35,6 +35,10 @@ packetbeat.protocols.amqp:
# the AMQP protocol by commenting out the list of ports.
ports: [5672]

packetbeat.protocols.cassandra:
#Cassandra port for traffic monitoring.

@tsg

tsg Aug 22, 2016

Collaborator

It's not part of this PR, but needs a space in front of Cassandra.

@urso

urso Aug 23, 2016

Author Collaborator

oh, this should be fixed by another PR. will have to rebase.

# If empty `output.kafka.key` setting will be used.
# Default value is empty list.
#hash: []

@tsg

tsg Aug 22, 2016

Collaborator

What about the switch parameter? It's not clear to me what it does and was hoping to find an explanation here :)

@urso

urso Aug 23, 2016

Author Collaborator

Here I only document the default partitioner. partition.hash is the default strategy in libbeat and sarama. The switch parameter used by partition.random and partition.round_robin is a kind of 'bulking' parameter. That is, these two partitioners select another partition only every N events. This parameter does not exist for the hashing partitioner.
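
The "switch only every N events" behaviour can be sketched like this for the round-robin case. It is a minimal illustration under assumptions: `roundRobinGrouped`, `newRoundRobinGrouped`, and `assign` are hypothetical names, and the real partitioners plug into sarama rather than being called directly.

```go
package main

import "fmt"

// roundRobinGrouped is a hypothetical partitioner that moves on to the next
// partition only after groupEvents events went to the current one.
type roundRobinGrouped struct {
	groupEvents int   // N: events per partition before switching
	count       int   // events assigned to the current partition so far
	current     int32 // current partition index
}

// newRoundRobinGrouped treats non-positive values as 1 (switch every event).
func newRoundRobinGrouped(groupEvents int) *roundRobinGrouped {
	if groupEvents <= 0 {
		groupEvents = 1
	}
	return &roundRobinGrouped{groupEvents: groupEvents}
}

// partition returns the partition for the next event.
// numPartitions must be greater than 0.
func (p *roundRobinGrouped) partition(numPartitions int32) int32 {
	if p.count >= p.groupEvents {
		p.count = 0
		p.current = (p.current + 1) % numPartitions
	}
	p.count++
	return p.current % numPartitions
}

// assign runs n events through a fresh partitioner and collects the result.
func assign(n, groupEvents int, numPartitions int32) []int32 {
	p := newRoundRobinGrouped(groupEvents)
	out := make([]int32, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, p.partition(numPartitions))
	}
	return out
}

func main() {
	fmt.Println(assign(7, 3, 3)) // prints [0 0 0 1 1 1 2]
}
```

A random variant would differ only in how the next partition is chosen at the switch point.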

@urso

urso Aug 23, 2016

Author Collaborator

The other parameters will be documented in another PR updating docs about all kafka output settings.

@tsg

Collaborator

commented Aug 22, 2016

LGTM, left a minor comment/question.

@andrewkroh

Member

commented Aug 22, 2016

LGTM

urso added 2 commits Aug 10, 2016

@urso urso force-pushed the urso:enh/kafka-partitioner branch from 86068dc to 0a3a90c Aug 23, 2016

urso added 4 commits Aug 10, 2016
Configurable kafka partitioning
- add config settings to configure partition strategy
- add config settings to configure partitioner selecting only reachable
  partitions
- Store kafka message in data.values so original message including partition
  selection is still available on retry
- make kafka message key configurable
- update changelog
Introduce kafka table driven testing
- replace kafka integration tests with table driven integration tests
- add integration tests using batch publishing
- add integration tests using different partitioner configurations

@urso urso force-pushed the urso:enh/kafka-partitioner branch from 0a3a90c to 3597027 Aug 23, 2016

@urso

Collaborator Author

commented Aug 23, 2016

renamed switch to group_events.

@tsg tsg merged commit a025e36 into elastic:master Aug 24, 2016

4 checks passed

CLA Commit author is a member of Elasticsearch
continuous-integration/appveyor/pr AppVeyor build succeeded
continuous-integration/travis-ci/pr The Travis CI build passed
default Build finished.

@urso urso deleted the urso:enh/kafka-partitioner branch Feb 19, 2019

4 participants