[Bug] [Go agent] AMQP agent consumers are not living up to expectations  #12868

@renyansongno1

Search before asking

  • I had searched in the issues and found no similar issues.

Apache SkyWalking Component

Go Agent (apache/skywalking-go)

What happened

Background

While using the skywalking-go agent, we found that the AMQP consumer side blocks a goroutine. A pprof goroutine dump showed that the goroutine is stuck on the first line of the following function:

func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue, consumerTag string, args amqp091.Table, results ...interface{}) error {
	deliveries := <-results[0].(<-chan Delivery)
	//....
}

Looking at the interception point the plugin defines, we found that the current enhancement logic does not match how the library is used. A typical consumer, taken from the official example, looks like this:

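deliveries, err := c.channel.Consume(
	queue.Name, // name
	"",         // consumerTag
	*autoAck,   // autoAck
	false,      // exclusive
	false,      // noLocal
	false,      // noWait
	nil,        // arguments
)
if err != nil {
	return nil, fmt.Errorf("Queue Consume: %s", err)
}

// Hand the deliveries channel to the consumer goroutine.
// c.done is the consumer's done channel, as in the official example.
go handle(deliveries, c.done)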

// Consumer goroutine: reads from the deliveries channel until it is closed.
func handle(deliveries <-chan amqp.Delivery, done chan error) {
	cleanup := func() {
		Log.Printf("handle: deliveries channel closed")
		done <- nil
	}

	defer cleanup()

	for d := range deliveries {
		deliveryCount++
		if *verbose == true {
			Log.Printf(
				"got %dB delivery: [%v] %q",
				len(d.Body),
				d.DeliveryTag,
				d.Body,
			)
		} else {
			if deliveryCount%65536 == 0 {
				Log.Printf("delivery count %d", deliveryCount)
			}
		}
		if *autoAck == false {
			d.Ack(false)
		}
	}
}

So the application is expected to be the only reader of the receive-only channel returned by Consume. Any read performed by the agent takes a message that the application then never sees (data loss), and when no message is pending, that read blocks the goroutine that called Consume.
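Both effects can be reproduced without a broker. The following is a minimal sketch using plain Go channels; `Delivery`, `brokenAfterInvoke`, `drain`, and `hookTimeout` are hypothetical names made up for this illustration and are not part of amqp091-go or skywalking-go. The timeout exists only so the demo terminates; the hook quoted above has no timeout and would wait indefinitely.

```go
package main

import (
	"fmt"
	"time"
)

// Delivery is a hypothetical stand-in for amqp091.Delivery.
type Delivery struct{ Body string }

// hookTimeout bounds the wait in this demo only.
const hookTimeout = 50 * time.Millisecond

// brokenAfterInvoke mimics a hook that receives once from the channel
// returned by Consume. It returns the delivery it took and true, or
// false if nothing arrived before hookTimeout.
func brokenAfterInvoke(deliveries <-chan Delivery) (Delivery, bool) {
	select {
	case d := <-deliveries:
		return d, true
	case <-time.After(hookTimeout):
		return Delivery{}, false
	}
}

// drain collects the bodies the application would see afterwards.
func drain(deliveries <-chan Delivery) []string {
	var bodies []string
	for d := range deliveries {
		bodies = append(bodies, d.Body)
	}
	return bodies
}

func main() {
	// Case 1: messages are pending. The hook takes the first one.
	ch := make(chan Delivery, 3)
	ch <- Delivery{Body: "m1"}
	ch <- Delivery{Body: "m2"}
	ch <- Delivery{Body: "m3"}
	close(ch)

	taken, _ := brokenAfterInvoke(ch)
	fmt.Println("taken by hook:", taken.Body)
	fmt.Println("seen by application:", drain(ch))

	// Case 2: no message is pending. The hook waits until the timeout.
	empty := make(chan Delivery)
	_, ok := brokenAfterInvoke(empty)
	fmt.Println("hook received a delivery:", ok)
}
```

In the first case the application sees only `m2` and `m3`. In the second case the hook gets nothing and simply waits, which is the blocked goroutine visible in pprof.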

In addition, Consume is called only once per consumer, so the trace is reported at most once, rather than as a separate span for every message.

What you expected to happen

Each consumed message should produce its own span, and the agent should not affect normal business logic.
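One way to get this behaviour is for the hook to return a wrapped channel instead of reading from the original one. The following is only a sketch of that idea, not the agent's actual implementation; `Delivery`, `wrapDeliveries`, and the `onDelivery` callback are hypothetical names, and in a real plugin the callback would be where the per-message span is created and finished.

```go
package main

import "fmt"

// Delivery is a hypothetical stand-in for amqp091.Delivery.
type Delivery struct{ Body string }

// wrapDeliveries returns a new channel that forwards every delivery
// from src unchanged. onDelivery is called once per message before it
// is forwarded. The function returns immediately, so the caller of
// Consume is never blocked.
func wrapDeliveries(src <-chan Delivery, onDelivery func(Delivery)) <-chan Delivery {
	out := make(chan Delivery)
	go func() {
		// Close the wrapped channel when the source is closed so that
		// "for d := range deliveries" in the application still ends.
		defer close(out)
		for d := range src {
			onDelivery(d)
			out <- d
		}
	}()
	return out
}

func main() {
	src := make(chan Delivery, 3)
	for _, body := range []string{"m1", "m2", "m3"} {
		src <- Delivery{Body: body}
	}
	close(src)

	spans := 0
	wrapped := wrapDeliveries(src, func(Delivery) { spans++ })

	received := 0
	for range wrapped {
		received++
	}
	fmt.Printf("spans=%d received=%d\n", spans, received)
}
```

With this shape the application receives every message and the callback runs once per message. The trade-off is an extra forwarding goroutine per consumer, which stays blocked on the send if the application stops reading before the source channel is closed.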

How to reproduce

ampq-skywalking.zip

The attached demo is the official amqp091 consumer example. Build it with go build -toolexec to get the agent-enhanced binary, then send any message to the broker and queue used by the example (from the RabbitMQ management console or from a producer). The issue reproduces as soon as a message is sent.

Anything else

No response

Are you willing to submit a pull request to fix on your own?

  • Yes I am willing to submit a pull request on my own!

Labels

  • agent: Language agent related.
  • bug: Something isn't working and you are sure it's a bug!
  • go: Go agent

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions