From 6460f61de89c85065234aa9208219281503a1985 Mon Sep 17 00:00:00 2001 From: Yuan Xie Date: Tue, 15 Sep 2020 18:57:12 +0800 Subject: [PATCH 1/3] produce messages partitioned by value --- produce.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/produce.go b/produce.go index d9620ad..abf2829 100644 --- a/produce.go +++ b/produce.go @@ -53,7 +53,7 @@ func (cmd *produceCmd) read(as []string) produceArgs { flags.BoolVar(&args.literal, "literal", false, "Interpret stdin line literally and pass it as value, key as null.") flags.StringVar(&args.version, "version", "", "Kafka protocol version") flags.StringVar(&args.compression, "compression", "", "Kafka message compression codec [gzip|snappy|lz4] (defaults to none)") - flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode") + flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode, hashCodeByValue") flags.StringVar(&args.decodeKey, "decodekey", "string", "Decode message value as (string|hex|base64), defaults to string.") flags.StringVar(&args.decodeValue, "decodevalue", "string", "Decode message value as (string|hex|base64), defaults to string.") flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.") @@ -320,11 +320,16 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit } var part int32 = 0 - if msg.Key != nil && cmd.partitioner == "hashCode" { - part = hashCodePartition(*msg.Key, partitionCount) - } - if msg.Partition == nil { + if msg.Value != nil && cmd.partitioner == "hashCodeByValue" { + part = hashCodePartition(*msg.Value, partitionCount) msg.Partition = &part + }else { + if msg.Key != nil && cmd.partitioner == "hashCode" { + part = hashCodePartition(*msg.Key, partitionCount) + } + if msg.Partition == nil { + msg.Partition = &part + } } out <- msg From 25aef0acb159ab47679ad06a752d5dd38087763b Mon Sep 17 00:00:00 2001 From: Yuan Xie Date: Tue, 15 Sep 2020 19:20:38 +0800 Subject: [PATCH 2/3] doc for hashCodeByValue --- .idea/$CACHE_FILE$ | 87 ++++++++++++++++++++++++++++++++++++++++++++++ .idea/.gitignore | 8 +++++ .idea/dictionaries | 6 ++++ .idea/kt.iml | 8 +++++ .idea/misc.xml | 4 +++ .idea/modules.xml | 8 +++++ .idea/vcs.xml | 6 ++++ go.mod | 3 -- produce.go | 11 ++++++ 9 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 .idea/$CACHE_FILE$ create mode 100644 .idea/.gitignore create mode 100644 .idea/dictionaries create mode 100644 .idea/kt.iml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.idea/$CACHE_FILE$ b/.idea/$CACHE_FILE$ new file mode 100644 index 0000000..f7d81f2 --- /dev/null +++ b/.idea/$CACHE_FILE$ @@ -0,0 +1,87 @@ + + + + + + + + + AccessibilityHTML + + + Assignment issuesJavaScript + + + Bitwise operation issuesJavaScript + + + CSS + + + Code quality toolsJavaScript + + + Code style issuesGo + + + Control flow issuesJavaScript + + + DOM issuesJavaScript + + + EditorConfig + + + General + + + GeneralGo + + + GeneralJavaScript + + + Go + + + HTML + + + JavaScript + + + Naming conventionsJavaScript + + + Potentially confusing code constructsJavaScript + + + Probable bugsCSS + + + Probable bugsJavaScript + + + Python + + + RELAX NG + + + RegExp + + + SQL + + + XPath + + + XSLT + + + + + + \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..73f69e0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/dictionaries b/.idea/dictionaries new file mode 100644 index 0000000..fbbecce --- /dev/null +++ b/.idea/dictionaries @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/kt.iml b/.idea/kt.iml new file mode 100644 index 0000000..c956989 --- /dev/null +++ b/.idea/kt.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..001b059 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..b5c824d --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/go.mod b/go.mod index a89e921..da1d155 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/fgeller/kt require ( github.com/Shopify/sarama v1.26.1 github.com/davecgh/go-spew v1.1.1 - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect - github.com/eapache/queue v1.1.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.5.1 golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 ) diff --git a/produce.go b/produce.go index abf2829..b0ae1cb 100644 --- a/produce.go +++ b/produce.go @@ -524,6 +524,12 @@ like the following: In case the input line cannot be interpeted as a JSON object the key and value both default to the input line and partition to 0. +If you don't want to specify key for single message, in other words, it doesn't matter that a message goes +to a random paritition (with equal probability), you can set the flag '-partitioner' with 'hashCodeByValue'. +That will tell kt to take the value of a message to calculate a hashcode deciding which paritition it will go to. +This can be helpful when you just want there are many messages distributed in partitions of a topic, and don't +care about what the content is. + Examples: Send a single message with a specific key: @@ -534,6 +540,11 @@ Send a single message with a specific key: $ kt consume -topic greetings -timeout 1s -offsets 0:3- {"partition":0,"offset":3,"key":"id-23","message":"ola"} +Send a single message without specified key: + $ echo 'no key specified message' | kt produce -topic greetings -partitioner hashCodeByValue + Sent message to a partition decided by your case + + Keep reading input from stdin until interrupted (via ^C). $ kt produce -topic greetings From 81a3ff6f4a9103cb6efdcc2f69e35dde39be5f7b Mon Sep 17 00:00:00 2001 From: Yuan Xie Date: Thu, 17 Sep 2020 16:19:36 +0800 Subject: [PATCH 3/3] do not commit idea folder --- .gitignore | 2 ++ .idea/$CACHE_FILE$ | 87 ---------------------------------------------- .idea/.gitignore | 8 ----- .idea/dictionaries | 6 ---- .idea/kt.iml | 8 ----- .idea/misc.xml | 4 --- .idea/modules.xml | 8 ----- .idea/vcs.xml | 6 ---- 8 files changed, 2 insertions(+), 127 deletions(-) delete mode 100644 .idea/$CACHE_FILE$ delete mode 100644 .idea/.gitignore delete mode 100644 .idea/dictionaries delete mode 100644 .idea/kt.iml delete mode 100644 .idea/misc.xml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index 3fa5ddc..750beeb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /kt /quickfix +.idea/* +.vscode/* \ No newline at end of file diff --git a/.idea/$CACHE_FILE$ b/.idea/$CACHE_FILE$ deleted file mode 100644 index f7d81f2..0000000 --- a/.idea/$CACHE_FILE$ +++ /dev/null @@ -1,87 +0,0 @@ - - - - - - - - - AccessibilityHTML - - - Assignment issuesJavaScript - - - Bitwise operation issuesJavaScript - - - CSS - - - Code quality toolsJavaScript - - - Code style issuesGo - - - Control flow issuesJavaScript - - - DOM issuesJavaScript - - - EditorConfig - - - General - - - GeneralGo - - - GeneralJavaScript - - - Go - - - HTML - - - JavaScript - - - Naming conventionsJavaScript - - - Potentially confusing code constructsJavaScript - - - Probable bugsCSS - - - Probable bugsJavaScript - - - Python - - - RELAX NG - - - RegExp - - - SQL - - - XPath - - - XSLT - - - - - - \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 73f69e0..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/.idea/dictionaries b/.idea/dictionaries deleted file mode 100644 index fbbecce..0000000 --- a/.idea/dictionaries +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/kt.iml b/.idea/kt.iml deleted file mode 100644 index c956989..0000000 --- a/.idea/kt.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 001b059..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index b5c824d..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 94a25f7..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file