Skip to content

Commit

Permalink
refactor: prepare sendrequest and move to internalSendAsync
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Oct 24, 2023
1 parent 1b135f4 commit 573448c
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 207 deletions.
26 changes: 23 additions & 3 deletions pulsar/message_chunking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,12 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
msg := &ProducerMessage{
Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
}
wholePayload := msg.Payload
producerImpl := p.(*producer).producers[0].(*partitionProducer)
mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now())
mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload)))
mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload)))
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)

Expand All @@ -568,7 +569,26 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
close(doneCh)
},
msg: msg,
ctx: context.Background(),
msg: msg,
flushImmediately: true,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: newChunkRecorder(),
transaction: nil,
reservedMem: 0,
sendAsBatch: false,
schema: nil,
schemaVersion: nil,
uncompressedPayload: wholePayload,
uncompressedSize: int64(len(wholePayload)),
compressedPayload: wholePayload,
compressedSize: len(wholePayload),
payloadChunkSize: internal.MaxMessageSize - proto.Size(mm),
mm: mm,
deliverAt: time.Now(),
maxMessageSize: internal.MaxMessageSize,
},
uint32(internal.MaxMessageSize),
)
Expand Down
Loading

0 comments on commit 573448c

Please sign in to comment.