diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index a20b6283926..b59f6214928 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -29,6 +29,10 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int brokerConfig := sarama.NewConfig() brokerConfig.Version = kafkaVersion + // Set the level of acknowledgement reliability needed from the broker. + // WaitForAll means that the partition leader will wait till all ISRs + // got the message before sending back an ACK to the sender. + brokerConfig.Producer.RequiredAcks = sarama.WaitForAll // A partitioner is actually not needed the way we do things now, // but we're adding it now to allow for flexibility in the future. brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)