From 58106540c7b743ca6a319a1dcf83facc8559d0f9 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 00:16:55 +0800 Subject: [PATCH] drop stopBlock Signed-off-by: tison --- pulsar/message_chunking_test.go | 5 ---- pulsar/producer_partition.go | 41 +++------------------------------ 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index cb7190d6f4..fbdcaa0ceb 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -560,14 +560,11 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) - - doneCh := make(chan struct{}) producerImpl.internalSingleSend( mm, msg.Payload, &sendRequest{ callback: func(id MessageID, producerMessage *ProducerMessage, err error) { - close(doneCh) }, ctx: context.Background(), msg: msg, @@ -592,6 +589,4 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { }, uint32(internal.MaxMessageSize), ) - - <-doneCh } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 7f4a910481..e19431401c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -486,8 +486,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { // read payload from message uncompressedPayload := msg.Payload - // The block chan must be closed when returned with exception - defer sr.stopBlock() if !p.canAddToQueue(sr) { return } @@ -526,9 +524,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { } if sr.totalChunks <= 1 { - // close the blockCh when totalChunks is 1 (it has acquired permits) - // TODO - drop this method - sr.stopBlock() p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize)) return } @@ -553,8 +548,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { callback: sr.callback, callbackOnce: sr.callbackOnce, publishTime: sr.publishTime, - blockCh: sr.blockCh, - closeBlockChOnce: sr.closeBlockChOnce, totalChunks: sr.totalChunks, chunkID: chunkID, uuid: uuid, @@ -581,10 +574,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } - - // close the blockCh when all the chunks acquired permits - // TODO - drop this method - sr.stopBlock() } func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, @@ -1125,11 +1114,7 @@ func (p *partitionProducer) updateMetaData(sr *sendRequest) { sr.msg.ReplicationClusters == nil && deliverAt.UnixNano() < 0 - // Once the batching is enabled, it can close blockCh early to make block finish - if sr.sendAsBatch { - // TODO - drop this method - sr.stopBlock() - } else { + if !sr.sendAsBatch { // update sequence id for metadata, make the size of msgMetadata more accurate // batch sending will update sequence ID in the BatchBuilder p.updateMetadataSeqID(sr.mm, sr.msg) @@ -1153,7 +1138,7 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { checkSize = int64(sr.compressedSize) } - sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize())) + sr.maxMessageSize = p._getConn().GetMaxMessageSize() // if msg is too large and chunking is disabled if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking { @@ -1202,19 +1187,13 @@ func (p *partitionProducer) internalSendAsync( return } - // bc only works when DisableBlockIfQueueFull is false - bc := make(chan struct{}) - // callbackOnce make sure the callback is only invoked once in chunking - callbackOnce := &sync.Once{} sr := &sendRequest{ ctx: ctx, msg: msg, callback: callback, - callbackOnce: callbackOnce, + callbackOnce: &sync.Once{}, flushImmediately: flushImmediately, publishTime: time.Now(), - blockCh: bc, - closeBlockChOnce: &sync.Once{}, } if err := p.prepareTransaction(sr); err != nil { runCallback(sr.callback, nil, msg, err) @@ -1250,11 +1229,6 @@ func (p *partitionProducer) internalSendAsync( } p.dataChan <- sr - - if !p.options.DisableBlockIfQueueFull { - // block if queue full - <-bc - } } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1488,8 +1462,6 @@ type sendRequest struct { callbackOnce *sync.Once publishTime time.Time flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once totalChunks int chunkID int uuid string @@ -1509,13 +1481,6 @@ type sendRequest struct { maxMessageSize int32 } -// stopBlock can be invoked multiple times safety -func (sr *sendRequest) stopBlock() { - sr.closeBlockChOnce.Do(func() { - close(sr.blockCh) - }) -} - type closeProducer struct { doneCh chan struct{} }