Skip to content

Commit

Permalink
[FAB-1365] Introduce Kafka container message types
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1365

The revised Kafka consenter needs two special messages:
1. A time-to-cut message that is used to mark the end of a block, and
2. A no-op message that each shim posts when bootstrapped by the
multichain manager to prevent the possibility of "listening in" (seeking
and consuming) on a topic/partition that nobody has posted to yet [1].
This is an operation that panics in Kafka: "[ERROR] Cannot retrieve
required offset from Kafka cluster: kafka server: The request attempted
to perform an operation on an invalid topic."

These messages are special because they don't carry transactions, and
because the Kafka consenter will treat them in a special way: it will
ignore every time-to-cut message (for a specific block number) besides
the first one, and it will ignore all "no-op" messages when processing
incoming messages from the chain partition.

This changeset defines the types that will carry these messages, as well
as helper functions to generate them. Note that these are not hooked into
the main path yet, though a preview of these in action can be found here:
https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L142
https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L164
https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L204

These changes will be hooked into the main path in a follow-up changeset
that introduces the revised Kafka consenter.

[1] We ask the consenter to "listen in" on an empty topic/partition every
time a new chain is created, since we never actually post the genesis
block to that chain's partition.

Change-Id: Ic7ebbf2585e6e8e5080866e0d110d9cff5a16de5
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Dec 15, 2016
1 parent 71a3389 commit a99e792
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 7 deletions.
31 changes: 31 additions & 0 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka
import (
"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/localconfig"
ab "github.com/hyperledger/fabric/protos/orderer"
)

const (
Expand All @@ -41,6 +42,36 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
return brokerConfig
}

func newConnectMessage() *ab.KafkaMessage {
return &ab.KafkaMessage{
Type: &ab.KafkaMessage_Connect{
Connect: &ab.KafkaMessageConnect{
Payload: nil,
},
},
}
}

func newRegularMessage(payload []byte) *ab.KafkaMessage {
return &ab.KafkaMessage{
Type: &ab.KafkaMessage_Regular{
Regular: &ab.KafkaMessageRegular{
Payload: payload,
},
},
}
}

func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage {
return &ab.KafkaMessage{
Type: &ab.KafkaMessage_TimeToCut{
TimeToCut: &ab.KafkaMessageTimeToCut{
BlockNumber: blockNumber,
},
},
}
}

func newMsg(payload []byte, topic string) *sarama.ProducerMessage {
return &sarama.ProducerMessage{
Topic: topic,
Expand Down
5 changes: 5 additions & 0 deletions protos/orderer/ab.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions protos/orderer/configuration.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

223 changes: 223 additions & 0 deletions protos/orderer/kafka.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions protos/orderer/kafka.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";

option go_package = "github.com/hyperledger/fabric/protos/orderer";

package orderer;

message KafkaMessage {
oneof Type {
KafkaMessageRegular regular = 1;
KafkaMessageTimeToCut time_to_cut = 2;
KafkaMessageConnect connect = 3;
}
}

message KafkaMessageRegular {
bytes payload = 1;
}

message KafkaMessageTimeToCut {
uint64 block_number = 1;
}

message KafkaMessageConnect {
bytes payload = 1;
}

0 comments on commit a99e792

Please sign in to comment.