From f7b6387bfbe7b33923aeb00e91cc043537c340f5 Mon Sep 17 00:00:00 2001 From: cornelk Date: Sun, 12 Apr 2020 17:02:42 +0300 Subject: [PATCH] Fix data race when accessing partition producer state --- pulsar/producer_partition.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1241d47323..d917ed26b7 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -31,17 +31,16 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal/pb" ) -type producerState int - const ( - producerInit producerState = iota + // producer states + producerInit int32 = iota producerReady producerClosing producerClosed ) type partitionProducer struct { - state producerState + state int32 client *client topic string log *log.Entry @@ -107,7 +106,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions p.log = p.log.WithField("producer_name", p.producerName) p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") - p.state = producerReady + atomic.StoreInt32(&p.state, producerReady) go p.runEventsLoop() @@ -181,7 +180,7 @@ func (p *partitionProducer) ConnectionClosed() { func (p *partitionProducer) reconnectToBroker() { backoff := internal.Backoff{} for { - if p.state != producerReady { + if atomic.LoadInt32(&p.state) != producerReady { // Producer is already closing return } @@ -432,11 +431,10 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) func (p *partitionProducer) internalClose(req *closeProducer) { defer req.waitGroup.Done() - if p.state != producerReady { + if !atomic.CompareAndSwapInt32(&p.state, producerReady, producerClosing) { return } - p.state = producerClosing p.log.Info("Closing producer") id := p.client.rpcClient.NewRequestID() @@ -451,7 +449,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.log.Info("Closed producer") } - p.state = producerClosed + atomic.StoreInt32(&p.state, producerClosed) p.cnx.UnregisterListener(p.producerID) p.batchFlushTicker.Stop() } @@ -472,7 +470,7 @@ func (p *partitionProducer) Flush() error { } func (p *partitionProducer) Close() { - if p.state != producerReady { + if atomic.LoadInt32(&p.state) != producerReady { // Producer is closing return }