Skip to content

Commit

Permalink
Merge pull request #115 from LucasDove/master
Browse files Browse the repository at this point in the history
produce messages partitioned by value
  • Loading branch information
fgeller committed Sep 9, 2023
2 parents ba48c15 + 21aba8e commit b09d29e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
/kt
/quickfix
.idea/*
.vscode/*
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.21
require (
github.com/IBM/sarama v1.41.1
github.com/davecgh/go-spew v1.1.1
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72
github.com/markusmobius/go-dateparser v1.2.1
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.13.0
Expand Down
26 changes: 21 additions & 5 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (cmd *produceCmd) read(as []string) produceArgs {
flags.BoolVar(&args.literal, "literal", false, "Interpret stdin line literally and pass it as value, key as null.")
flags.StringVar(&args.version, "version", "", "Kafka protocol version")
flags.StringVar(&args.compression, "compression", "", "Kafka message compression codec [gzip|snappy|lz4] (defaults to none)")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode, hashCodeByValue")
flags.StringVar(&args.decodeKey, "decodekey", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.StringVar(&args.decodeValue, "decodevalue", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.")
Expand Down Expand Up @@ -346,11 +346,16 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit
}

var part int32 = 0
if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
if msg.Value != nil && cmd.partitioner == "hashCodeByValue" {
part = hashCodePartition(*msg.Value, partitionCount)
msg.Partition = &part
}else {
if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
msg.Partition = &part
}
}

out <- msg
Expand Down Expand Up @@ -545,6 +550,12 @@ like the following:
In case the input line cannot be interpeted as a JSON object the key and value
both default to the input line and partition to 0.
If you don't want to specify key for single message, in other words, it doesn't matter that a message goes
to a random paritition (with equal probability), you can set the flag '-partitioner' with 'hashCodeByValue'.
That will tell kt to take the value of a message to calculate a hashcode deciding which paritition it will go to.
This can be helpful when you just want there are many messages distributed in partitions of a topic, and don't
care about what the content is.
Examples:
Send a single message with a specific key:
Expand All @@ -555,6 +566,11 @@ Send a single message with a specific key:
$ kt consume -topic greetings -timeout 1s -offsets 0:3-
{"partition":0,"offset":3,"key":"id-23","message":"ola"}
Send a single message without specified key:
$ echo 'no key specified message' | kt produce -topic greetings -partitioner hashCodeByValue
Sent message to a partition decided by your case
Keep reading input from stdin until interrupted (via ^C).
$ kt produce -topic greetings
Expand Down

0 comments on commit b09d29e

Please sign in to comment.