[Bug][Producer]Forget to consume the sendRequests before closing the producer #1042

Closed · gunli opened this issue Jun 28, 2023 · 0 comments · Fixed by #1059

gunli (Contributor) commented Jun 28, 2023

Expected behavior

Before closing the producer, consume the remaining sendRequests, send and flush them, and invoke the callbacks of the input messages, so that the application knows whether producing succeeded or failed.

Actual behavior

Currently, when we close the producer, the sendRequests still queued in partitionProducer.dataChan are not consumed before closing.
When the application produces faster than the partitionProducer consumes, many sendRequests accumulate in partitionProducer.dataChan. At close time, the unconsumed requests are effectively lost: their callbacks are never invoked, so the application never learns the producing result of those messages. A minimal reproduction is sketched below.
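For illustration, an application-level reproduction might look like the following sketch (not part of the original report). It assumes a broker at pulsar://localhost:6650 and a topic named test-topic, and sets DisableBlockIfQueueFull so SendAsync enqueues without blocking; how many callbacks go missing depends on timing:

package main

import (
	"context"
	"fmt"
	"log"
	"sync/atomic"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   "test-topic",
		DisableBlockIfQueueFull: true, // let SendAsync enqueue without blocking
	})
	if err != nil {
		log.Fatal(err)
	}

	var invoked int32
	const total = 10000
	for i := 0; i < total; i++ {
		// Queue messages faster than the event loop drains dataChan.
		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-%d", i)),
		}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
			atomic.AddInt32(&invoked, 1)
		})
	}

	// Close immediately: requests still sitting in dataChan are dropped
	// and their callbacks never fire.
	producer.Close()
	fmt.Printf("callbacks invoked: %d of %d\n", atomic.LoadInt32(&invoked), total)
}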

Steps to reproduce

Review the code of partitionProducer:

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	//Register transaction operation to transaction and the transaction coordinator.
	var newCallback func(MessageID, *ProducerMessage, error)
	if msg.Transaction != nil {
		transactionImpl := (msg.Transaction).(*transaction)
		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			callback(nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			callback(nil, msg, err)
		}
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			callback(id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	} else {
		newCallback = callback
	}
	if p.getProducerState() != producerReady {
		// Producer is closing
		newCallback(nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	var txn *transaction
	if msg.Transaction != nil {
		txn = (msg.Transaction).(*transaction)
	}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}
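For context, the requests pushed into p.dataChan above are only consumed by the producer's event loop. The following is a simplified sketch of its shape (not the verbatim source); once the loop handles the close command, anything still buffered in dataChan is never read again:

// Simplified sketch of the consuming side, for illustration only.
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case sr := <-p.dataChan:
			p.internalSend(sr)
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		case cmd := <-p.cmdChan:
			switch v := cmd.(type) {
			case *closeProducer:
				p.internalClose(v)
				return // loop exits; entries still in dataChan are stranded
			}
		}
	}
}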

func (p *partitionProducer) internalClose(req *closeProducer) {
	defer close(req.doneCh)
	if !p.casProducerState(producerReady, producerClosing) {
		return
	}

	p.log.Info("Closing producer")

	id := p.client.rpcClient.NewRequestID()
	_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
		ProducerId: &p.producerID,
		RequestId:  &id,
	})

	if err != nil {
		p.log.WithError(err).Warn("Failed to close producer")
	} else {
		p.log.Info("Closed producer")
	}

	if p.batchBuilder != nil {
		if err = p.batchBuilder.Close(); err != nil {
			p.log.WithError(err).Warn("Failed to close batch builder")
		}
	}

	p.setProducerState(producerClosed)
	p._getConn().UnregisterListener(p.producerID)
	p.batchFlushTicker.Stop()
}
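One possible shape of the fix is sketched below (an illustration only, not the actual patch in #1059): drain dataChan right after the state transition in internalClose, so every queued request is either sent or failed before the producer shuts down. The internalFlushCurrentBatch call and the choice between sending and failing drained requests are assumptions:

func (p *partitionProducer) internalClose(req *closeProducer) {
	defer close(req.doneCh)
	if !p.casProducerState(producerReady, producerClosing) {
		return
	}

	// Drain any sendRequests still queued in dataChan so that every
	// callback fires with a definite result before the producer closes.
drain:
	for {
		select {
		case sr := <-p.dataChan:
			p.internalSend(sr) // alternatively, fail it via its callback with errProducerClosed
		default:
			break drain
		}
	}
	p.internalFlushCurrentBatch() // push out whatever the drain batched

	// ... existing close logic (CLOSE_PRODUCER RPC, batch builder close, etc.)
}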

And in the Producer interface we said "Waits until all pending write request are persisted. In case of errors, pending writes will not be retried." I think the messages that have been submitted via SendAsync should be treated as pending write requests, so we should send and flush them to make sure each of them finishes with a result.

// Producer is used to publish messages on a topic
type Producer interface {
	...

	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
	// of errors, pending writes will not be retried.
	Close()
}

System configuration

Pulsar version: x.y

@zengguan @merlimat @wolfstudy
