Skip to content

Commit

Permalink
drop stopBlock
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Oct 24, 2023
1 parent 573448c commit 5810654
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 43 deletions.
5 changes: 0 additions & 5 deletions pulsar/message_chunking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,11 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload)))
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)

doneCh := make(chan struct{})
producerImpl.internalSingleSend(
mm,
msg.Payload,
&sendRequest{
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
close(doneCh)
},
ctx: context.Background(),
msg: msg,
Expand All @@ -592,6 +589,4 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
},
uint32(internal.MaxMessageSize),
)

<-doneCh
}
41 changes: 3 additions & 38 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
// read payload from message
uncompressedPayload := msg.Payload

// The block chan must be closed when returned with exception
defer sr.stopBlock()
if !p.canAddToQueue(sr) {
return
}
Expand Down Expand Up @@ -526,9 +524,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
}

if sr.totalChunks <= 1 {
// close the blockCh when totalChunks is 1 (it has acquired permits)
// TODO - drop this method
sr.stopBlock()
p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
return
}
Expand All @@ -553,8 +548,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
blockCh: sr.blockCh,
closeBlockChOnce: sr.closeBlockChOnce,
totalChunks: sr.totalChunks,
chunkID: chunkID,
uuid: uuid,
Expand All @@ -581,10 +574,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {

p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
}

// close the blockCh when all the chunks acquired permits
// TODO - drop this method
sr.stopBlock()
}

func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
Expand Down Expand Up @@ -1125,11 +1114,7 @@ func (p *partitionProducer) updateMetaData(sr *sendRequest) {
sr.msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0

// Once the batching is enabled, it can close blockCh early to make block finish
if sr.sendAsBatch {
// TODO - drop this method
sr.stopBlock()
} else {
if !sr.sendAsBatch {
// update sequence id for metadata, make the size of msgMetadata more accurate
// batch sending will update sequence ID in the BatchBuilder
p.updateMetadataSeqID(sr.mm, sr.msg)
Expand All @@ -1153,7 +1138,7 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
checkSize = int64(sr.compressedSize)
}

sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
sr.maxMessageSize = p._getConn().GetMaxMessageSize()

// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
Expand Down Expand Up @@ -1202,19 +1187,13 @@ func (p *partitionProducer) internalSendAsync(
return
}

// bc only works when DisableBlockIfQueueFull is false
bc := make(chan struct{})
// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: callback,
callbackOnce: callbackOnce,
callbackOnce: &sync.Once{},
flushImmediately: flushImmediately,
publishTime: time.Now(),
blockCh: bc,
closeBlockChOnce: &sync.Once{},
}
if err := p.prepareTransaction(sr); err != nil {
runCallback(sr.callback, nil, msg, err)
Expand Down Expand Up @@ -1250,11 +1229,6 @@ func (p *partitionProducer) internalSendAsync(
}

p.dataChan <- sr

if !p.options.DisableBlockIfQueueFull {
// block if queue full
<-bc
}
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
Expand Down Expand Up @@ -1488,8 +1462,6 @@ type sendRequest struct {
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
blockCh chan struct{}
closeBlockChOnce *sync.Once
totalChunks int
chunkID int
uuid string
Expand All @@ -1509,13 +1481,6 @@ type sendRequest struct {
maxMessageSize int32
}

// stopBlock can be invoked multiple times safety
func (sr *sendRequest) stopBlock() {
sr.closeBlockChOnce.Do(func() {
close(sr.blockCh)
})
}

type closeProducer struct {
doneCh chan struct{}
}
Expand Down

0 comments on commit 5810654

Please sign in to comment.