Why doesn't the QoS level take the Kafka producer error into account? #125

Open
Grabber opened this issue Aug 5, 2021 · 0 comments

Grabber commented Aug 5, 2021

I was reading processClientPublish... Why doesn't the QoS response take into account the error returned by the c.broker.Publish call?

If the broker fails to persist to Kafka, wouldn't it be useful to report the failure back to the publisher so it can retry until the message is persisted? My goal here is to guarantee that the publisher's message is persisted, setting aside the extra bandwidth needed to retransmit until it is.

It's almost like a QoS level "3" or "4": at least once or exactly once, with a non-volatile persistence guarantee.

To guarantee this extra level of deliverability, since the current implementation uses AsyncProducer, it would be interesting to move the switch packet.Qos { ... } code block so that it runs only when the message comes back on the AsyncProducer's Successes() or Errors() channels => https://github.com/shopify/sarama/blob/v1.20.1/async_producer.go#L19

func (c *client) processClientPublish(packet *packets.PublishPacket) {

	topic := packet.TopicName

	if !c.broker.CheckTopicAuth(PUB, c.info.clientID, c.info.username, c.info.remoteIP, topic) {
		log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
		return
	}

	//publish kafka
	c.broker.Publish(&bridge.Elements{
		ClientID:  c.info.clientID,
		Username:  c.info.username,
		Action:    bridge.Publish,
		Timestamp: time.Now().Unix(),
		Payload:   string(packet.Payload),
		Topic:     topic,
	})

	switch packet.Qos {
	case QosAtMostOnce:
		c.ProcessPublishMessage(packet)
	case QosAtLeastOnce:
		puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
		puback.MessageID = packet.MessageID
		if err := c.WriterPacket(puback); err != nil {
			log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
			return
		}
		c.ProcessPublishMessage(packet)
	case QosExactlyOnce:
		if err := c.registerPublishPacketId(packet.MessageID); err != nil {
			return
		} else {
			pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
			pubrec.MessageID = packet.MessageID
			if err := c.WriterPacket(pubrec); err != nil {
				log.Error("send pubrec error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
				return
			}
			c.ProcessPublishMessage(packet)
		}
		return
	default:
		log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
		return
	}

}