Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Oct 24, 2023
1 parent 1f62b38 commit dea241a
Showing 1 changed file with 2 additions and 9 deletions.
11 changes: 2 additions & 9 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,12 +1140,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
}

sr.transaction = txn
callback := sr.callback
sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) {
runCallback(callback, id, producerMessage, err)
txn.endSendOrAckOp(err)
}

return nil
}

Expand Down Expand Up @@ -1177,13 +1171,12 @@ func (p *partitionProducer) internalSendAsync(
closeBlockChOnce: &sync.Once{},
}
if err := p.prepareTransaction(sr); err != nil {
runCallback(sr.callback, nil, msg, err)
sr.done(nil, err)
return
}

if p.getProducerState() != producerReady {
// Producer is closing
runCallback(sr.callback, nil, msg, errProducerClosed)
sr.done(nil, errProducerClosed)
return
}

Expand Down

0 comments on commit dea241a

Please sign in to comment.