From 573448c195c4969d98ba93763310533c2f82243a Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 00:12:44 +0800 Subject: [PATCH] refactor: prepare sendrequest and move to internalSendAsync Signed-off-by: tison --- pulsar/message_chunking_test.go | 26 +- pulsar/producer_partition.go | 466 ++++++++++++++++++-------------- 2 files changed, 285 insertions(+), 207 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index ee3ab1776..cb7190d6f 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -552,11 +552,12 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { msg := &ProducerMessage{ Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)), } + wholePayload := msg.Payload producerImpl := p.(*producer).producers[0].(*partitionProducer) - mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now()) + mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now()) mm.Uuid = proto.String(uuid) mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) - mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload))) + mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) @@ -568,7 +569,26 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { callback: func(id MessageID, producerMessage *ProducerMessage, err error) { close(doneCh) }, - msg: msg, + ctx: context.Background(), + msg: msg, + flushImmediately: true, + totalChunks: totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: newChunkRecorder(), + transaction: nil, + reservedMem: 0, + sendAsBatch: false, + schema: nil, + schemaVersion: nil, + uncompressedPayload: wholePayload, + uncompressedSize: int64(len(wholePayload)), + compressedPayload: wholePayload, + compressedSize: len(wholePayload), + payloadChunkSize: internal.MaxMessageSize - proto.Size(mm), + mm: mm, + deliverAt: time.Now(), + maxMessageSize: internal.MaxMessageSize, }, uint32(internal.MaxMessageSize), ) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ac07341d3..7f4a91048 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -478,224 +478,113 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg cb(id, msg, err) } -func (p *partitionProducer) internalSend(request *sendRequest) { - p.log.Debug("Received send request: ", *request.msg) +func (p *partitionProducer) internalSend(sr *sendRequest) { + p.log.Debug("Received send request: ", *sr.msg) - msg := request.msg + msg := sr.msg // read payload from message uncompressedPayload := msg.Payload - var schemaPayload []byte - var err error - // The block chan must be closed when returned with exception - defer request.stopBlock() - if !p.canAddToQueue(request) { + defer sr.stopBlock() + if !p.canAddToQueue(sr) { return } - var schema Schema - var schemaVersion []byte - if msg.Schema != nil { - schema = msg.Schema - } else if p.options.Schema != nil { - schema = p.options.Schema - } - if msg.Value != nil { - // payload and schema are mutually exclusive - // try to get payload from schema value only if payload is not set - if uncompressedPayload == nil && schema != nil { - schemaPayload, err = schema.Encode(msg.Value) - if err != nil { - runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error())) - p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) - return - } - } - } - if uncompressedPayload == nil { - uncompressedPayload = schemaPayload - } - - if schema != nil { - schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) - if schemaVersion == nil { - schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) - if err != nil { - p.log.WithError(err).Error("get schema version fail") - runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) - return - } - p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) - } - } - - uncompressedSize := len(uncompressedPayload) + uncompressedSize := sr.uncompressedSize // try to reserve memory for uncompressedPayload - if !p.canReserveMem(request, int64(uncompressedSize)) { - return - } - - deliverAt := msg.DeliverAt - if msg.DeliverAfter.Nanoseconds() > 0 { - deliverAt = time.Now().Add(msg.DeliverAfter) - } - - // set default ReplicationClusters when DisableReplication - if msg.DisableReplication { - msg.ReplicationClusters = []string{"__local__"} - } - - mm := p.genMetadata(msg, uncompressedSize, deliverAt) - - sendAsBatch := !p.options.DisableBatching && - msg.ReplicationClusters == nil && - deliverAt.UnixNano() < 0 - - // Once the batching is enabled, it can close blockCh early to make block finish - if sendAsBatch { - request.stopBlock() - } else { - // update sequence id for metadata, make the size of msgMetadata more accurate - // batch sending will update sequence ID in the BatchBuilder - p.updateMetadataSeqID(mm, msg) - } - - maxMessageSize := int(p._getConn().GetMaxMessageSize()) - - // compress payload if not batching - var compressedPayload []byte - var compressedSize int - var checkSize int - if !sendAsBatch { - compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload) - compressedSize = len(compressedPayload) - checkSize = compressedSize - - // set the compress type in msgMetaData - compressionType := pb.CompressionType(p.options.CompressionType) - if compressionType != pb.CompressionType_NONE { - mm.Compression = &compressionType - } - } else { - // final check for batching message is in serializeMessage - // this is a double check - checkSize = uncompressedSize - } - - // if msg is too large and chunking is disabled - if checkSize > maxMessageSize && !p.options.EnableChunking { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, request.msg, errMessageTooLarge) - p.log.WithError(errMessageTooLarge). - WithField("size", checkSize). - WithField("properties", msg.Properties). - Errorf("MaxMessageSize %d", maxMessageSize) - p.metrics.PublishErrorsMsgTooLarge.Inc() + if !p.canReserveMem(sr, uncompressedSize) { return } - var totalChunks int - // max chunk payload size - var payloadChunkSize int - if sendAsBatch || !p.options.EnableChunking { - totalChunks = 1 - payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - } else { - payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm) - if payloadChunkSize <= 0 { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, msg, errMetaTooLarge) - p.log.WithError(errMetaTooLarge). - WithField("metadata size", proto.Size(mm)). - WithField("properties", msg.Properties). - Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize())) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return - } - // set ChunkMaxMessageSize - if p.options.ChunkMaxMessageSize != 0 { - payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) - } - totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize)))) - } - - // set total chunks to send request - request.totalChunks = totalChunks - - if !sendAsBatch { - if totalChunks > 1 { - var lhs, rhs int - uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10)) - mm.Uuid = proto.String(uuid) - mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) - mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize)) - cr := newChunkRecorder() - for chunkID := 0; chunkID < totalChunks; chunkID++ { - lhs = chunkID * payloadChunkSize - if rhs = lhs + payloadChunkSize; rhs > compressedSize { - rhs = compressedSize - } - // update chunk id - mm.ChunkId = proto.Int32(int32(chunkID)) - nsr := &sendRequest{ - ctx: request.ctx, - msg: request.msg, - callback: request.callback, - callbackOnce: request.callbackOnce, - publishTime: request.publishTime, - blockCh: request.blockCh, - closeBlockChOnce: request.closeBlockChOnce, - totalChunks: totalChunks, - chunkID: chunkID, - uuid: uuid, - chunkRecorder: cr, - transaction: request.transaction, - reservedMem: int64(rhs - lhs), - } - // the permit of first chunk has acquired - if chunkID != 0 && !p.canAddToQueue(nsr) { - p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs)) - return - } - p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize)) - } - // close the blockCh when all the chunks acquired permits - request.stopBlock() - } else { - // close the blockCh when totalChunks is 1 (it has acquired permits) - request.stopBlock() - p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize)) - } - } else { - smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize) + if sr.sendAsBatch { + smm := p.genSingleMessageMetadataInBatch(msg, int(uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema - added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, - multiSchemaEnabled) + added := addRequestToBatch( + smm, p, uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) if !added { // The current batch is full. flush it and retry - p.internalFlushCurrentBatch() // after flushing try again to add the current payload - if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, - multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, request.msg, errFailAddToBatch) + if ok := addRequestToBatch(smm, p, uncompressedPayload, sr, msg, + sr.deliverAt, sr.schemaVersion, multiSchemaEnabled); !ok { + p.releaseSemaphoreAndMem(uncompressedSize) + runCallback(sr.callback, nil, sr.msg, errFailAddToBatch) p.log.WithField("size", uncompressedSize). WithField("properties", msg.Properties). Error("unable to add message to batch") return } } - if request.flushImmediately { - + if sr.flushImmediately { p.internalFlushCurrentBatch() + } + return + } + if sr.totalChunks <= 1 { + // close the blockCh when totalChunks is 1 (it has acquired permits) + // TODO - drop this method + sr.stopBlock() + p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize)) + return + } + + var lhs, rhs int + uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10)) + sr.mm.Uuid = proto.String(uuid) + sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks)) + sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize)) + cr := newChunkRecorder() + for chunkID := 0; chunkID < sr.totalChunks; chunkID++ { + lhs = chunkID * sr.payloadChunkSize + rhs = lhs + sr.payloadChunkSize + if rhs > sr.compressedSize { + rhs = sr.compressedSize + } + // update chunk id + sr.mm.ChunkId = proto.Int32(int32(chunkID)) + nsr := &sendRequest{ + ctx: sr.ctx, + msg: sr.msg, + callback: sr.callback, + callbackOnce: sr.callbackOnce, + publishTime: sr.publishTime, + blockCh: sr.blockCh, + closeBlockChOnce: sr.closeBlockChOnce, + totalChunks: sr.totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: cr, + transaction: sr.transaction, + reservedMem: int64(rhs - lhs), + sendAsBatch: sr.sendAsBatch, + schema: sr.schema, + schemaVersion: sr.schemaVersion, + uncompressedPayload: sr.uncompressedPayload, + uncompressedSize: sr.uncompressedSize, + compressedPayload: sr.compressedPayload, + compressedSize: sr.compressedSize, + payloadChunkSize: sr.payloadChunkSize, + mm: sr.mm, + deliverAt: sr.deliverAt, + maxMessageSize: sr.maxMessageSize, } + // the permit of first chunk has acquired + if chunkID != 0 && !p.canAddToQueue(nsr) { + p.releaseSemaphoreAndMem(uncompressedSize - int64(lhs)) + return + } + + p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } + + // close the blockCh when all the chunks acquired permits + // TODO - drop this method + sr.stopBlock() } func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, @@ -766,8 +655,10 @@ func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessa } } -func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage, - uncompressedSize int) (smm *pb.SingleMessageMetadata) { +func (p *partitionProducer) genSingleMessageMetadataInBatch( + msg *ProducerMessage, + uncompressedSize int, +) (smm *pb.SingleMessageMetadata) { smm = &pb.SingleMessageMetadata{ PayloadSize: proto.Int32(int32(uncompressedSize)), } @@ -1163,6 +1054,142 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { return nil } +func (p *partitionProducer) updateSchema(sr *sendRequest) error { + var schema Schema + var schemaVersion []byte + var err error + + if sr.msg.Schema != nil { + schema = sr.msg.Schema + } else if p.options.Schema != nil { + schema = p.options.Schema + } + + if schema == nil { + return nil + } + + schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) + if schemaVersion == nil { + schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) + if err != nil { + return fmt.Errorf("get schema version fail, err: %w", err) + } + p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) + } + + sr.schema = schema + sr.schemaVersion = schemaVersion + return nil +} + +func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error { + // read payload from message + sr.uncompressedPayload = sr.msg.Payload + + if sr.msg.Value != nil { + if sr.schema == nil { + p.log.Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, "set schema value without setting schema") + } + + // payload and schema are mutually exclusive + // try to get payload from schema value only if payload is not set + schemaPayload, err := sr.schema.Encode(sr.msg.Value) + if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, err.Error()) + } + + sr.uncompressedPayload = schemaPayload + } + + sr.uncompressedSize = int64(len(sr.uncompressedPayload)) + return nil +} + +func (p *partitionProducer) updateMetaData(sr *sendRequest) { + deliverAt := sr.msg.DeliverAt + if sr.msg.DeliverAfter.Nanoseconds() > 0 { + deliverAt = time.Now().Add(sr.msg.DeliverAfter) + } + + // set default ReplicationClusters when DisableReplication + if sr.msg.DisableReplication { + sr.msg.ReplicationClusters = []string{"__local__"} + } + + sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt) + + sr.sendAsBatch = !p.options.DisableBatching && + sr.msg.ReplicationClusters == nil && + deliverAt.UnixNano() < 0 + + // Once the batching is enabled, it can close blockCh early to make block finish + if sr.sendAsBatch { + // TODO - drop this method + sr.stopBlock() + } else { + // update sequence id for metadata, make the size of msgMetadata more accurate + // batch sending will update sequence ID in the BatchBuilder + p.updateMetadataSeqID(sr.mm, sr.msg) + } + + sr.deliverAt = deliverAt +} + +func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { + checkSize := sr.uncompressedSize + if !sr.sendAsBatch { + sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload) + sr.compressedSize = len(sr.compressedPayload) + + // set the compress type in msgMetaData + compressionType := pb.CompressionType(p.options.CompressionType) + if compressionType != pb.CompressionType_NONE { + sr.mm.Compression = &compressionType + } + + checkSize = int64(sr.compressedSize) + } + + sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize())) + + // if msg is too large and chunking is disabled + if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking { + p.log.WithError(errMessageTooLarge). + WithField("size", checkSize). + WithField("properties", sr.msg.Properties). + Errorf("MaxMessageSize %d", sr.maxMessageSize) + + return errMessageTooLarge + } + + if sr.sendAsBatch || !p.options.EnableChunking { + sr.totalChunks = 1 + sr.payloadChunkSize = int(sr.maxMessageSize) + return nil + } + + sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm) + if sr.payloadChunkSize <= 0 { + p.log.WithError(errMetaTooLarge). + WithField("metadata size", proto.Size(sr.mm)). + WithField("properties", sr.msg.Properties). + Errorf("MaxMessageSize %d", sr.maxMessageSize) + + return errMetaTooLarge + } + + // set ChunkMaxMessageSize + if p.options.ChunkMaxMessageSize != 0 { + sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) + } + + sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize)))) + return nil +} + func (p *partitionProducer) internalSendAsync( ctx context.Context, msg *ProducerMessage, @@ -1202,6 +1229,26 @@ func (p *partitionProducer) internalSendAsync( p.options.Interceptors.BeforeSend(p, msg) + if err := p.updateSchema(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + + if err := p.updateUncompressedPayload(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + + p.updateMetaData(sr) + + if err := p.updateChunkInfo(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + p.dataChan <- sr if !p.options.DisableBlockIfQueueFull { @@ -1435,20 +1482,31 @@ func (p *partitionProducer) Close() { } type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + blockCh chan struct{} + closeBlockChOnce *sync.Once + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + transaction *transaction + reservedMem int64 + sendAsBatch bool + schema Schema + schemaVersion []byte + uncompressedPayload []byte + uncompressedSize int64 + compressedPayload []byte + compressedSize int + payloadChunkSize int + mm *pb.MessageMetadata + deliverAt time.Time + maxMessageSize int32 } // stopBlock can be invoked multiple times safety