Skip to content

Commit

Permalink
[FAB-1165] Use configured PartitionID
Browse files Browse the repository at this point in the history
Subtask of FAB-890

When publishing to Kafka we have been relying on the
topic only containing one partition (num.partitions = 1),
and the default PartitionID (0) in the config lining
up for a valid combination. This commit configures our
producer PartitionID configured.

Change-Id: Ibba476425a15aff9ff447afdfacbf4b77a41ce0c
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Nov 30, 2016
1 parent 7420a61 commit d69cd02
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
21 changes: 21 additions & 0 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
brokerConfig := sarama.NewConfig()
brokerConfig.Version = conf.Kafka.Version
brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID)
return brokerConfig
}

Expand All @@ -50,3 +51,23 @@ func newOffsetReq(conf *config.TopLevel, seek int64) *sarama.OffsetRequest {
req.AddBlock(conf.Kafka.Topic, conf.Kafka.PartitionID, seek, 1)
return req
}

// newStaticPartitioner returns a PartitionerConstructor that returns a Partitioner
// that always chooses the specified partition.
func newStaticPartitioner(partition int32) sarama.PartitionerConstructor {
return func(topic string) sarama.Partitioner {
return &staticPartitioner{partition}
}
}

type staticPartitioner struct {
partitionID int32
}

func (p *staticPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
return p.partitionID, nil
}

func (p *staticPartitioner) RequiresConsistency() bool {
return true
}
84 changes: 84 additions & 0 deletions orderer/kafka/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
"testing"

"github.com/Shopify/sarama"
)

func TestStaticPartitioner(t *testing.T) {

var partition int32 = 3
var numberOfPartitions int32 = 6

partitionerConstructor := newStaticPartitioner(partition)
partitioner := partitionerConstructor(testConf.Kafka.Topic)

for i := 0; i < 10; i++ {
assignedPartition, err := partitioner.Partition(new(sarama.ProducerMessage), numberOfPartitions)
if err != nil {
t.Fatal(err)
}
if assignedPartition != partition {
t.Fatalf("Expected: %v. Actual: %v", partition, assignedPartition)
}
}
}

func TestNewBrokerConfig(t *testing.T) {

topic := testConf.Kafka.Topic

// use a partition id that is not the 'default' 0
var partition int32 = 2
originalPartitionID := testConf.Kafka.PartitionID
defer func() {
testConf.Kafka.PartitionID = originalPartitionID
}()
testConf.Kafka.PartitionID = partition

// setup a mock broker that reports that it has 3 partitions for the topic
broker := sarama.NewMockBroker(t, 1000)
broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader(topic, 0, broker.BrokerID()).
SetLeader(topic, 1, broker.BrokerID()).
SetLeader(topic, 2, broker.BrokerID()),
"ProduceRequest": sarama.NewMockProduceResponse(t),
})

config := newBrokerConfig(testConf)
producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
assignedPartition, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic})
if err != nil {
t.Fatal(err)
}
if assignedPartition != partition {
t.Fatalf("Expected: %v. Actual: %v", partition, assignedPartition)
}
}
producer.Close()
broker.Close()
}

0 comments on commit d69cd02

Please sign in to comment.