@@ -9,6 +9,7 @@ package kafka
9
9
import (
10
10
"fmt"
11
11
"strconv"
12
+ "sync"
12
13
"time"
13
14
14
15
"github.com/Shopify/sarama"
@@ -93,6 +94,8 @@ type chainImpl struct {
93
94
parentConsumer sarama.Consumer
94
95
channelConsumer sarama.PartitionConsumer
95
96
97
+ // mutex used when changing the doneReprocessingMsgInFlight
98
+ doneReprocessingMutex sync.Mutex
96
99
// notification that there are in-flight messages need to wait for
97
100
doneReprocessingMsgInFlight chan struct {}
98
101
@@ -161,15 +164,32 @@ func (chain *chainImpl) WaitReady() error {
161
164
select {
162
165
case <- chain .haltChan : // The chain has been halted, stop here
163
166
return fmt .Errorf ("consenter for this channel has been halted" )
164
- // Block waiting for all re-submitted messages to be reprocessed
165
- case <- chain .doneReprocessingMsgInFlight :
167
+ case <- chain .doneReprocessing (): // Block waiting for all re-submitted messages to be reprocessed
166
168
return nil
167
169
}
168
170
default : // Not ready yet
169
171
return fmt .Errorf ("will not enqueue, consenter for this channel hasn't started yet" )
170
172
}
171
173
}
172
174
175
+ func (chain * chainImpl ) doneReprocessing () <- chan struct {} {
176
+ chain .doneReprocessingMutex .Lock ()
177
+ defer chain .doneReprocessingMutex .Unlock ()
178
+ return chain .doneReprocessingMsgInFlight
179
+ }
180
+
181
+ func (chain * chainImpl ) reprocessConfigComplete () {
182
+ chain .doneReprocessingMutex .Lock ()
183
+ defer chain .doneReprocessingMutex .Unlock ()
184
+ close (chain .doneReprocessingMsgInFlight )
185
+ }
186
+
187
+ func (chain * chainImpl ) reprocessConfigPending () {
188
+ chain .doneReprocessingMutex .Lock ()
189
+ defer chain .doneReprocessingMutex .Unlock ()
190
+ chain .doneReprocessingMsgInFlight = make (chan struct {})
191
+ }
192
+
173
193
// Implements the consensus.Chain interface. Called by Broadcast().
174
194
func (chain * chainImpl ) Order (env * cb.Envelope , configSeq uint64 ) error {
175
195
return chain .order (env , configSeq , int64 (0 ))
@@ -771,7 +791,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
771
791
regularMessage .ConfigSeq == seq { // AND we don't need to resubmit it again
772
792
logger .Debugf ("[channel: %s] Config message with original offset %d is the last in-flight resubmitted message" +
773
793
"and it does not require revalidation, unblock ingress messages now" , chain .ChainID (), regularMessage .OriginalOffset )
774
- close ( chain .doneReprocessingMsgInFlight ) // Therefore, we could finally close the channel to unblock broadcast
794
+ chain .reprocessConfigComplete ( ) // Therefore, we could finally unblock broadcast
775
795
}
776
796
777
797
// Somebody resubmitted message at offset X, whereas we didn't. This is due to non-determinism where
@@ -798,8 +818,8 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
798
818
}
799
819
800
820
logger .Debugf ("[channel: %s] Resubmitted config message with offset %d, block ingress messages" , chain .ChainID (), receivedOffset )
801
- chain .lastResubmittedConfigOffset = receivedOffset // Keep track of last resubmitted message offset
802
- chain .doneReprocessingMsgInFlight = make ( chan struct {}) // Create the channel to block ingress messages
821
+ chain .lastResubmittedConfigOffset = receivedOffset // Keep track of last resubmitted message offset
822
+ chain .reprocessConfigPending () // Begin blocking ingress messages
803
823
804
824
return nil
805
825
}
0 commit comments