locate bug
Signed-off-by: tison <wander4096@gmail.com>
tisonkun committed Oct 24, 2023
1 parent 5810654 commit 15135bf
Showing 1 changed file with 33 additions and 41 deletions.
pulsar/producer_partition.go: 74 changes (33 additions, 41 deletions)
@@ -483,35 +483,38 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 
     msg := sr.msg
 
-    // read payload from message
-    uncompressedPayload := msg.Payload
-
     if !p.canAddToQueue(sr) {
         return
     }
 
-    uncompressedSize := sr.uncompressedSize
-
     // try to reserve memory for uncompressedPayload
-    if !p.canReserveMem(sr, uncompressedSize) {
+    if !p.canReserveMem(sr, sr.uncompressedSize) {
         return
     }
 
+    if err := p.updateChunkInfo(sr); err != nil {
+        p.releaseSemaphoreAndMem(sr.uncompressedSize)
+        runCallback(sr.callback, nil, sr.msg, err)
+        p.metrics.PublishErrorsMsgTooLarge.Inc()
+        return
+    }
+
     if sr.sendAsBatch {
-        smm := p.genSingleMessageMetadataInBatch(msg, int(uncompressedSize))
+        smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize))
         multiSchemaEnabled := !p.options.DisableMultiSchema
-        added := addRequestToBatch(
-            smm, p, uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+        added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
+            multiSchemaEnabled)
         if !added {
             // The current batch is full. flush it and retry
 
             p.internalFlushCurrentBatch()
 
             // after flushing try again to add the current payload
-            if ok := addRequestToBatch(smm, p, uncompressedPayload, sr, msg,
-                sr.deliverAt, sr.schemaVersion, multiSchemaEnabled); !ok {
-                p.releaseSemaphoreAndMem(uncompressedSize)
+            if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
+                multiSchemaEnabled); !ok {
+                p.releaseSemaphoreAndMem(sr.uncompressedSize)
                 runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
-                p.log.WithField("size", uncompressedSize).
+                p.log.WithField("size", sr.uncompressedSize).
                     WithField("properties", msg.Properties).
                     Error("unable to add message to batch")
                 return
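The block added above moves the updateChunkInfo call into internalSend, after the queue permit and memory have already been reserved, so a failed check now has to hand both back before reporting the error. Below is an illustrative sketch of that acquire-validate-release ordering, not the client's real types; limiter, tryAcquire, and release are hypothetical stand-ins for the producer's publish semaphore and memory limit.

package main

import (
    "errors"
    "fmt"
)

// Hypothetical stand-in for the producer's publish semaphore plus memory limit;
// not the pulsar-client-go API.
type limiter struct {
    permits int64
    memory  int64
}

func (l *limiter) tryAcquire(size int64) bool {
    if l.permits < 1 || l.memory < size {
        return false
    }
    l.permits--
    l.memory -= size
    return true
}

func (l *limiter) release(size int64) {
    l.permits++
    l.memory += size
}

// send mirrors the ordering in the hunk above: reserve resources first, then
// validate; a validation failure must give everything back before the error
// is reported, which is what p.releaseSemaphoreAndMem does in the real code.
func send(l *limiter, size int64, validate func() error) error {
    if !l.tryAcquire(size) {
        return errors.New("no permit or memory available")
    }
    if err := validate(); err != nil {
        l.release(size)
        return err
    }
    // ... continue with batching or chunking ...
    return nil
}

func main() {
    l := &limiter{permits: 1, memory: 10}
    err := send(l, 8, func() error { return errors.New("message too large") })
    fmt.Println(err, l.permits, l.memory) // message too large 1 10
}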
@@ -536,8 +539,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
         cr := newChunkRecorder()
         for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
             lhs = chunkID * sr.payloadChunkSize
-            rhs = lhs + sr.payloadChunkSize
-            if rhs > sr.compressedSize {
+            if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
                 rhs = sr.compressedSize
             }
             // update chunk id
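The rewrite above folds the rhs computation into the if statement's init clause. Because rhs is assigned with = rather than declared with :=, the clamped bound stays visible to the slice expression sr.compressedPayload[lhs:rhs] later in the loop body. A minimal, self-contained trace of those bounds (the sizes are made up):

package main

import "fmt"

func main() {
    // Made-up sizes: a 10-byte compressed payload split into 4-byte chunks.
    compressedSize := 10
    payloadChunkSize := 4
    totalChunks := 3 // ceil(10 / 4)

    var lhs, rhs int
    for chunkID := 0; chunkID < totalChunks; chunkID++ {
        lhs = chunkID * payloadChunkSize
        // "=" in the init clause updates the outer rhs (":=" would shadow it),
        // so the clamped value can still be used after the if block.
        if rhs = lhs + payloadChunkSize; rhs > compressedSize {
            rhs = compressedSize
        }
        fmt.Printf("chunk %d: [%d:%d]\n", chunkID, lhs, rhs)
    }
    // Output: chunk 0: [0:4], chunk 1: [4:8], chunk 2: [8:10]
}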
@@ -548,6 +550,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                 callback: sr.callback,
                 callbackOnce: sr.callbackOnce,
                 publishTime: sr.publishTime,
+                flushImmediately: sr.flushImmediately,
                 totalChunks: sr.totalChunks,
                 chunkID: chunkID,
                 uuid: uuid,
@@ -568,10 +571,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
             }
             // the permit of first chunk has acquired
             if chunkID != 0 && !p.canAddToQueue(nsr) {
-                p.releaseSemaphoreAndMem(uncompressedSize - int64(lhs))
+                p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs))
                 return
             }
-
             p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
         }
     }
@@ -1146,32 +1148,28 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
             WithField("size", checkSize).
             WithField("properties", sr.msg.Properties).
             Errorf("MaxMessageSize %d", sr.maxMessageSize)
-
         return errMessageTooLarge
     }
 
     if sr.sendAsBatch || !p.options.EnableChunking {
         sr.totalChunks = 1
         sr.payloadChunkSize = int(sr.maxMessageSize)
-        return nil
-    }
-
-    sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
-    if sr.payloadChunkSize <= 0 {
-        p.log.WithError(errMetaTooLarge).
-            WithField("metadata size", proto.Size(sr.mm)).
-            WithField("properties", sr.msg.Properties).
-            Errorf("MaxMessageSize %d", sr.maxMessageSize)
-
-        return errMetaTooLarge
-    }
-
-    // set ChunkMaxMessageSize
-    if p.options.ChunkMaxMessageSize != 0 {
-        sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+    } else {
+        sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+        if sr.payloadChunkSize <= 0 {
+            p.log.WithError(errMetaTooLarge).
+                WithField("metadata size", proto.Size(sr.mm)).
+                WithField("properties", sr.msg.Properties).
+                Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+            return errMetaTooLarge
+        }
+        // set ChunkMaxMessageSize
+        if p.options.ChunkMaxMessageSize != 0 {
+            sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+        }
+        sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
     }
 
-    sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
     return nil
 }
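A standalone sketch (not part of the diff) tracing the arithmetic the new else branch performs; every number below is hypothetical. The per-chunk payload budget is the broker's max message size minus the serialized metadata, optionally capped by ChunkMaxMessageSize, and the chunk count is the ceiling of the compressed size over that budget.

package main

import (
    "fmt"
    "math"
)

func main() {
    // Hypothetical numbers, chosen only to trace the chunking arithmetic.
    maxMessageSize := 5 * 1024 * 1024      // broker-advertised limit
    metadataSize := 200                    // stand-in for proto.Size(sr.mm)
    chunkMaxMessageSize := 1 * 1024 * 1024 // ChunkMaxMessageSize option; 0 means unset
    compressedSize := 7 * 1024 * 1024      // payload size after compression

    payloadChunkSize := maxMessageSize - metadataSize
    if chunkMaxMessageSize != 0 {
        payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(chunkMaxMessageSize)))
    }
    totalChunks := int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))

    fmt.Println(payloadChunkSize, totalChunks) // 1048576 7
}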

@@ -1222,12 +1220,6 @@ func (p *partitionProducer) internalSendAsync(
 
     p.updateMetaData(sr)
 
-    if err := p.updateChunkInfo(sr); err != nil {
-        p.log.Error(err)
-        runCallback(sr.callback, nil, msg, err)
-        return
-    }
-
     p.dataChan <- sr
 }