diff --git a/orderer/kafka/broadcast.go b/orderer/kafka/broadcast.go deleted file mode 100644 index 39354ffab65..00000000000 --- a/orderer/kafka/broadcast.go +++ /dev/null @@ -1,159 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "fmt" - "sync" - "time" - - "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - - "github.com/golang/protobuf/proto" -) - -// Broadcaster allows the caller to submit messages to the orderer -type Broadcaster interface { - Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error - Closeable -} - -type broadcasterImpl struct { - producer Producer - config *config.TopLevel - once sync.Once - - batchChan chan *cb.Envelope - messages [][]byte - nextNumber uint64 - prevHash []byte -} - -func newBroadcaster(conf *config.TopLevel) Broadcaster { - b := &broadcasterImpl{ - producer: newProducer(conf), - config: conf, - batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: provisional.New(conf).GenesisBlock().GetData().Data, - nextNumber: 0, - } - - b.once.Do(func() { - // Send the genesis block to create the topic - // otherwise consumers will throw an exception. 
- b.sendBlock() - // Spawn the goroutine that cuts blocks - go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize) - }) - - return b -} - -// Broadcast receives ordering requests by clients and sends back an -// acknowledgement for each received message in order, indicating -// success or type of failure -func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { - return b.recvRequests(stream) -} - -// Close shuts down the broadcast side of the orderer -func (b *broadcasterImpl) Close() error { - if b.producer != nil { - return b.producer.Close() - } - return nil -} - -func (b *broadcasterImpl) sendBlock() error { - data := &cb.BlockData{ - Data: b.messages, - } - block := &cb.Block{ - Header: &cb.BlockHeader{ - Number: b.nextNumber, - PreviousHash: b.prevHash, - DataHash: data.Hash(), - }, - Data: data, - } - logger.Debugf("Prepared block %d with %d messages (%+v)", block.Header.Number, len(block.Data.Data), block) - - b.messages = [][]byte{} - b.nextNumber++ - b.prevHash = block.Header.Hash() - - blockBytes, err := proto.Marshal(block) - if err != nil { - logger.Fatalf("Error marshaling block: %s", err) - } - - return b.producer.Send(blockBytes) -} - -func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint32) { - timer := time.NewTimer(period) - - for { - select { - case msg := <-b.batchChan: - data, err := proto.Marshal(msg) - if err != nil { - panic(fmt.Errorf("Error marshaling what should be a valid proto message: %s", err)) - } - b.messages = append(b.messages, data) - if len(b.messages) >= int(maxSize) { - if !timer.Stop() { - <-timer.C - } - timer.Reset(period) - if err := b.sendBlock(); err != nil { - panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err)) - } - } - case <-timer.C: - timer.Reset(period) - if len(b.messages) > 0 { - if err := b.sendBlock(); err != nil { - panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err)) - } - } - } - } -} - -func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error { - reply := new(ab.BroadcastResponse) - for { - msg, err := stream.Recv() - if err != nil { - logger.Debug("Can no longer receive requests from client (exited?)") - return err - } - - b.batchChan <- msg - reply.Status = cb.Status_SUCCESS // TODO This shouldn't always be a success - - if err := stream.Send(reply); err != nil { - logger.Info("Cannot send broadcast reply to client") - } - logger.Debugf("Sent broadcast reply %s to client", reply.Status.String()) - - } -} diff --git a/orderer/kafka/broadcast_mock_test.go b/orderer/kafka/broadcast_mock_test.go deleted file mode 100644 index 891f9916425..00000000000 --- a/orderer/kafka/broadcast_mock_test.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "fmt" - "testing" - - "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - - "github.com/golang/protobuf/proto" -) - -func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster { - genesisBlock := provisional.New(conf).GenesisBlock() - wait := make(chan struct{}) - - mb := &broadcasterImpl{ - producer: mockNewProducer(t, conf, seek, disk), - config: conf, - batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: genesisBlock.GetData().Data, - nextNumber: uint64(seek), - } - - go func() { - rxBlockBytes := <-disk - rxBlock := &cb.Block{} - if err := proto.Unmarshal(rxBlockBytes, rxBlock); err != nil { - panic(err) - } - if !proto.Equal(rxBlock.GetData(), genesisBlock.GetData()) { - panic(fmt.Errorf("Broadcaster not functioning as expected")) - } - close(wait) - }() - - mb.once.Do(func() { - // Send the genesis block to create the topic - // otherwise consumers will throw an exception. - mb.sendBlock() - // Spawn the goroutine that cuts blocks - go mb.cutBlock(mb.config.General.BatchTimeout, mb.config.General.BatchSize) - }) - <-wait - - return mb -} diff --git a/orderer/kafka/broadcast_test.go b/orderer/kafka/broadcast_test.go deleted file mode 100644 index 569249c15c5..00000000000 --- a/orderer/kafka/broadcast_test.go +++ /dev/null @@ -1,328 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/golang/protobuf/proto" - cb "github.com/hyperledger/fabric/protos/common" -) - -func TestBroadcastResponse(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Send a message to the orderer - go func() { - mbs.incoming <- &cb.Envelope{Payload: []byte("single message")} - }() - - for { - select { - case reply := <-mbs.outgoing: - if reply.Status != cb.Status_SUCCESS { - t.Fatal("Client should have received a SUCCESS reply") - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a broadcast reply by the orderer by now") - } - } -} - -func TestBroadcastBatch(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < int(testConf.General.BatchSize); i++ { - <-mbs.outgoing - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != int(testConf.General.BatchSize) { - t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data)) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a block by now") - } - } -} - -// If the capacity of the response queue is less than the batch size, -// then if the response queue overflows, the order should not be able -// to send back a block to the client. (Sending replies and adding -// messages to the about-to-be-sent block happens on the same routine.) 
-/* func TestBroadcastResponseQueueOverflow(t *testing.T) { - - // Make sure that the response queue is less than the batch size - originalQueueSize := testConf.General.QueueSize - defer func() { testConf.General.QueueSize = originalQueueSize }() - testConf.General.QueueSize = testConf.General.BatchSize - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Force the response queue to overflow by blocking the broadcast stream's Send() method - mbs.closed = true - defer func() { mbs.closed = false }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - -loop: - for { - select { - case <-mbs.outgoing: - t.Fatal("Client shouldn't have received anything from the orderer") - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - break loop // This is the success path - } - } -} */ - -func TestBroadcastIncompleteBatch(t *testing.T) { - if testConf.General.BatchSize <= 1 { - t.Skip("Skipping test as it requires a batchsize > 1") - } - - messageCount := int(testConf.General.BatchSize) - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump less than batchSize messages into the system - go func() { - for i := 0; i < messageCount; i++ { - payload, _ := proto.Marshal(&cb.Payload{Data: []byte("message " + strconv.Itoa(i))}) - mbs.incoming <- &cb.Envelope{Payload: payload} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < messageCount; i++ { - <-mbs.outgoing - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != messageCount { - t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) - } - return - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { - if testConf.General.BatchSize <= 1 { - t.Skip("Skipping test as it requires a batchsize > 1") - } - - var once sync.Once - - messageCount := int(testConf.General.BatchSize) - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - for i := 0; i < 2; i++ { - // Pump less than batchSize messages into the system - go func() { - for i := 0; i < messageCount; i++ { - payload, _ := proto.Marshal(&cb.Payload{Data: []byte("message " + strconv.Itoa(i))}) - mbs.incoming <- &cb.Envelope{Payload: payload} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < messageCount; i++ { - <-mbs.outgoing - } - - once.Do(func() { - <-disk // First incomplete block, tested elsewhere - }) - } - - for { - select { - case in := <-disk: - block := 
new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != messageCount { - t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) - } - return - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastBatchAndQuitEarly(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - - // In contrast to TestBroadcastBatch, do not receive any replies. - // This simulates the case where you quit early (though you would - // most likely still get replies in a real world scenario, as long - // as you don't receive all of them we're on the same page). - for !mbs.CloseOut() { - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != int(testConf.General.BatchSize) { - t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data)) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastClose(t *testing.T) { - errChan := make(chan error) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, make(chan []byte)) - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - go func() { - errChan <- mb.Close() - }() - - for { - select { - case err := <-errChan: - if err != nil { - t.Fatal("Error when closing the broadcaster:", err) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Broadcaster should have closed its producer by now") - } - } - -} diff --git a/orderer/kafka/broker.go b/orderer/kafka/broker.go index d0030cb4ca9..dcd37524e35 100644 --- a/orderer/kafka/broker.go +++ b/orderer/kafka/broker.go @@ -20,90 +20,88 @@ import ( "fmt" "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" ) -// Broker allows the caller to get info on the orderer's stream +// Broker allows the caller to get info on the cluster's partitions type Broker interface { - GetOffset(req *sarama.OffsetRequest) (int64, error) + GetOffset(cp ChainPartition, req *sarama.OffsetRequest) (int64, error) Closeable } type brokerImpl struct { broker *sarama.Broker - config *config.TopLevel } -func newBroker(conf *config.TopLevel) Broker { +// Connects to the broker that handles all produce and consume +// requests for the given chain (Partition Leader Replica) +func newBroker(brokers []string, cp ChainPartition) (Broker, error) { + var candidateBroker, connectedBroker, leaderBroker *sarama.Broker - // connect to one of the bootstrap servers - var bootstrapServer *sarama.Broker - for _, hostPort := range conf.Kafka.Brokers { - broker := sarama.NewBroker(hostPort) - if err := broker.Open(nil); err != nil { - logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err) 
+ // Connect to one of the given brokers + for _, hostPort := range brokers { + candidateBroker = sarama.NewBroker(hostPort) + if err := candidateBroker.Open(nil); err != nil { + logger.Warningf("Failed to connect to broker %s: %s", hostPort, err) continue } - if connected, err := broker.Connected(); !connected { - logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err) + if connected, err := candidateBroker.Connected(); !connected { + logger.Warningf("Failed to connect to broker %s: %s", hostPort, err) continue } - bootstrapServer = broker + connectedBroker = candidateBroker break } - if bootstrapServer == nil { - panic(fmt.Errorf("Failed to connect to any of the bootstrap servers (%v) for metadata request.", conf.Kafka.Brokers)) + + if connectedBroker == nil { + return nil, fmt.Errorf("Failed to connect to any of the given brokers (%v) for metadata request", brokers) } - logger.Debugf("Connected to bootstrap server at %s.", bootstrapServer.Addr()) + logger.Debugf("Connected to broker %s", connectedBroker.Addr()) - // get metadata for topic - topic := conf.Kafka.Topic - metadata, err := bootstrapServer.GetMetadata(&sarama.MetadataRequest{Topics: []string{topic}}) + // Get metadata for the topic that corresponds to this chain + metadata, err := connectedBroker.GetMetadata(&sarama.MetadataRequest{Topics: []string{cp.Topic()}}) if err != nil { - panic(fmt.Errorf("GetMetadata failed for topic %s: %v", topic, err)) + return nil, fmt.Errorf("Failed to get metadata for topic %s: %s", cp, err) } - // get leader broker for given topic/partition - var broker *sarama.Broker - partitionID := conf.Kafka.PartitionID - if (partitionID >= 0) && (partitionID < int32(len(metadata.Topics[0].Partitions))) { - leader := metadata.Topics[0].Partitions[partitionID].Leader - logger.Debugf("Leading broker for topic %s/partition %d is broker ID %d", topic, partitionID, leader) - for _, b := range metadata.Brokers { - if b.ID() == leader { - broker = b + // Get the leader broker for this chain partition + if (cp.Partition() >= 0) && (cp.Partition() < int32(len(metadata.Topics[0].Partitions))) { + leaderBrokerID := metadata.Topics[0].Partitions[cp.Partition()].Leader + logger.Debugf("Leading broker for chain %s is broker ID %d", cp, leaderBrokerID) + for _, availableBroker := range metadata.Brokers { + if availableBroker.ID() == leaderBrokerID { + leaderBroker = availableBroker break } } } - if broker == nil { - panic(fmt.Errorf("Can't find leader for topic %s/partition %d", topic, partitionID)) + + if leaderBroker == nil { + return nil, fmt.Errorf("Can't find leader for chain %s", cp) } - // connect to broker - if err := broker.Open(nil); err != nil { - panic(fmt.Errorf("Failed to open Kafka broker: %v", err)) + // Connect to broker + if err := leaderBroker.Open(nil); err != nil { + return nil, fmt.Errorf("Failed to connect to Kafka broker: %s", err) } - if connected, err := broker.Connected(); !connected { - panic(fmt.Errorf("Failed to open Kafka broker: %v", err)) + if connected, err := leaderBroker.Connected(); !connected { + return nil, fmt.Errorf("Failed to connect to Kafka broker: %s", err) } - return &brokerImpl{ - broker: broker, - config: conf, - } + return &brokerImpl{broker: leaderBroker}, nil } -// GetOffset retrieves the offset number that corresponds to the requested position in the log -func (b *brokerImpl) GetOffset(req *sarama.OffsetRequest) (int64, error) { +// GetOffset retrieves the offset number that corresponds +// to the requested position in the log.
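+// A typical request is built with sarama's AddBlock helper, e.g. (a minimal sketch): +// req := &sarama.OffsetRequest{} +// req.AddBlock(cp.Topic(), cp.Partition(), sarama.OffsetNewest, 1) +// where sarama.OffsetNewest (-1) asks for the offset of the *next* message to be produced +// and sarama.OffsetOldest (-2) asks for the oldest offset still available on the broker.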
+func (b *brokerImpl) GetOffset(cp ChainPartition, req *sarama.OffsetRequest) (int64, error) { resp, err := b.broker.GetAvailableOffsets(req) if err != nil { return int64(-1), err } - return resp.GetBlock(b.config.Kafka.Topic, b.config.Kafka.PartitionID).Offsets[0], nil + return resp.GetBlock(cp.Topic(), cp.Partition()).Offsets[0], nil } -// Close terminates the broker +// Close terminates the broker. +// This is invoked by the session deliverer's getOffset method. func (b *brokerImpl) Close() error { return b.broker.Close() } diff --git a/orderer/kafka/broker_mock_test.go b/orderer/kafka/broker_mock_test.go index bea5ac76640..65c5c59ac88 100644 --- a/orderer/kafka/broker_mock_test.go +++ b/orderer/kafka/broker_mock_test.go @@ -17,10 +17,10 @@ limitations under the License. package kafka import ( + "fmt" "testing" "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" ) type mockBrockerImpl struct { @@ -30,34 +30,33 @@ type mockBrockerImpl struct { handlerMap map[string]sarama.MockResponse } -func mockNewBroker(t *testing.T, conf *config.TopLevel) Broker { +func mockNewBroker(t *testing.T, cp ChainPartition) (Broker, error) { mockBroker := sarama.NewMockBroker(t, testBrokerID) handlerMap := make(map[string]sarama.MockResponse) // The sarama mock package doesn't allow us to return an error // for invalid offset requests, so we return an offset of -1. // Note that the mock offset responses below imply a broker with - // testNewestOffset-1 blocks available. Therefore, if you are using this + // newestOffset-1 blocks available. Therefore, if you are using this // broker as part of a bigger test where you intend to consume blocks, // make sure that the mockConsumer has been initialized accordingly - // (Set the 'seek' parameter to testNewestOffset-1.) + // (Set the 'offset' parameter to newestOffset-1.) handlerMap["OffsetRequest"] = sarama.NewMockOffsetResponse(t). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, testOldestOffset). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, testNewestOffset) + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetOldest, testOldestOffset). 
+ SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetNewest, testNewestOffset) mockBroker.SetHandlerByMap(handlerMap) broker := sarama.NewBroker(mockBroker.Addr()) if err := broker.Open(nil); err != nil { - t.Fatal("Cannot connect to mock broker:", err) + return nil, fmt.Errorf("Cannot connect to mock broker: %s", err) } return &mockBrockerImpl{ brokerImpl: brokerImpl{ broker: broker, - config: conf, }, mockBroker: mockBroker, handlerMap: handlerMap, - } + }, nil } func (mb *mockBrockerImpl) Close() error { diff --git a/orderer/kafka/broker_test.go b/orderer/kafka/broker_test.go index 8717ee8b65a..b40316181ec 100644 --- a/orderer/kafka/broker_test.go +++ b/orderer/kafka/broker_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" ) func TestBrokerGetOffset(t *testing.T) { @@ -28,71 +29,55 @@ func TestBrokerGetOffset(t *testing.T) { } func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) { + cp := newChainPartition(provisional.TestChainID, rawPartition) return func(t *testing.T) { - mb := mockNewBroker(t, testConf) + mb, _ := mockNewBroker(t, cp) defer testClose(t, mb) - offset, _ := mb.GetOffset(newOffsetReq(mb.(*mockBrockerImpl).config, given)) - if offset != expected { - t.Fatalf("Expected offset %d, got %d instead", expected, offset) + ofs, _ := mb.GetOffset(cp, newOffsetReq(cp, given)) + if ofs != expected { + t.Fatalf("Expected offset %d, got %d instead", expected, ofs) } } } func TestNewBrokerReturnsPartitionLeader(t *testing.T) { - - // sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) - // SetLogLevel("debug") - - broker1 := sarama.NewMockBroker(t, 1001) - broker2 := sarama.NewMockBroker(t, 1002) - broker3 := sarama.NewMockBroker(t, 1003) - - // shutdown broker1 - broker1.Close() - - // update list of bootstrap brokers in config - originalKafkaBrokers := testConf.Kafka.Brokers + cp := newChainPartition(provisional.TestChainID, rawPartition) + broker1 := sarama.NewMockBroker(t, 1) + broker2 := sarama.NewMockBroker(t, 2) + broker3 := sarama.NewMockBroker(t, 3) defer func() { - testConf.Kafka.Brokers = originalKafkaBrokers + broker2.Close() + broker3.Close() }() - // add broker1, and broker2 to list of bootstrap brokers - // broker1 is 'down' - // broker3 will be discovered via a metadata request - testConf.Kafka.Brokers = []string{broker1.Addr(), broker2.Addr()} - // handy references - topic := testConf.Kafka.Topic - partition := testConf.Kafka.PartitionID + // Use broker1 and broker2 as bootstrap brokers, but shutdown broker1 right away + broker1.Close() - // add expectation that broker2 will return a metadata response that - // identifies broker3 as the topic partition leader + // Add expectation that broker2 will return a metadata response + // that identifies broker3 as the topic partition leader broker2.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker1.Addr(), broker1.BrokerID()). SetBroker(broker2.Addr(), broker2.BrokerID()). SetBroker(broker3.Addr(), broker3.BrokerID()). - SetLeader(topic, partition, broker3.BrokerID()), + SetLeader(cp.Topic(), cp.Partition(), broker3.BrokerID()), }) - // add expectation that broker3 respond to an offset request + // Add expectation that broker3 responds to an offset request broker3.SetHandlerByMap(map[string]sarama.MockResponse{ "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset(topic, partition, sarama.OffsetOldest, 0). 
- SetOffset(topic, partition, sarama.OffsetNewest, 42), + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetOldest, testOldestOffset). + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetNewest, testNewestOffset), }) - // get leader for topic partition - broker := newBroker(testConf) + // Get leader for the test chain partition + leaderBroker, _ := newBroker([]string{broker1.Addr(), broker2.Addr()}, cp) - // only broker3 will respond successfully to an offset request + // Only broker3 will respond successfully to an offset request offsetRequest := new(sarama.OffsetRequest) - offsetRequest.AddBlock(topic, partition, -1, 1) - if _, err := broker.GetOffset(offsetRequest); err != nil { - t.Fatal(err) + offsetRequest.AddBlock(cp.Topic(), cp.Partition(), -1, 1) + if _, err := leaderBroker.GetOffset(cp, offsetRequest); err != nil { + t.Fatal("Expected leader broker to respond to request:", err) } - - broker2.Close() - broker3.Close() - } diff --git a/orderer/kafka/client_deliver.go b/orderer/kafka/client_deliver.go deleted file mode 100644 index 3a889c83912..00000000000 --- a/orderer/kafka/client_deliver.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "errors" - "fmt" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type clientDelivererImpl struct { - brokerFunc func(*config.TopLevel) Broker - consumerFunc func(*config.TopLevel, int64) (Consumer, error) // This resets the consumer. 
- - consumer Consumer - config *config.TopLevel - deadChan chan struct{} - - errChan chan error - updChan chan *ab.DeliverUpdate - tokenChan chan struct{} - lastACK int64 - window int64 -} - -func newClientDeliverer(conf *config.TopLevel, deadChan chan struct{}) Deliverer { - brokerFunc := func(conf *config.TopLevel) Broker { - return newBroker(conf) - } - consumerFunc := func(conf *config.TopLevel, seek int64) (Consumer, error) { - return newConsumer(conf, seek) - } - - return &clientDelivererImpl{ - brokerFunc: brokerFunc, - consumerFunc: consumerFunc, - - config: conf, - deadChan: deadChan, - errChan: make(chan error), - updChan: make(chan *ab.DeliverUpdate), // TODO Size this properly - } -} - -// Deliver receives updates from a client and returns a stream of blocks to them -func (cd *clientDelivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - go cd.recvUpdates(stream) - return cd.sendBlocks(stream) -} - -// Close shuts down the Deliver server assigned by the orderer to a client -func (cd *clientDelivererImpl) Close() error { - if cd.consumer != nil { - return cd.consumer.Close() - } - return nil -} - -func (cd *clientDelivererImpl) recvUpdates(stream ab.AtomicBroadcast_DeliverServer) { - for { - upd, err := stream.Recv() - if err != nil { - cd.errChan <- err - return - } - cd.updChan <- upd - } -} - -func (cd *clientDelivererImpl) sendBlocks(stream ab.AtomicBroadcast_DeliverServer) error { - var err error - var reply *ab.DeliverResponse - var upd *ab.DeliverUpdate - block := new(cb.Block) - for { - select { - case <-cd.deadChan: - logger.Debug("sendBlocks goroutine for client-deliverer received shutdown signal") - return nil - case err = <-cd.errChan: - return err - case upd = <-cd.updChan: - switch t := upd.GetType().(type) { - case *ab.DeliverUpdate_Seek: - err = cd.processSeek(t) - case *ab.DeliverUpdate_Acknowledgement: - err = cd.processACK(t) - } - if err != nil { - var errorStatus cb.Status - // TODO Will need to flesh this out into - // a proper error handling system eventually. - switch err.Error() { - case seekOutOfRangeError: - errorStatus = cb.Status_NOT_FOUND - case ackOutOfRangeError, windowOutOfRangeError: - errorStatus = cb.Status_BAD_REQUEST - default: - errorStatus = cb.Status_SERVICE_UNAVAILABLE - } - reply = new(ab.DeliverResponse) - reply.Type = &ab.DeliverResponse_Error{Error: errorStatus} - if err := stream.Send(reply); err != nil { - return fmt.Errorf("Failed to send error response to the client: %s", err) - } - return fmt.Errorf("Failed to process received update: %s", err) - } - case <-cd.tokenChan: - select { - case data := <-cd.consumer.Recv(): - err := proto.Unmarshal(data.Value, block) - if err != nil { - logger.Info("Failed to unmarshal retrieved block from ordering service:", err) - } - reply = new(ab.DeliverResponse) - reply.Type = &ab.DeliverResponse_Block{Block: block} - err = stream.Send(reply) - if err != nil { - return fmt.Errorf("Failed to send block to the client: %s", err) - } - logger.Debugf("Sent block %v to client (prevHash: %v, messages: %v)\n", - block.Header.Number, block.Header.PreviousHash, block.Data.Data) - default: - // Return the push token if there are no messages - // available from the ordering service. 
- cd.tokenChan <- struct{}{} - } - } - } -} - -func (cd *clientDelivererImpl) processSeek(msg *ab.DeliverUpdate_Seek) error { - var err error - var seek, window int64 - logger.Debug("Received SEEK message") - - window = int64(msg.Seek.WindowSize) - if window <= 0 || window > int64(cd.config.General.MaxWindowSize) { - return errors.New(windowOutOfRangeError) - } - cd.window = window - logger.Debug("Requested window size set to", cd.window) - - oldestAvailable, err := cd.getOffset(int64(-2)) - if err != nil { - return err - } - newestAvailable, err := cd.getOffset(int64(-1)) - if err != nil { - return err - } - newestAvailable-- // Cause in the case of newest, the library actually gives us the seqNo of the *next* new block - - switch msg.Seek.Start { - case ab.SeekInfo_OLDEST: - seek = oldestAvailable - case ab.SeekInfo_NEWEST: - seek = newestAvailable - case ab.SeekInfo_SPECIFIED: - seek = int64(msg.Seek.SpecifiedNumber) - if !(seek >= oldestAvailable && seek <= newestAvailable) { - return errors.New(seekOutOfRangeError) - } - } - - logger.Debug("Requested seek number set to", seek) - - cd.disablePush() - if err := cd.Close(); err != nil { - return err - } - cd.lastACK = seek - 1 - logger.Debug("Set last ACK for this client's consumer to", cd.lastACK) - - cd.consumer, err = cd.consumerFunc(cd.config, seek) - if err != nil { - return err - } - - cd.enablePush(cd.window) - return nil -} - -func (cd *clientDelivererImpl) getOffset(seek int64) (int64, error) { - broker := cd.brokerFunc(cd.config) - defer broker.Close() - return broker.GetOffset(newOffsetReq(cd.config, seek)) -} - -func (cd *clientDelivererImpl) disablePush() int64 { - // No need to add a lock to ensure these operations happen atomically. - // The caller is the only function that can modify the tokenChan. - remTokens := int64(len(cd.tokenChan)) - cd.tokenChan = nil - logger.Debugf("Pushing blocks to client paused; found %v unused push token(s)", remTokens) - return remTokens -} - -func (cd *clientDelivererImpl) enablePush(newTokenCount int64) { - cd.tokenChan = make(chan struct{}, newTokenCount) - for i := int64(0); i < newTokenCount; i++ { - cd.tokenChan <- struct{}{} - } - logger.Debugf("Pushing blocks to client resumed; %v push token(s) available", newTokenCount) -} - -func (cd *clientDelivererImpl) processACK(msg *ab.DeliverUpdate_Acknowledgement) error { - logger.Debug("Received ACK for block", msg.Acknowledgement.Number) - remTokens := cd.disablePush() - newACK := int64(msg.Acknowledgement.Number) // TODO Optionally mark this offset in Kafka - if (newACK < cd.lastACK) || (newACK > cd.lastACK+cd.window) { - return errors.New(ackOutOfRangeError) - } - newTokenCount := newACK - cd.lastACK + remTokens - cd.lastACK = newACK - cd.enablePush(newTokenCount) - return nil -} diff --git a/orderer/kafka/client_deliver_mock_test.go b/orderer/kafka/client_deliver_mock_test.go deleted file mode 100644 index fc49f777a9f..00000000000 --- a/orderer/kafka/client_deliver_mock_test.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type mockClientDelivererImpl struct { - clientDelivererImpl - t *testing.T -} - -func mockNewClientDeliverer(t *testing.T, conf *config.TopLevel, deadChan chan struct{}) Deliverer { - mockBrokerFunc := func(conf *config.TopLevel) Broker { - return mockNewBroker(t, conf) - } - mockConsumerFunc := func(conf *config.TopLevel, seek int64) (Consumer, error) { - return mockNewConsumer(t, conf, seek) - } - - return &mockClientDelivererImpl{ - clientDelivererImpl: clientDelivererImpl{ - brokerFunc: mockBrokerFunc, - consumerFunc: mockConsumerFunc, - - config: conf, - deadChan: deadChan, - errChan: make(chan error), - updChan: make(chan *ab.DeliverUpdate), - }, - t: t, - } -} diff --git a/orderer/kafka/client_deliver_test.go b/orderer/kafka/client_deliver_test.go deleted file mode 100644 index 31d5b017985..00000000000 --- a/orderer/kafka/client_deliver_test.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - "time" - - ab "github.com/hyperledger/fabric/protos/orderer" -) - -func TestClientDeliverSeekWrong(t *testing.T) { - t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset)-1, 10)) - t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(testNewestOffset), 10)) - t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), 0)) - t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), uint64(testConf.General.MaxWindowSize+1))) -} - -func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err == nil { - t.Fatal("Should have received an error response") - } - }() - - mds.incoming <- testNewSeekMessage("specific", seek, window) - - for { - select { - case msg := <-mds.outgoing: - switch msg.GetType().(type) { - case *ab.DeliverResponse_Error: - return // This is the success path for this test - default: - t.Fatal("Should have received an error response") - } - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received an error response") - } - } - } -} - -func TestClientDeliverSeek(t *testing.T) { - t.Run("oldest", testClientDeliverSeekFunc("oldest", 0, 10, 10)) - t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(testMiddleOffset), 10, 10)) - t.Run("newest", testClientDeliverSeekFunc("newest", 0, 10, 1)) -} - -func testClientDeliverSeekFunc(label string, seek, window uint64, expected int) func(*testing.T) { - return func(t *testing.T) { - mds := 
newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - count := 0 - mds.incoming <- testNewSeekMessage(label, seek, window) - for { - select { - case <-mds.outgoing: - count++ - if count > expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - case <-time.After(500 * time.Millisecond): - if count != expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - return - } - } - } -} - -func TestClientDeliverAckWrong(t *testing.T) { - t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(testMiddleOffset)-2)) - t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(testNewestOffset))) -} - -func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err == nil { - t.Fatal("Should have received an error response") - } - }() - - mds.incoming <- testNewSeekMessage("specific", uint64(testMiddleOffset), 10) - mds.incoming <- testNewAckMessage(ack) - for { - select { - case msg := <-mds.outgoing: - switch msg.GetType().(type) { - case *ab.DeliverResponse_Error: - return // This is the success path for this test - default: - } - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have returned earlier due to wrong ACK") - } - } - } -} - -func TestClientDeliverAck(t *testing.T) { - t.Run("in-between", testClientDeliverAckFunc("specific", uint64(testMiddleOffset), 10, 10, 2*10)) - t.Run("newest", testClientDeliverAckFunc("newest", 0, 10, 1, 1)) -} - -func testClientDeliverAckFunc(label string, seek, window uint64, threshold, expected int) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - mds.incoming <- testNewSeekMessage(label, seek, window) - count := 0 - for { - select { - case msg := <-mds.outgoing: - count++ - if count == threshold { - mds.incoming <- testNewAckMessage(msg.GetBlock().Header.Number) - } - if count > expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - case <-time.After(500 * time.Millisecond): - if count != expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - return - } - } - } -} diff --git a/orderer/kafka/config_test.go b/orderer/kafka/config_test.go index 3bd460583b8..7b366d3c052 100644 --- a/orderer/kafka/config_test.go +++ b/orderer/kafka/config_test.go @@ -23,7 +23,6 @@ import ( "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/localconfig" cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" ) var ( @@ -70,35 +69,3 @@ func testClose(t *testing.T, x Closeable) { func newTestEnvelope(content string) *cb.Envelope { return &cb.Envelope{Payload: []byte(content)} } - -func testNewSeekMessage(startLabel string, seekNo, 
windowNo uint64) *ab.DeliverUpdate { - var startVal ab.SeekInfo_StartType - switch startLabel { - case "oldest": - startVal = ab.SeekInfo_OLDEST - case "newest": - startVal = ab.SeekInfo_NEWEST - default: - startVal = ab.SeekInfo_SPECIFIED - - } - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: startVal, - SpecifiedNumber: seekNo, - WindowSize: windowNo, - }, - }, - } -} - -func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate { - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Acknowledgement{ - Acknowledgement: &ab.Acknowledgement{ - Number: ackNo, - }, - }, - } -} diff --git a/orderer/kafka/consumer.go b/orderer/kafka/consumer.go index f1f3681c3a7..29714e89019 100644 --- a/orderer/kafka/consumer.go +++ b/orderer/kafka/consumer.go @@ -16,12 +16,9 @@ limitations under the License. package kafka -import ( - "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" -) +import "github.com/Shopify/sarama" -// Consumer allows the caller to receive a stream of messages from the orderer +// Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition. type Consumer interface { Recv() <-chan *sarama.ConsumerMessage Closeable @@ -32,26 +29,31 @@ type consumerImpl struct { partition sarama.PartitionConsumer } -func newConsumer(conf *config.TopLevel, seek int64) (Consumer, error) { - parent, err := sarama.NewConsumer(conf.Kafka.Brokers, newBrokerConfig(conf)) +func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { + parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition)) if err != nil { return nil, err } - partition, err := parent.ConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) + partition, err := parent.ConsumePartition(cp.Topic(), cp.Partition(), offset) if err != nil { return nil, err } - c := &consumerImpl{parent: parent, partition: partition} - logger.Debug("Created new consumer for client beginning from block", seek) + c := &consumerImpl{ + parent: parent, + partition: partition, + } + logger.Debugf("Created new consumer for session (partition %s, beginning offset %d)", cp, offset) return c, nil } -// Recv returns a channel with messages received from the orderer +// Recv returns a channel with blobs received from the Kafka cluster for a partition. func (c *consumerImpl) Recv() <-chan *sarama.ConsumerMessage { return c.partition.Messages() } -// Close shuts down the partition consumer +// Close shuts down the partition consumer. +// Invoked by the session deliverer's Close method, which is itself called +// during the processSeek function, between disabling and enabling the push. 
func (c *consumerImpl) Close() error { if err := c.partition.Close(); err != nil { return err diff --git a/orderer/kafka/consumer_mock_test.go b/orderer/kafka/consumer_mock_test.go index 1e1207f2ecc..69855aa9c29 100644 --- a/orderer/kafka/consumer_mock_test.go +++ b/orderer/kafka/consumer_mock_test.go @@ -18,57 +18,70 @@ package kafka import ( "fmt" - "strconv" "testing" + "time" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" "github.com/Shopify/sarama" "github.com/Shopify/sarama/mocks" - "github.com/golang/protobuf/proto" ) type mockConsumerImpl struct { consumedOffset int64 - parent *mocks.Consumer - partMgr *mocks.PartitionConsumer - partition sarama.PartitionConsumer - topic string - t *testing.T + chainPartition ChainPartition + + parentConsumer *mocks.Consumer + chainPartitionManager *mocks.PartitionConsumer + chainPartitionConsumer sarama.PartitionConsumer + disk chan *ab.KafkaMessage + isSetup chan struct{} + t *testing.T } -func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer, error) { +func mockNewConsumer(t *testing.T, cp ChainPartition, offset int64, disk chan *ab.KafkaMessage) (Consumer, error) { var err error - parent := mocks.NewConsumer(t, nil) + parentConsumer := mocks.NewConsumer(t, nil) // NOTE The seek flag seems to be useless here. // The mock partition will have its highWatermarkOffset // initialized to 0 no matter what. I've opened up an issue // in the sarama repo: https://github.com/Shopify/sarama/issues/745 // Until this is resolved, use the testFillWithBlocks() hack below. - partMgr := parent.ExpectConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) - partition, err := parent.ConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) + cpManager := parentConsumer.ExpectConsumePartition(cp.Topic(), cp.Partition(), offset) + cpConsumer, err := parentConsumer.ConsumePartition(cp.Topic(), cp.Partition(), offset) // mockNewConsumer is basically a helper function when testing. // Any errors it generates internally should result in a panic // and not get propagated further; checking its errors in the // calling functions (i.e. the actual tests) increases boilerplate.
if err != nil { - t.Fatal("Cannot create partition consumer:", err) + t.Fatal("Cannot create mock partition consumer:", err) } mc := &mockConsumerImpl{ consumedOffset: 0, - parent: parent, - partMgr: partMgr, - partition: partition, - topic: conf.Kafka.Topic, - t: t, + chainPartition: cp, + + parentConsumer: parentConsumer, + chainPartitionManager: cpManager, + chainPartitionConsumer: cpConsumer, + disk: disk, + isSetup: make(chan struct{}), + t: t, + } + // Stop-gap hack until sarama issue #745 is resolved: + if offset >= testOldestOffset && offset <= (testNewestOffset-1) { + mc.testFillWithBlocks(offset - 1) // Prepare the consumer so that the next Recv gives you blob #offset + } else { + err = fmt.Errorf("Out of range offset (seek number) given to consumer: %d", offset) + return mc, err } - // Stop-gap hack until #745 is resolved: - if seek >= testOldestOffset && seek <= (testNewestOffset-1) { - mc.testFillWithBlocks(seek - 1) // Prepare the consumer so that the next Recv gives you block "seek" + + if mc.consumedOffset == offset-1 { + close(mc.isSetup) } else { - err = fmt.Errorf("Out of range seek number given to consumer") + mc.t.Fatal("Mock consumer failed to initialize itself properly") } + return mc, err } @@ -76,42 +89,46 @@ func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage { if mc.consumedOffset >= testNewestOffset-1 { return nil } - mc.consumedOffset++ - mc.partMgr.YieldMessage(testNewConsumerMessage(mc.consumedOffset, mc.topic)) - return mc.partition.Messages() + + // This is useful in cases where we want to <-Recv() in a for/select loop in + // a non-blocking manner. Without the timeout, the Go runtime will always + // execute the body of the Recv() method. If there is no outgoing message + // available, it will block while waiting on mc.disk. All the other cases in + // the original for/select loop then won't be evaluated until we unblock on + // <-mc.disk (which may never happen).
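+ // A sketch of the caller pattern this guards (processMsg and exitChan are + // illustrative names, not part of this package): + // for { + // select { + // case msg := <-mc.Recv(): // must return promptly even when nothing is queued + // processMsg(msg) + // case <-exitChan: + // return + // } + // }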
+ select { + case <-time.After(testTimePadding / 2): + case outgoingMsg := <-mc.disk: + mc.consumedOffset++ + mc.chainPartitionManager.YieldMessage(testNewConsumerMessage(mc.chainPartition, mc.consumedOffset, outgoingMsg)) + return mc.chainPartitionConsumer.Messages() + } + + return nil } func (mc *mockConsumerImpl) Close() error { - if err := mc.partition.Close(); err != nil { + if err := mc.chainPartitionManager.Close(); err != nil { return err } - return mc.parent.Close() + return mc.parentConsumer.Close() } -func (mc *mockConsumerImpl) testFillWithBlocks(seek int64) { - for i := int64(1); i <= seek; i++ { +func (mc *mockConsumerImpl) testFillWithBlocks(offset int64) { + for i := int64(1); i <= offset; i++ { + go func() { + mc.disk <- newRegularMessage(utils.MarshalOrPanic(newTestEnvelope(fmt.Sprintf("consumer fill-in %d", i)))) + }() <-mc.Recv() } + return } -func testNewConsumerMessage(offset int64, topic string) *sarama.ConsumerMessage { - blockData := &cb.BlockData{ - Data: [][]byte{[]byte(strconv.FormatInt(offset, 10))}, - } - block := &cb.Block{ - Header: &cb.BlockHeader{ - Number: uint64(offset), - }, - Data: blockData, - } - - data, err := proto.Marshal(block) - if err != nil { - panic("Error marshaling block") - } - +func testNewConsumerMessage(cp ChainPartition, offset int64, kafkaMessage *ab.KafkaMessage) *sarama.ConsumerMessage { return &sarama.ConsumerMessage{ - Value: sarama.ByteEncoder(data), - Topic: topic, + Value: sarama.ByteEncoder(utils.MarshalOrPanic(kafkaMessage)), + Topic: cp.Topic(), + Partition: cp.Partition(), + Offset: offset, } } diff --git a/orderer/kafka/consumer_test.go b/orderer/kafka/consumer_test.go index 737a5c3bb4d..9cab8731f9c 100644 --- a/orderer/kafka/consumer_test.go +++ b/orderer/kafka/consumer_test.go @@ -16,13 +16,18 @@ limitations under the License. 
package kafka -import "testing" +import ( + "testing" + + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + ab "github.com/hyperledger/fabric/protos/orderer" +) func TestConsumerInitWrong(t *testing.T) { cases := []int64{testOldestOffset - 1, testNewestOffset} - for _, seek := range cases { - mc, err := mockNewConsumer(t, testConf, seek) + for _, offset := range cases { + mc, err := mockNewConsumer(t, newChainPartition(provisional.TestChainID, rawPartition), offset, make(chan *ab.KafkaMessage)) testClose(t, mc) if err == nil { t.Fatal("Consumer should have failed with out-of-range error") @@ -37,18 +42,23 @@ func TestConsumerRecv(t *testing.T) { } func testConsumerRecvFunc(given, expected int64) func(t *testing.T) { + disk := make(chan *ab.KafkaMessage) return func(t *testing.T) { - mc, err := mockNewConsumer(t, testConf, given) + cp := newChainPartition(provisional.TestChainID, rawPartition) + mc, err := mockNewConsumer(t, cp, given, disk) if err != nil { testClose(t, mc) - t.Fatalf("Consumer should have proceeded normally: %s", err) + t.Fatal("Consumer should have proceeded normally:", err) } + go func() { + disk <- newRegularMessage([]byte("foo")) + }() msg := <-mc.Recv() - if (msg.Topic != testConf.Kafka.Topic) || - msg.Partition != testConf.Kafka.PartitionID || + if (msg.Topic != cp.Topic()) || + msg.Partition != cp.Partition() || msg.Offset != mc.(*mockConsumerImpl).consumedOffset || msg.Offset != expected { - t.Fatalf("Expected block %d, got %d", expected, msg.Offset) + t.Fatalf("Expected message with offset %d, got %d", expected, msg.Offset) } testClose(t, mc) } diff --git a/orderer/kafka/deliver.go b/orderer/kafka/deliver.go deleted file mode 100644 index 1a8a07aefed..00000000000 --- a/orderer/kafka/deliver.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "sync" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -// Deliverer allows the caller to receive blocks from the orderer -type Deliverer interface { - Deliver(stream ab.AtomicBroadcast_DeliverServer) error - Closeable -} - -type delivererImpl struct { - config *config.TopLevel - deadChan chan struct{} - wg sync.WaitGroup -} - -func newDeliverer(conf *config.TopLevel) Deliverer { - return &delivererImpl{ - config: conf, - deadChan: make(chan struct{}), - } -} - -// Deliver receives updates from connected clients and adjusts -// the transmission of ordered messages to them accordingly -func (d *delivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - cd := newClientDeliverer(d.config, d.deadChan) - - d.wg.Add(1) - defer d.wg.Done() - - defer cd.Close() - return cd.Deliver(stream) -} - -// Close shuts down the delivery side of the orderer -func (d *delivererImpl) Close() error { - close(d.deadChan) - // Wait till all the client-deliverer consumers have closed - // Note that their recvReplies goroutines keep on going - d.wg.Wait() - return nil -} diff --git a/orderer/kafka/deliver_mock_test.go b/orderer/kafka/deliver_mock_test.go deleted file mode 100644 index d1751be17a6..00000000000 --- a/orderer/kafka/deliver_mock_test.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type mockDelivererImpl struct { - delivererImpl - t *testing.T -} - -func mockNewDeliverer(t *testing.T, conf *config.TopLevel) Deliverer { - md := &mockDelivererImpl{ - delivererImpl: delivererImpl{ - config: conf, - deadChan: make(chan struct{}), - }, - t: t, - } - return md -} - -func (md *mockDelivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - mcd := mockNewClientDeliverer(md.t, md.config, md.deadChan) - - md.wg.Add(1) - defer md.wg.Done() - - defer mcd.Close() - return mcd.Deliver(stream) -} diff --git a/orderer/kafka/deliver_test.go b/orderer/kafka/deliver_test.go deleted file mode 100644 index 34b84d437f9..00000000000 --- a/orderer/kafka/deliver_test.go +++ /dev/null @@ -1,94 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "testing" - "time" -) - -func TestDeliverMultipleClients(t *testing.T) { - connectedClients := 3 - seekMsgs := []struct { - start string - seek, window uint64 - }{ - {"oldest", 0, 10}, {"newest", 0, 10}, {"specific", uint64(testMiddleOffset), 10}, - } - expected := 21 // 10 + 1 + 10 - - md := mockNewDeliverer(t, testConf) - defer testClose(t, md) - - var mds []*mockDeliverStream - for i := 0; i < connectedClients; i++ { - mds = append(mds, newMockDeliverStream(t)) - go func() { - if err := md.Deliver(mds[i]); err != nil { - t.Fatal("Deliver error:", err) - } - }() - mds[i].incoming <- testNewSeekMessage(seekMsgs[i].start, seekMsgs[i].seek, seekMsgs[i].window) - } - - count := 0 - - for i := 0; i < connectedClients; i++ { - client: - for { - select { - case <-mds[i].outgoing: - count++ - case <-time.After(500 * time.Millisecond): - break client - } - } - } - - if count != expected { - t.Fatalf("Expected %d blocks total delivered to all clients, got %d", expected, count) - } -} - -func TestDeliverClose(t *testing.T) { - errChan := make(chan error) - - md := mockNewDeliverer(t, testConf) - mds := newMockDeliverStream(t) - go func() { - if err := md.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - go func() { - errChan <- md.Close() - }() - - for { - select { - case err := <-errChan: - if err != nil { - t.Fatal("Error when closing the deliverer:", err) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Deliverer should have closed all client deliverers by now") - } - } - -} diff --git a/orderer/kafka/main.go b/orderer/kafka/main.go new file mode 100644 index 00000000000..0b4c2e05663 --- /dev/null +++ b/orderer/kafka/main.go @@ -0,0 +1,246 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/localconfig" + "github.com/hyperledger/fabric/orderer/multichain" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" +) + +// New creates a Kafka-backed consenter. Called by orderer's main.go. +func New(kv sarama.KafkaVersion, ro config.Retry) multichain.Consenter { + return newConsenter(kv, ro, bfValue, pfValue, cfValue) +} + +// New calls here because we need to pass additional arguments to +// the constructor and New() should only read from the config file. +func newConsenter(kv sarama.KafkaVersion, ro config.Retry, bf bfType, pf pfType, cf cfType) multichain.Consenter { + return &consenterImpl{kv, ro, bf, pf, cf} +} + +// bfType defines the signature of the broker constructor. +type bfType func([]string, ChainPartition) (Broker, error) + +// pfType defines the signature of the producer constructor. +type pfType func([]string, sarama.KafkaVersion, config.Retry) Producer + +// cfType defines the signature of the consumer constructor. 
+type cfType func([]string, sarama.KafkaVersion, ChainPartition, int64) (Consumer, error)
+
+// bfValue holds the value for the broker constructor that's used in the non-test case.
+var bfValue = func(brokers []string, cp ChainPartition) (Broker, error) {
+	return newBroker(brokers, cp)
+}
+
+// pfValue holds the value for the producer constructor that's used in the non-test case.
+var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer {
+	return newProducer(brokers, kafkaVersion, retryOptions)
+}
+
+// cfValue holds the value for the consumer constructor that's used in the non-test case.
+var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) {
+	return newConsumer(brokers, kafkaVersion, cp, offset)
+}
+
+// consenterImpl holds the implementation of the type that satisfies the
+// multichain.Consenter and testableConsenter interfaces. The former
+// is needed because that is what the HandleChain contract requires.
+// The latter is needed for testing.
+type consenterImpl struct {
+	kv sarama.KafkaVersion
+	ro config.Retry
+	bf bfType
+	pf pfType
+	cf cfType
+}
+
+// HandleChain creates/returns a reference to a Chain for the given set of support resources.
+// Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which
+// is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
+func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport) (multichain.Chain, error) {
+	return newChain(co, cs), nil
+}
+
+// When testing we need to inject our own broker/producer/consumer.
+// Therefore we need to (a) hold a reference to an object that stores
+// the broker/producer/consumer constructors, and (b) refer to that
+// object via its interface type, so that we can use a different
+// implementation when testing. This, in turn, calls for (c): the
+// definition of an interface (see testableConsenter below) that will
+// be satisfied by both the actual and the mock object and will allow
+// us to retrieve these constructors.
+func newChain(consenter testableConsenter, support multichain.ConsenterSupport) *chainImpl {
+	return &chainImpl{
+		consenter:     consenter,
+		support:       support,
+		partition:     newChainPartition(support.ChainID(), rawPartition),
+		lastProcessed: sarama.OffsetOldest - 1, // TODO This should be retrieved by ConsenterSupport; also see note in loop() below
+		producer:      consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()),
+		halted:        false, // Redundant, as the default value for booleans is false, but added for readability
+		exitChan:      make(chan struct{}),
+		haltedChan:    make(chan struct{}),
+	}
+}
+
+// Satisfied by both consenterImpl and mockConsenterImpl.
+// Defined so as to facilitate testing.
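+//
+// As a minimal sketch of that injection seam (newFakeConsumer is hypothetical
+// and not part of this package), a test could wire its own consumer constructor:
+//
+//	cf := func(brokers []string, kv sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) {
+//		return newFakeConsumer(cp, offset), nil
+//	}
+//	co := newConsenter(sarama.V0_9_0_1, config.Retry{}, bfValue, pfValue, cf)
+//
+// mockNewConsenter in main_test.go follows exactly this pattern.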
+type testableConsenter interface {
+	kafkaVersion() sarama.KafkaVersion
+	retryOptions() config.Retry
+	brokFunc() bfType
+	prodFunc() pfType
+	consFunc() cfType
+}
+
+func (co *consenterImpl) kafkaVersion() sarama.KafkaVersion { return co.kv }
+func (co *consenterImpl) retryOptions() config.Retry        { return co.ro }
+func (co *consenterImpl) brokFunc() bfType                  { return co.bf }
+func (co *consenterImpl) prodFunc() pfType                  { return co.pf }
+func (co *consenterImpl) consFunc() cfType                  { return co.cf }
+
+type chainImpl struct {
+	consenter testableConsenter
+	support   multichain.ConsenterSupport
+
+	partition     ChainPartition
+	lastProcessed int64
+
+	producer Producer
+	consumer Consumer
+
+	halted   bool          // For the Enqueue() calls
+	exitChan chan struct{} // For the Chain's Halt() method
+
+	haltedChan chan struct{} // Hook for testing
+}
+
+// Start allocates the necessary resources for staying up to date with this Chain.
+// Implements the multichain.Chain interface. Called by multichain.NewManagerImpl()
+// which is invoked when the ordering process is launched, before the call to NewServer().
+func (ch *chainImpl) Start() {
+	// 1. Post the CONNECT message to prevent the panic that occurs
+	// when seeking on a partition that hasn't been created yet.
+	logger.Debug("Posting the CONNECT message...")
+	if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newConnectMessage())); err != nil {
+		logger.Criticalf("Couldn't post CONNECT message to %s: %s", ch.partition, err)
+		close(ch.exitChan)
+		ch.halted = true
+		return
+	}
+
+	// 2. Set up the listener/consumer for this partition.
+	// TODO When restart support gets added to the common components level, start
+	// the consumer from lastProcessed. For now, hard-code to oldest available.
+	consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastProcessed+1)
+	if err != nil {
+		logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err)
+		close(ch.exitChan)
+		ch.halted = true
+		return
+	}
+	ch.consumer = consumer
+
+	// 3. Spawn the loop that keeps the chain up to date.
+	go ch.loop()
+}
+
+// Halt frees the resources which were allocated for this Chain.
+// Implements the multichain.Chain interface.
+func (ch *chainImpl) Halt() {
+	select {
+	case <-ch.exitChan:
+		// This construct is useful because it allows Halt() to be
+		// called multiple times w/o panicking. Recall that a receive
+		// from a closed channel returns (the zero value) immediately.
+	default:
+		close(ch.exitChan)
+	}
+}
+
+// Enqueue accepts a message and returns true on acceptance, or false on shutdown.
+// Implements the multichain.Chain interface. Called by the drainQueue goroutine,
+// which is spawned when the broadcast handler's Handle() function is invoked.
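+// Note that Enqueue only posts the envelope to the chain partition; the
+// envelope is not cut into a block until it comes back through the consumer
+// in loop() below, so a true return value means "accepted", not "committed".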
+func (ch *chainImpl) Enqueue(env *cb.Envelope) bool { + if ch.halted { + return false + } + + logger.Debug("Enqueueing:", env) + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(env)))); err != nil { + logger.Errorf("Couldn't post to %s: %s", ch.partition, err) + return false + } + + return !ch.halted // If ch.halted has been set to true while sending, we should return false +} + +func (ch *chainImpl) loop() { + msg := new(ab.KafkaMessage) + + defer close(ch.haltedChan) + defer ch.producer.Close() + defer func() { ch.halted = true }() + defer ch.consumer.Close() + + // TODO Add support for time-based block cutting + + for { + select { + case in := <-ch.consumer.Recv(): + logger.Debug("Received:", in) + if err := proto.Unmarshal(in.Value, msg); err != nil { + // This shouldn't happen, it should be filtered at ingress + logger.Critical("Unable to unmarshal consumed message:", err) + } + logger.Debug("Unmarshaled to:", msg) + switch msg.Type.(type) { + case *ab.KafkaMessage_Connect, *ab.KafkaMessage_TimeToCut: + logger.Debugf("Ignoring message") + continue + case *ab.KafkaMessage_Regular: + env := new(cb.Envelope) + if err := proto.Unmarshal(msg.GetRegular().Payload, env); err != nil { + // This shouldn't happen, it should be filtered at ingress + logger.Critical("Unable to unmarshal consumed message:", err) + continue + } + batches, committers, ok := ch.support.BlockCutter().Ordered(env) + logger.Debugf("Ordering results: batches: %v, ok: %v", batches, ok) + if ok && len(batches) == 0 { + continue + } + // If !ok, batches == nil, so this will be skipped + for i, batch := range batches { + ch.support.WriteBlock(batch, nil, committers[i]) + } + } + case <-ch.exitChan: // when Halt() is called + logger.Infof("Consenter for chain %s exiting", ch.partition.Topic()) + return + } + } +} + +// Closeable allows the shut down of the calling resource. +type Closeable interface { + Close() error +} diff --git a/orderer/kafka/main_test.go b/orderer/kafka/main_test.go new file mode 100644 index 00000000000..c1b773d0616 --- /dev/null +++ b/orderer/kafka/main_test.go @@ -0,0 +1,215 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kafka + +import ( + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + "github.com/hyperledger/fabric/orderer/localconfig" + mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/blockcutter" + mockmultichain "github.com/hyperledger/fabric/orderer/mocks/multichain" + mocksharedconfig "github.com/hyperledger/fabric/orderer/mocks/sharedconfig" + "github.com/hyperledger/fabric/orderer/multichain" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var cp = newChainPartition(provisional.TestChainID, rawPartition) + +func newMockSharedConfigManager() *mocksharedconfig.Manager { + return &mocksharedconfig.Manager{KafkaBrokersVal: testConf.Kafka.Brokers} +} + +func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) { + chain.Enqueue(msg) + bc.Block <- struct{}{} +} + +type mockConsenterImpl struct { + consenterImpl + prodDisk, consDisk chan *ab.KafkaMessage + consumerSetUp bool + t *testing.T +} + +func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) *mockConsenterImpl { + prodDisk := make(chan *ab.KafkaMessage) + consDisk := make(chan *ab.KafkaMessage) + + mockBfValue := func(brokers []string, cp ChainPartition) (Broker, error) { + return mockNewBroker(t, cp) + } + mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { + return mockNewProducer(t, cp, testOldestOffset, prodDisk) + } + mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { + return mockNewConsumer(t, cp, offset, consDisk) + } + + return &mockConsenterImpl{ + consenterImpl: consenterImpl{ + kv: kafkaVersion, + ro: retryOptions, + bf: mockBfValue, + pf: mockPfValue, + cf: mockCfValue, + }, + prodDisk: prodDisk, + consDisk: consDisk, + t: t, + } +} + +func TestKafkaConsenterEmptyBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: newMockSharedConfigManager(), + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + wg.Add(1) + go func() { + defer wg.Done() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + }() + wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. 
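+	// (prodDisk and consDisk act as a hand-operated pipe between the mock
+	// producer and the mock consumer: every message the chain posts has to be
+	// shuttled from one channel to the other manually, which is what the
+	// goroutine above does.)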
+ wg.Wait() + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} + +func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: newMockSharedConfigManager(), + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + wg.Add(1) + go func() { + defer wg.Done() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + }() + wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + wg.Wait() + + cs.BlockCutterVal.IsolatedTx = true + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) + wg.Wait() + + ch.Halt() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected two blocks to be cut but never got the first") + } + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected the config type tx to create two blocks, but only got the first") + } + + select { + case <-time.After(testTimePadding): + t.Fatal("Should have exited") + case <-ch.haltedChan: + } +} diff --git a/orderer/kafka/orderer.go b/orderer/kafka/orderer.go deleted file mode 100644 index b67428ee6af..00000000000 --- a/orderer/kafka/orderer.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -// Orderer allows the caller to submit to and receive messages from the orderer -type Orderer interface { - Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error - Deliver(stream ab.AtomicBroadcast_DeliverServer) error - Teardown() error -} - -// Closeable allows the shut down of the calling resource -type Closeable interface { - Close() error -} - -type serverImpl struct { - broadcaster Broadcaster - deliverer Deliverer -} - -// New creates a new orderer -func New(conf *config.TopLevel) Orderer { - return &serverImpl{ - broadcaster: newBroadcaster(conf), - deliverer: newDeliverer(conf), - } -} - -// Broadcast submits messages for ordering -func (s *serverImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { - return s.broadcaster.Broadcast(stream) -} - -// Deliver returns a stream of ordered messages -func (s *serverImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - return s.deliverer.Deliver(stream) -} - -// Teardown shuts down the orderer -func (s *serverImpl) Teardown() error { - s.deliverer.Close() - return s.broadcaster.Close() -} diff --git a/orderer/kafka/orderer_mock_test.go b/orderer/kafka/orderer_mock_test.go deleted file mode 100644 index 129cec0caaf..00000000000 --- a/orderer/kafka/orderer_mock_test.go +++ /dev/null @@ -1,100 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - "google.golang.org/grpc" -) - -func mockNew(t *testing.T, conf *config.TopLevel, disk chan []byte) Orderer { - return &serverImpl{ - broadcaster: mockNewBroadcaster(t, conf, testOldestOffset, disk), - deliverer: mockNewDeliverer(t, conf), - } -} - -type mockBroadcastStream struct { - grpc.ServerStream - incoming chan *cb.Envelope - outgoing chan *ab.BroadcastResponse - t *testing.T - closed bool // Set to true if the outgoing channel is closed -} - -func newMockBroadcastStream(t *testing.T) *mockBroadcastStream { - return &mockBroadcastStream{ - incoming: make(chan *cb.Envelope), - outgoing: make(chan *ab.BroadcastResponse), - t: t, - } -} - -func (mbs *mockBroadcastStream) Recv() (*cb.Envelope, error) { - return <-mbs.incoming, nil -} - -func (mbs *mockBroadcastStream) Send(reply *ab.BroadcastResponse) error { - if !mbs.closed { - mbs.outgoing <- reply - } - return nil -} - -func (mbs *mockBroadcastStream) CloseOut() bool { - close(mbs.outgoing) - mbs.closed = true - return mbs.closed -} - -type mockDeliverStream struct { - grpc.ServerStream - incoming chan *ab.DeliverUpdate - outgoing chan *ab.DeliverResponse - t *testing.T - closed bool -} - -func newMockDeliverStream(t *testing.T) *mockDeliverStream { - return &mockDeliverStream{ - incoming: make(chan *ab.DeliverUpdate), - outgoing: make(chan *ab.DeliverResponse), - t: t, - } -} - -func (mds *mockDeliverStream) Recv() (*ab.DeliverUpdate, error) { - return <-mds.incoming, nil - -} - -func (mds *mockDeliverStream) Send(reply *ab.DeliverResponse) error { - if !mds.closed { - mds.outgoing <- reply - } - return nil -} - -func (mds *mockDeliverStream) CloseOut() bool { - close(mds.outgoing) - mds.closed = true - return mds.closed -} diff --git a/orderer/kafka/producer.go b/orderer/kafka/producer.go index 2e46c332d59..3cc3a727c4a 100644 --- a/orderer/kafka/producer.go +++ b/orderer/kafka/producer.go @@ -24,24 +24,23 @@ import ( "github.com/hyperledger/fabric/orderer/localconfig" ) -// Producer allows the caller to send blocks to the orderer +// Producer allows the caller to post blobs to a chain partition on the Kafka cluster. 
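+// A typical call site marshals an ab.KafkaMessage and posts it to the chain's
+// partition, e.g. (assuming cp and env are in scope, as in chainImpl.Enqueue
+// in main.go):
+//
+//	payload := utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(env)))
+//	if err := producer.Send(cp, payload); err != nil {
+//		logger.Errorf("Couldn't post to %s: %s", cp, err)
+//	}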
type Producer interface { - Send(payload []byte) error + Send(cp ChainPartition, payload []byte) error Closeable } type producerImpl struct { producer sarama.SyncProducer - topic string } -func newProducer(conf *config.TopLevel) Producer { - brokerConfig := newBrokerConfig(conf) +func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { var p sarama.SyncProducer var err error + brokerConfig := newBrokerConfig(kafkaVersion, rawPartition) - repeatTick := time.NewTicker(conf.Kafka.Retry.Period) - panicTick := time.NewTicker(conf.Kafka.Retry.Stop) + repeatTick := time.NewTicker(retryOptions.Period) + panicTick := time.NewTicker(retryOptions.Stop) defer repeatTick.Stop() defer panicTick.Stop() @@ -51,28 +50,34 @@ loop: case <-panicTick.C: panic(fmt.Errorf("Failed to create Kafka producer: %v", err)) case <-repeatTick.C: - logger.Debug("Connecting to Kafka brokers:", conf.Kafka.Brokers) - p, err = sarama.NewSyncProducer(conf.Kafka.Brokers, brokerConfig) + logger.Debug("Connecting to Kafka cluster:", brokers) + p, err = sarama.NewSyncProducer(brokers, brokerConfig) if err == nil { break loop } } } - logger.Debug("Connected to Kafka brokers") - return &producerImpl{producer: p, topic: conf.Kafka.Topic} + logger.Debug("Connected to the Kafka cluster") + return &producerImpl{producer: p} } +// Close shuts down the Producer component of the orderer. func (p *producerImpl) Close() error { return p.producer.Close() } -func (p *producerImpl) Send(payload []byte) error { - _, offset, err := p.producer.SendMessage(newMsg(payload, p.topic)) +// Send posts a blob to a chain partition on the Kafka cluster. +func (p *producerImpl) Send(cp ChainPartition, payload []byte) error { + prt, ofs, err := p.producer.SendMessage(newProducerMessage(cp, payload)) + if prt != cp.Partition() { + // If this happens, something's up with the partitioner + logger.Warningf("Blob destined for partition %d, but posted to %d instead", cp.Partition(), prt) + } if err == nil { - logger.Debugf("Forwarded block %v to ordering service", offset) + logger.Debugf("Forwarded blob with offset number %d to chain partition %s on the Kafka cluster", ofs, cp) } else { - logger.Info("Failed to send to Kafka brokers:", err) + logger.Infof("Failed to send message to chain partition %s on the Kafka cluster: %s", cp, err) } return err } diff --git a/orderer/kafka/producer_mock_test.go b/orderer/kafka/producer_mock_test.go index b567661cb57..d07069092c2 100644 --- a/orderer/kafka/producer_mock_test.go +++ b/orderer/kafka/producer_mock_test.go @@ -21,46 +21,59 @@ import ( "testing" "github.com/Shopify/sarama/mocks" - "github.com/hyperledger/fabric/orderer/localconfig" + "github.com/golang/protobuf/proto" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" ) type mockProducerImpl struct { - config *config.TopLevel producer *mocks.SyncProducer + checker mocks.ValueChecker - checker mocks.ValueChecker - disk chan []byte // This simulates the broker's "disk" where the producer's messages eventually end up + // This simulates the broker's "disk" where the producer's + // blobs for a certain chain partition eventually end up. + disk chan *ab.KafkaMessage producedOffset int64 + isSetup chan struct{} t *testing.T } -func mockNewProducer(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Producer { +// Create a new producer whose next "Send" on ChainPartition gives you blob #offset. 
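+// For example, mockNewProducer(t, cp, 5, disk) pre-fills offsets 1 through 4
+// with filler blobs so that the first real Send is assigned offset 5; the
+// given offset must lie within [testOldestOffset, testNewestOffset-1], or
+// init panics. The disk channel stands in for the partition's log on the
+// mock broker.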
+func mockNewProducer(t *testing.T, cp ChainPartition, offset int64, disk chan *ab.KafkaMessage) Producer { mp := &mockProducerImpl{ - config: conf, producer: mocks.NewSyncProducer(t, nil), checker: nil, disk: disk, producedOffset: 0, + isSetup: make(chan struct{}), t: t, } - if seek >= testOldestOffset && seek <= (testNewestOffset-1) { - mp.testFillWithBlocks(seek - 1) // Prepare the producer so that the next Send gives you block "seek" + mp.init(cp, offset) + + if mp.producedOffset == offset-1 { + close(mp.isSetup) } else { - panic(fmt.Errorf("Out of range seek number given to producer")) + mp.t.Fatal("Mock producer failed to initialize itself properly") } + return mp } -func (mp *mockProducerImpl) Send(payload []byte) error { +func (mp *mockProducerImpl) Send(cp ChainPartition, payload []byte) error { mp.producer.ExpectSendMessageWithCheckerFunctionAndSucceed(mp.checker) - mp.producedOffset++ - prt, ofs, err := mp.producer.SendMessage(newMsg(payload, mp.config.Kafka.Topic)) - if err != nil || - prt != mp.config.Kafka.PartitionID || - ofs != mp.producedOffset { - mp.t.Fatal("Producer not functioning as expected") + mp.producedOffset++ // This is the offset that will be assigned to the sent message + _, ofs, err := mp.producer.SendMessage(newProducerMessage(cp, payload)) + // We do NOT check the assigned partition because the mock + // producer always posts to partition 0 no matter what. + // This is a deficiency of the Kafka library that we use. + if err != nil || ofs != mp.producedOffset { + mp.t.Fatal("Mock producer not functioning as expected") } - mp.disk <- payload // Reaches the broker's disk + msg := new(ab.KafkaMessage) + if err := proto.Unmarshal(payload, msg); err != nil { + mp.t.Fatalf("Failed to unmarshal message that reached producer's disk: %s", err) + } + mp.disk <- msg // Reaches the cluster's disk for that chain partition return err } @@ -68,26 +81,38 @@ func (mp *mockProducerImpl) Close() error { return mp.producer.Close() } -func (mp *mockProducerImpl) testFillWithBlocks(seek int64) { - dyingChan := make(chan struct{}) +// Initializes the mock producer by setting up the offsets. +func (mp *mockProducerImpl) init(cp ChainPartition, offset int64) { + if offset >= testOldestOffset && offset <= (testNewestOffset-1) { + // Prepare the producer so that the next Send + // on that chain partition gives you blob #offset. + mp.testFillWithBlocks(cp, offset-1) + } else { + panic(fmt.Errorf("Out of range offset (seek number) given to producer: %d", offset)) + } +} + +func (mp *mockProducerImpl) testFillWithBlocks(cp ChainPartition, offset int64) { + dieChan := make(chan struct{}) deadChan := make(chan struct{}) go func() { // This goroutine is meant to read only the "fill-in" blocks for { select { case <-mp.disk: - case <-dyingChan: + case <-dieChan: close(deadChan) return } } }() - for i := int64(1); i <= seek; i++ { - mp.Send([]byte("fill-in")) + for i := int64(1); i <= offset; i++ { + mp.Send(cp, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(newTestEnvelope(fmt.Sprintf("producer fill-in %d", i)))))) } - close(dyingChan) + close(dieChan) <-deadChan + return } diff --git a/orderer/kafka/producer_test.go b/orderer/kafka/producer_test.go index 2ec33cbbd8f..7d649cee660 100644 --- a/orderer/kafka/producer_test.go +++ b/orderer/kafka/producer_test.go @@ -16,9 +16,24 @@ limitations under the License. 
 package kafka
 
-import "testing"
+import (
+	"testing"
 
-func TestProducer(t *testing.T) {
-	mp := mockNewProducer(t, testConf, testMiddleOffset, make(chan []byte))
+	"github.com/hyperledger/fabric/orderer/common/bootstrap/provisional"
+	ab "github.com/hyperledger/fabric/protos/orderer"
+	"github.com/hyperledger/fabric/protos/utils"
+)
+
+func TestProducerSend(t *testing.T) {
+	cp := newChainPartition(provisional.TestChainID, rawPartition)
+	mp := mockNewProducer(t, cp, testMiddleOffset, make(chan *ab.KafkaMessage))
 	defer testClose(t, mp)
+
+	go func() {
+		<-mp.(*mockProducerImpl).disk // Retrieve the message that we'll be sending below
+	}()
+
+	if err := mp.Send(cp, utils.MarshalOrPanic(newRegularMessage([]byte("foo")))); err != nil {
+		t.Fatalf("Mock producer was not initialized correctly: %s", err)
+	}
 }
diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go
index ad31b5021f7..a20b6283926 100644
--- a/orderer/kafka/util.go
+++ b/orderer/kafka/util.go
@@ -17,24 +17,21 @@ limitations under the License.
 package kafka
 
 import (
+	"strconv"
+
 	"github.com/Shopify/sarama"
-	"github.com/hyperledger/fabric/orderer/localconfig"
 	ab "github.com/hyperledger/fabric/protos/orderer"
 )
 
-const (
-	ackOutOfRangeError    = "ACK out of range"
-	seekOutOfRangeError   = "Seek out of range"
-	windowOutOfRangeError = "Window out of range"
-)
-
-func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
+// TODO Set the returned config object to more appropriate
+// defaults as we're getting closer to a stable release
+func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32) *sarama.Config {
 	brokerConfig := sarama.NewConfig()
-	brokerConfig.Version = conf.Kafka.Version
+	brokerConfig.Version = kafkaVersion
 
 	// A partitioner is actually not needed the way we do things now,
 	// but we're adding it now to allow for flexibility in the future.
-	brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID)
+	brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)
 	// Set equivalent of kafka producer config max.request.bytes to the default
 	// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
 	brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
@@ -72,20 +69,21 @@ func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage {
 	}
 }
 
-func newMsg(payload []byte, topic string) *sarama.ProducerMessage {
+func newProducerMessage(cp ChainPartition, payload []byte) *sarama.ProducerMessage {
 	return &sarama.ProducerMessage{
-		Topic: topic,
+		Topic: cp.Topic(),
+		Key:   sarama.StringEncoder(strconv.Itoa(int(cp.Partition()))), // TODO Consider writing an IntEncoder?
 		Value: sarama.ByteEncoder(payload),
 	}
 }
 
-func newOffsetReq(conf *config.TopLevel, seek int64) *sarama.OffsetRequest {
+func newOffsetReq(cp ChainPartition, offset int64) *sarama.OffsetRequest {
 	req := &sarama.OffsetRequest{}
-	// If seek == -1, ask for the for the offset assigned to next new message
-	// If seek == -2, ask for the earliest available offset
+	// If offset (seek) == -1, ask for the offset assigned to the next new message.
+	// If offset (seek) == -2, ask for the earliest available offset.
 	// The last parameter in the AddBlock call is needed for God-knows-why reasons.
 	// From the Kafka folks themselves: "We agree that this API is slightly funky."
// https://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/%3Cc159383825e04129b77253ffd6c448aa@BY2PR02MB505.namprd02.prod.outlook.com%3E - req.AddBlock(conf.Kafka.Topic, conf.Kafka.PartitionID, seek, 1) + req.AddBlock(cp.Topic(), cp.Partition(), offset, 1) return req } diff --git a/orderer/kafka/util_test.go b/orderer/kafka/util_test.go index eddae299888..6a8a45713dc 100644 --- a/orderer/kafka/util_test.go +++ b/orderer/kafka/util_test.go @@ -20,25 +20,31 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" ) func TestProducerConfigMessageMaxBytes(t *testing.T) { + cp := newChainPartition(provisional.TestChainID, rawPartition) - topic := testConf.Kafka.Topic - - broker := sarama.NewMockBroker(t, 1000) + broker := sarama.NewMockBroker(t, 1) + defer func() { + broker.Close() + }() broker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader(topic, 0, broker.BrokerID()), + SetLeader(cp.Topic(), cp.Partition(), broker.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf) + config := newBrokerConfig(testConf.Kafka.Version, rawPartition) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { t.Fatal(err) } + defer func() { + producer.Close() + }() testCases := []struct { name string @@ -51,56 +57,53 @@ func TestProducerConfigMessageMaxBytes(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(make([]byte, tc.size))}) + _, _, err = producer.SendMessage(&sarama.ProducerMessage{ + Topic: cp.Topic(), + Value: sarama.ByteEncoder(make([]byte, tc.size)), + }) if err != tc.err { t.Fatal(err) } }) } - - producer.Close() - broker.Close() } func TestNewBrokerConfig(t *testing.T) { + // Use a partition ID that is not the 'default' (rawPartition) + var differentPartition int32 = 2 + cp = newChainPartition(provisional.TestChainID, differentPartition) - topic := testConf.Kafka.Topic - - // use a partition id that is not the 'default' 0 - var partition int32 = 2 - originalPartitionID := testConf.Kafka.PartitionID + // Setup a mock broker that reports that it has 3 partitions for the topic + broker := sarama.NewMockBroker(t, 1) defer func() { - testConf.Kafka.PartitionID = originalPartitionID + broker.Close() }() - testConf.Kafka.PartitionID = partition - - // setup a mock broker that reports that it has 3 partitions for the topic - broker := sarama.NewMockBroker(t, 1000) broker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader(topic, 0, broker.BrokerID()). - SetLeader(topic, 1, broker.BrokerID()). - SetLeader(topic, 2, broker.BrokerID()), + SetLeader(cp.Topic(), 0, broker.BrokerID()). + SetLeader(cp.Topic(), 1, broker.BrokerID()). 
+ SetLeader(cp.Topic(), 2, broker.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf) + config := newBrokerConfig(testConf.Kafka.Version, differentPartition) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { - t.Fatal(err) + t.Fatal("Failed to create producer:", err) } + defer func() { + producer.Close() + }() for i := 0; i < 10; i++ { - assignedPartition, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic}) + assignedPartition, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: cp.Topic()}) if err != nil { - t.Fatal(err) + t.Fatal("Failed to send message:", err) } - if assignedPartition != partition { - t.Fatalf("Expected: %v. Actual: %v", partition, assignedPartition) + if assignedPartition != differentPartition { + t.Fatalf("Message wasn't posted to the right partition - expected: %d, got %v", differentPartition, assignedPartition) } } - producer.Close() - broker.Close() } diff --git a/orderer/localconfig/config.go b/orderer/localconfig/config.go index 7be925fe91d..50bd29ec139 100644 --- a/orderer/localconfig/config.go +++ b/orderer/localconfig/config.go @@ -70,12 +70,10 @@ type FileLedger struct { // Kafka contains config for the Kafka orderer type Kafka struct { - Brokers []string // TODO This should be deprecated and this information should be stored in the config block - Topic string - PartitionID int32 - Retry Retry - Verbose bool - Version sarama.KafkaVersion + Brokers []string // TODO This should be deprecated and this information should be stored in the config block + Retry Retry + Verbose bool + Version sarama.KafkaVersion } // Retry contains config for the reconnection attempts to the Kafka brokers @@ -120,14 +118,13 @@ var defaults = TopLevel{ Prefix: "hyperledger-fabric-rawledger", }, Kafka: Kafka{ - Brokers: []string{"127.0.0.1:9092"}, - Topic: "test", - PartitionID: 0, - Version: sarama.V0_9_0_1, + Brokers: []string{"127.0.0.1:9092"}, Retry: Retry{ Period: 3 * time.Second, Stop: 60 * time.Second, }, + Verbose: false, + Version: sarama.V0_9_0_1, }, } @@ -171,9 +168,6 @@ func (c *TopLevel) completeInitialization() { case c.Kafka.Brokers == nil: logger.Infof("Kafka.Brokers unset, setting to %v", defaults.Kafka.Brokers) c.Kafka.Brokers = defaults.Kafka.Brokers - case c.Kafka.Topic == "": - logger.Infof("Kafka.Topic unset, setting to %v", defaults.Kafka.Topic) - c.Kafka.Topic = defaults.Kafka.Topic case c.Kafka.Retry.Period == 0*time.Second: logger.Infof("Kafka.Retry.Period unset, setting to %v", defaults.Kafka.Retry.Period) c.Kafka.Retry.Period = defaults.Kafka.Retry.Period diff --git a/orderer/main.go b/orderer/main.go index d06327f09b7..f7051f8687c 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -24,8 +24,8 @@ import ( "net/http" _ "net/http/pprof" "os" - "os/signal" + "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" "github.com/hyperledger/fabric/orderer/kafka" "github.com/hyperledger/fabric/orderer/localconfig" @@ -37,18 +37,21 @@ import ( cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" - "github.com/Shopify/sarama" "github.com/op/go-logging" "google.golang.org/grpc" ) var logger = logging.MustGetLogger("orderer/main") +func init() { + logging.SetLevel(logging.DEBUG, "") +} + func main() { conf := config.Load() - // Start the profiling service if enabled. The ListenAndServe() - // call does not return unless an error occurs. 
+ // Start the profiling service if enabled. + // The ListenAndServe() call does not return unless an error occurs. if conf.General.Profile.Enabled { go func() { logger.Infof("Starting Go pprof profiling service on %s", conf.General.Profile.Address) @@ -56,21 +59,6 @@ func main() { }() } - switch conf.General.OrdererType { - case "solo": - launchGeneric(conf) - case "kafka": - launchKafka(conf) - default: - panic("Invalid orderer type specified in config") - } -} - -func init() { - logging.SetLevel(logging.DEBUG, "") -} - -func launchGeneric(conf *config.TopLevel) { grpcServer := grpc.NewServer() lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) @@ -100,7 +88,6 @@ func launchGeneric(conf *config.TopLevel) { panic(fmt.Errorf("Error creating temp dir: %s", err)) } } - lf, _ = fileledger.New(location, genesisBlock) case "ram": fallthrough @@ -108,8 +95,13 @@ func launchGeneric(conf *config.TopLevel) { lf, _ = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) } + if conf.Kafka.Verbose { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) + } + consenters := make(map[string]multichain.Consenter) consenters["solo"] = solo.New(conf.General.BatchTimeout) + consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry) manager := multichain.NewManagerImpl(lf, consenters) @@ -122,34 +114,3 @@ func launchGeneric(conf *config.TopLevel) { ab.RegisterAtomicBroadcastServer(grpcServer, server) grpcServer.Serve(lis) } - -func launchKafka(conf *config.TopLevel) { - var kafkaVersion = sarama.V0_9_0_1 // TODO Ideally we'd set this in the YAML file but its type makes this impossible - conf.Kafka.Version = kafkaVersion - - if conf.Kafka.Verbose { - sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) - } - - ordererSrv := kafka.New(conf) - defer ordererSrv.Teardown() - - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) - if err != nil { - panic(err) - } - rpcSrv := grpc.NewServer() // TODO Add TLS support - ab.RegisterAtomicBroadcastServer(rpcSrv, ordererSrv) - go rpcSrv.Serve(lis) - - // Trap SIGINT to trigger a shutdown - // We must use a buffered channel or risk missing the signal - // if we're not ready to receive when the signal is sent. - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt) - - for range signalChan { - logger.Info("Server shutting down") - return - } -} diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml index 685b76e95ad..e219ac122ba 100644 --- a/orderer/orderer.yaml +++ b/orderer/orderer.yaml @@ -14,8 +14,7 @@ General: OrdererType: solo # Ledger Type: The ledger type to provide to the orderer (if needed) - # Available types are "ram", "file". When "kafka" is chosen as the - # OrdererType, this option is ignored. + # Available types are "ram", "file". LedgerType: ram # Batch Timeout: The amount of time to wait before creating a batch @@ -25,7 +24,7 @@ General: BatchSize: 10 # Queue Size: The maximum number of messages to allow pending from a gRPC - # client. This option is currently ignored for the Kafka OrdererType. + # client. 
   QueueSize: 10
 
 # Max Window Size: The maximum number of messages for the orderer Deliver
@@ -92,13 +91,6 @@ Kafka:
   Brokers:
     - 127.0.0.1:9092
 
-  # Topic: The Kafka topic the orderer writes to/reads from
-  Topic: test
-
-  # Partition ID: The partition of the Kafka topic the orderer writes to/reads
-  # from
-  PartitionID: 0
-
   # Retry: What to do if none of the Kafka brokers are available
   Retry:
     # The producer should attempt to reconnect every <Period>
@@ -106,5 +98,6 @@
     # Panic if <Stop> has elapsed and no connection has been established
     Stop: 60s
 
-  # Verbose: Turn on logging for the Kafka library
+  # Verbose: Turn on logging for sarama, the client library that we use to
+  # interact with the Kafka cluster
   Verbose: false
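A note on the Verbose flag above: when it is set, main.go now redirects
sarama's package-level logger to stdout before the consenters map is
assembled. A minimal sketch of that wiring as introduced by this change
(conf is the loaded TopLevel config):

	if conf.Kafka.Verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile)
	}

	consenters := make(map[string]multichain.Consenter)
	consenters["solo"] = solo.New(conf.General.BatchTimeout)
	consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry)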