context deadline exceeded #193

Closed
nmathew opened this issue Nov 11, 2022 · 30 comments

Comments

@nmathew

nmathew commented Nov 11, 2022

Env:

  "qpid_dispatch_version": "1.19.0",
  "qpid_proton_version": "0.34.0"
   Kubernetes 

We hit this error when running the following sample program:

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/Azure/go-amqp"
)

func main() {
        // Create client
        client, err := amqp.Dial("amqp://router.router.svc.cluster.local")
        if err != nil {
                log.Fatal("Dialing AMQP server:", err)
        }
        defer client.Close()

        // Open a session
        session, err := client.NewSession()
        if err != nil {
                log.Fatal("Creating AMQP session:", err)
        }

        ctx := context.Background()

        // Send a message
        {
                // Create a sender
                sender, err := session.NewSender(
                        amqp.LinkTargetAddress("/queue-name"),
                )
                if err != nil {
                        log.Fatal("Creating sender link:", err)
                }

                ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

                // Send message
                err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
                if err != nil {
                        log.Fatal("Sending message:", err)
                }

                sender.Close(ctx)
                cancel()
        }

        // Continuously read messages
        {
                // Create a receiver
                receiver, err := session.NewReceiver(
                        amqp.LinkSourceAddress("/queue-name"),
                        amqp.LinkCredit(10),
                )
                if err != nil {
                        log.Fatal("Creating receiver link:", err)
                }
                defer func() {
                        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
                        receiver.Close(ctx)
                        cancel()
                }()

                for {
                        // Receive next message
                        msg, err := receiver.Receive(ctx)
                        if err != nil {
                                log.Fatal("Reading message from AMQP:", err)
                        }

                        // Accept message
                        msg.Accept(context.Background())

                        fmt.Printf("Message received: %s\n", msg.GetData())
                }
        }
}

amqp://router.router.svc.cluster.local is the qpid Kubernetes service, configured with "internalTrafficPolicy: Local".

root@cp-baremetal-ptpstack-1:/go/src/github.com/redhat-cne/amqptest# ./amqp-test
2022/11/11 07:26:50 Sending message:awaiting send: context deadline exceeded
root@cp-baremetal-ptpstack-1:/go/src/github.com/redhat-cne/amqptest#

The test program runs in one pod, and the qpid-dispatch router runs in another pod on the same node.

The test AMQP client/server connects directly to the qpid router.

The router configuration is:

router {
        mode: standalone
        id: router
    }

    listener {
        host: 0.0.0.0
        port: 5672
        role: normal
    }

    address {
        prefix: closest
        distribution: closest
    }

    address {
        prefix: multicast
        distribution: multicast
    }

    address {
        prefix: unicast
        distribution: closest
    }

    address {
        prefix: exclusive
        distribution: closest
    }

    address {
        prefix: broadcast
        distribution: multicast
    }
    address {
        prefix: cluster/node
        distribution: multicast
    }

Through another program I am able to see the data reaching the other side, and bidirectional data exchange is happening, but Send() returns "context deadline exceeded". Any idea why this happens?

I tried newer versions of go-amqp and got the same error.

@nmathew
Author

nmathew commented Nov 11, 2022

https://qpid.apache.org/releases/qpid-dispatch-1.19.0/user-guide/index.html#sending-test-messages-qdr

python simple_recv.py -a 127.0.0.1:5672/examples -m 5
python simple_send.py -a 127.0.0.1:5672/examples -m 5

These simple Python programs work with the same router configuration.

@nmathew
Author

nmathew commented Nov 11, 2022

When we split the Go program above into a separate sender and receiver, it works fine.

@nmathew
Author

nmathew commented Nov 11, 2022

@jhendrixMSFT Any pointers?

@jhendrixMSFT
Member

Your program looks fine, i.e. nothing obvious as to why Send() would time out. When you say "separating the above go program..." does that mean that the calls to Send() and Receive() are in separate programs? Assuming that's the case, I don't see why this would make a difference. Is the problem consistent? If so, you could try enabling debug logging and post the output. To do that, you need to build with the tag debug set, and set env var DEBUG_LEVEL=3

@nmathew
Author

nmathew commented Nov 15, 2022

@jhendrixMSFT
router-n2r5m:/amqp # ./amqp-test
04:45:05.013553 TX: Open{ContainerID : cmUAYUQtj7AdTKLtnW_zl7SGd_UYiCa5NiGmUWsFB9XW42HuIWGDqQ, Hostname: 127.0.0.1, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 1m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:45:05.014178 RX: Open{ContainerID : Standalone_YbTF78l7Zx7_BBw, Hostname: , MaxFrameSize: 16384, ChannelMax: 32767, IdleTimeout: 8s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [ANONYMOUS-RELAY qd.streaming-links], DesiredCapabilities: [ANONYMOUS-RELAY qd.streaming-links], Properties: map[product:qpid-dispatch-router qd.conn-id:66 version:1.19.0]}
04:45:05.014367 TX: Begin{RemoteChannel: , NextOutgoingID: 0, IncomingWindow: 1000, OutgoingWindow: 1000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:45:05.014637 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 2147483647, OutgoingWindow: 2147483647, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:45:05.014867 TX: Attach{Name: mXHcoiEWlHElRtl4eQcc4FFk4ZmFQz1VC6j3UDLogD6eewqckN8wNA, Handle: 0, Role: Sender, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: /examples, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:45:05.015570 RX(Session): Attach{Name: mXHcoiEWlHElRtl4eQcc4FFk4ZmFQz1VC6j3UDLogD6eewqckN8wNA, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:45:05.015597 RX: Attach{Name: mXHcoiEWlHElRtl4eQcc4FFk4ZmFQz1VC6j3UDLogD6eewqckN8wNA, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
2022/11/15 04:45:10 Sending message:awaiting send: context deadline exceeded
router-n2r5m:/amqp #

@nmathew
Author

nmathew commented Nov 15, 2022

Separated sender and receiver logs
router-n2r5m:/amqp # ./amqp-recv
04:51:30.571257 TX: Open{ContainerID : eTQM7SQUWIa2KH9lOOwNPVHxSMwOXQk9SJ-txg-sZVM3BNFTZq_heQ, Hostname: 127.0.0.1, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 1m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.572158 RX: Open{ContainerID : Standalone_YbTF78l7Zx7_BBw, Hostname: , MaxFrameSize: 16384, ChannelMax: 32767, IdleTimeout: 8s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [ANONYMOUS-RELAY qd.streaming-links], DesiredCapabilities: [ANONYMOUS-RELAY qd.streaming-links], Properties: map[product:qpid-dispatch-router qd.conn-id:81 version:1.19.0]}
04:51:30.572283 TX: Begin{RemoteChannel: , NextOutgoingID: 0, IncomingWindow: 1000, OutgoingWindow: 1000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.572963 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 2147483647, OutgoingWindow: 2147483647, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.573025 TX: Attach{Name: obBUSXuhjiRZBJktHsfllFx4n9iDP3LvOB1ePM6K-ealEBJ-mI2yxQ, Handle: 0, Role: Receiver, SenderSettleMode: , ReceiverSettleMode: , Source: source{Address: /examples, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: , Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.574193 RX(Session): Attach{Name: obBUSXuhjiRZBJktHsfllFx4n9iDP3LvOB1ePM6K-ealEBJ-mI2yxQ, Handle: 0, Role: Sender, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.574215 RX: Attach{Name: obBUSXuhjiRZBJktHsfllFx4n9iDP3LvOB1ePM6K-ealEBJ-mI2yxQ, Handle: 0, Role: Sender, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:30.574257 FLOW Link Mux half: source: /examples, inflight: 0, credit: 0, deliveryCount: 0, messages: 0, unsettled: 0, maxCredit : 10, settleMode: first
04:51:30.574262 link.muxFlow(): len(l.messages):0 - linkCredit: 10 - deliveryCount: 0, inFlight: 0
04:51:30.574284 TX: Flow{NextIncomingID: , IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:30.574295 TX(Session) - tx: Flow{NextIncomingID: 0, IncomingWindow: 1000, NextOutgoingID: 0, OutgoingWindow: 1000, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:36.135943 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\xd9P\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 11}
04:51:36.135961 TX(Session) Flow? remoteOutgoingWindow(2147483646) < s.incomingWindow(1000)/2
04:51:36.135970 RX: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\xd9P\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 11}
04:51:36.135988 deliveryID 0 before push to receiver - deliveryCount : 0 - linkCredit: 10, len(messages): 0, len(inflight): 0
04:51:36.135994 deliveryID 0 after push to receiver - deliveryCount : 0 - linkCredit: 10, len(messages): 0, len(inflight): 0
04:51:36.135999 deliveryID 0 before exit - deliveryCount : 1 - linkCredit: 9, len(messages): 0
04:51:36.136005 Receive() blocking 0
04:51:36.136018 TX: Disposition{Role: Receiver, First: 0, Last: , Settled: true, State: Accepted, Batchable: false}
Message received: Hello!
^C
router-n2r5m:/amqp #

router-n2r5m:/amqp # ./amqp-send
04:51:36.133327 TX: Open{ContainerID : 7eJ3MPlnHEcRNuHq0mXAeJznyYUesDFGMiersjr_i3q3ZJrgcCgPww, Hostname: 127.0.0.1, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 1m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.133875 RX: Open{ContainerID : Standalone_YbTF78l7Zx7_BBw, Hostname: , MaxFrameSize: 16384, ChannelMax: 32767, IdleTimeout: 8s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [ANONYMOUS-RELAY qd.streaming-links], DesiredCapabilities: [ANONYMOUS-RELAY qd.streaming-links], Properties: map[product:qpid-dispatch-router qd.conn-id:82 version:1.19.0]}
04:51:36.134004 TX: Begin{RemoteChannel: , NextOutgoingID: 0, IncomingWindow: 1000, OutgoingWindow: 1000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.134523 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 2147483647, OutgoingWindow: 2147483647, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.134626 TX: Attach{Name: tN9-2-wBT7hT0OAIe0fU7tg96V2l1JtjBfhbyEABIo_mk5vO-3d_KQ, Handle: 0, Role: Sender, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: /examples, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.135257 RX(Session): Attach{Name: tN9-2-wBT7hT0OAIe0fU7tg96V2l1JtjBfhbyEABIo_mk5vO-3d_KQ, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.135280 RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 2147483647, NextOutgoingID: 0, OutgoingWindow: 2147483647, Handle: 0, DeliveryCount: 0, LinkCredit: 250, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:36.135284 RX(Session) Flow - remoteOutgoingWindow: 2147483647 remoteIncomingWindow: 2147483647 nextOutgoingID: 0
04:51:36.135297 RX: Attach{Name: tN9-2-wBT7hT0OAIe0fU7tg96V2l1JtjBfhbyEABIo_mk5vO-3d_KQ, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
04:51:36.135387 RX: Flow{NextIncomingID: 0, IncomingWindow: 2147483647, NextOutgoingID: 0, OutgoingWindow: 2147483647, Handle: 0, DeliveryCount: 0, LinkCredit: 250, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:36.135393 Link Mux isSender: credit: 250, deliveryCount: 0, messages: 0, unsettled: 0
04:51:36.135417 TX(link): Transfer{Handle: 0, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 11}
04:51:36.135422 TX(link): key:tN9-2-wBT7hT0OAIe0fU7tg96V2l1JtjBfhbyEABIo_mk5vO-3d_KQ, decremented linkCredit: 249
04:51:36.135424 Link Mux isSender: credit: 249, deliveryCount: 1, messages: 0, unsettled: 0
04:51:36.135432 TX(Session) - txtransfer: Transfer{Handle: 0, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 11}
04:51:36.136763 RX(Session): Flow{NextIncomingID: 1, IncomingWindow: 2147483647, NextOutgoingID: 0, OutgoingWindow: 2147483647, Handle: 0, DeliveryCount: 1, LinkCredit: 250, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:36.136779 RX(Session) Flow - remoteOutgoingWindow: 2147483647 remoteIncomingWindow: 2147483647 nextOutgoingID: 1
04:51:36.136797 RX(Session): Disposition{Role: Receiver, First: 1, Last: , Settled: true, State: Accepted, Batchable: false}
04:51:36.136810 RX: Flow{NextIncomingID: 1, IncomingWindow: 2147483647, NextOutgoingID: 0, OutgoingWindow: 2147483647, Handle: 0, DeliveryCount: 1, LinkCredit: 250, Available: , Drain: false, Echo: false, Properties: map[]}
04:51:36.136820 Link Mux isSender: credit: 250, deliveryCount: 1, messages: 0, unsettled: 0
04:51:36.136832 TX(Session) - default: Detach{Handle: 0, Closed: true, Error: *Error(nil)}
04:51:36.137307 RX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
04:51:36.137340 TX: Close{Error: *Error(nil)}
router-n2r5m:/amqp #

@nmathew
Author

nmathew commented Nov 15, 2022

sender.go

package main

import (
        "context"
        "log"
        "time"

        "github.com/Azure/go-amqp"
)

func main() {
        // Create client
        client, err := amqp.Dial("amqp://127.0.0.1:5672")
        if err != nil {
                log.Fatal("Dialing AMQP server:", err)
        }
        defer client.Close()

        // Open a session
        session, err := client.NewSession()
        if err != nil {
                log.Fatal("Creating AMQP session:", err)
        }

        ctx := context.Background()

        // Send a message
        {
                // Create a sender
                sender, err := session.NewSender(
                        amqp.LinkTargetAddress("/examples"),
                )
                if err != nil {
                        log.Fatal("Creating sender link:", err)
                }

                ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

                // Send message
                err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
                if err != nil {
                        log.Fatal("Sending message:", err)
                }

                sender.Close(ctx)
                cancel()
        }

}

recv.go

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/Azure/go-amqp"
)

func main() {
        // Create client
        client, err := amqp.Dial("amqp://127.0.0.1:5672")
        if err != nil {
                log.Fatal("Dialing AMQP server:", err)
        }
        defer client.Close()

        // Open a session
        session, err := client.NewSession()
        if err != nil {
                log.Fatal("Creating AMQP session:", err)
        }

        ctx := context.Background()


        // Continuously read messages
        {
                // Create a receiver
                receiver, err := session.NewReceiver(
                        amqp.LinkSourceAddress("/examples"),
                        amqp.LinkCredit(10),
                )
                if err != nil {
                        log.Fatal("Creating receiver link:", err)
                }
                defer func() {
                        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
                        receiver.Close(ctx)
                        cancel()
                }()

                for {
                        // Receive next message
                        msg, err := receiver.Receive(ctx)
                        if err != nil {
                                log.Fatal("Reading message from AMQP:", err)
                        }

                        // Accept message
                        msg.Accept(context.Background())

                        fmt.Printf("Message received: %s\n", msg.GetData())
                }
        }
}

@jhendrixMSFT
Member

Thanks for the info.

From the logs, I assume this is the separated sender/receiver which works fine, yes? I see that the sender sends the transfer frame and receives a disposition frame, indicating that the peer accepted the transfer and is now settled.

If you can consistently repro the failure case, can you please send the debug log for that? At this point I see no clear reason why the sender would time out.

@nmathew
Author

nmathew commented Nov 15, 2022

@jhendrixMSFT see this comment: #193 (comment).
It contains the log for the single program that has both the sender and the receiver.
The logs were generated by following your instructions: "build with the tag debug set, and set env var DEBUG_LEVEL=3".

@jhendrixMSFT
Member

jhendrixMSFT commented Nov 15, 2022

Thanks, I completely overlooked that.

From that log, the sending link never received a flow frame from the peer during attach. The flow frame includes the available link credit of the peer, and until credit has been issued, the sender is not allowed to send any message. For whatever reason, the peer never sends a flow frame, so the sender's context times out (in the success case, you can see that the sender received a flow frame indicating that the peer has 250 link credit).
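
The credit rule described above can be sketched with a toy model. This uses only the Go standard library and is not go-amqp code: the `link` type, `grant`, `send`, and `runScenario` are hypothetical names invented for illustration, and a buffered channel stands in for the link credit that a peer would grant through a flow frame. Under those assumptions, a send with no credit blocks until its context deadline expires, while a send with credit returns immediately.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// link is a toy stand-in for an AMQP sending link (hypothetical, not the go-amqp type).
type link struct {
	credit chan struct{} // one token per unit of link credit granted by the peer
}

func newLink() *link { return &link{credit: make(chan struct{}, 16)} }

// grant simulates the peer issuing n units of link credit (n must be <= 16 here).
func (l *link) grant(n int) {
	for i := 0; i < n; i++ {
		l.credit <- struct{}{}
	}
}

// send consumes one unit of credit, or gives up when ctx is done.
// The payload is ignored in this model; only the credit handshake matters.
func (l *link) send(ctx context.Context, payload string) error {
	select {
	case <-l.credit:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("awaiting send: %w", ctx.Err())
	}
}

// runScenario attempts one send with a short deadline, with or without credit.
func runScenario(grantCredit bool) error {
	l := newLink()
	if grantCredit {
		l.grant(1)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return l.send(ctx, "Hello!")
}

func main() {
	fmt.Println("no credit:", runScenario(false))
	fmt.Println("with credit:", runScenario(true))
}
```

In this model the "no credit" case fails with a wrapped `context.DeadlineExceeded`, which is the same shape of error as the failing log.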

How can I install the necessary broker to repro this? I can't repro it using the .NET broker or ActiveMQ 5.16.3.

@nmathew
Author

nmathew commented Nov 16, 2022

@jhendrixMSFT
Below are the logs from the qpid-dispatch router (for the single program containing both producer and consumer).

2022-11-16 04:19:42.169325 +0000 SERVER (info) [C5] Accepted connection to 0.0.0.0:amqp from 127.0.0.1:42088
2022-11-16 04:19:42.169716 +0000 ROUTER_CORE (info) [C5] Connection Opened: dir=in host=127.0.0.1:42088 vhost= encrypted=no auth=no user=anonymous container_id=Ku_ABnHSPonUkN_nTLjkzi7FzWVho9j7ih1my7YgTsqyb08aGvMPXw props=
2022-11-16 04:19:42.170512 +0000 ROUTER_CORE (info) [C5][L12] Link attached: dir=in source={ expire:sess} target={/queue-name expire:sess}
2022-11-16 04:19:47.172194 +0000 SERVER (info) [C5] Connection from 127.0.0.1:42088 (to 0.0.0.0:amqp) failed: amqp:connection:framing-error connection aborted
2022-11-16 04:19:47.172355 +0000 ROUTER_CORE (info) [C5][L12] Link lost: del=0 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no
2022-11-16 04:19:47.172415 +0000 ROUTER_CORE (info) [C5] Connection Closed

@nmathew
Author

nmathew commented Nov 16, 2022

To get the Docker image, clone https://github.com/apache/qpid-dispatch.git (the 1.19.0 release or latest), then run "docker build ."
That should produce an image which runs qdrouterd when started.

Then we can put a Go compiler and the test code inside the container.

@jhendrixMSFT
Member

jhendrixMSFT commented Nov 16, 2022

According to the spec, an amqp:connection:framing-error indicates that "A valid frame header cannot be formed from the incoming byte stream."

Is there any way to dump the complete frame data, or maybe use some other tool to capture the network traffic?

One other thing to try first, can you add amqp.ConnSASLAnonymous() to your call to Dial()? I had to do this when connecting to the .NET/ActiveMQ broker else it would fail to connect (I don't know why it would work in one case and not the other though as it's the same code).

@jhendrixMSFT
Member

Also, your use of qpid-dispatch makes me wonder if this is related to #135 which also reports problems when running under qpid-dispatch.

@nmathew
Author

nmathew commented Nov 16, 2022

@jhendrixMSFT

        "github.com/Azure/go-amqp"
)

func main() {
        // Create client
        client, err := amqp.Dial("amqp://127.0.0.1:5672", amqp.ConnSASLAnonymous())
        if err != nil {
                log.Fatal("Dialing AMQP server:", err)
        }
        defer client.Close()

        // Open a session
        session, err := client.NewSession()
        if err != nil {
                log.Fatal("Creating AMQP session:", err)
        }

        ctx := context.Background()

logs :

router-n2r5m:/amqp #
router-n2r5m:/amqp #
router-n2r5m:/amqp #
router-n2r5m:/amqp # ./amqp-test
17:19:17.917626 RX: SaslMechanisms{Mechanisms : [ANONYMOUS]}
17:19:17.917692 TX: SaslInit{Mechanism : ANONYMOUS, InitialResponse: ********, Hostname: }
17:19:17.918093 RX: SaslOutcome{Code : 0, AdditionalData: []}
17:19:17.919023 TX: Open{ContainerID : xgaYCwIqhqHYwQqzMAHdmZx0ubTdEedlwXvyye1t3-JFQuQejeookA, Hostname: 127.0.0.1, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 1m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
17:19:17.920067 RX: Open{ContainerID : Standalone_YbTF78l7Zx7_BBw, Hostname: , MaxFrameSize: 16384, ChannelMax: 32767, IdleTimeout: 8s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [ANONYMOUS-RELAY qd.streaming-links], DesiredCapabilities: [ANONYMOUS-RELAY qd.streaming-links], Properties: map[product:qpid-dispatch-router qd.conn-id:85 version:1.19.0]}
17:19:17.920184 TX: Begin{RemoteChannel: , NextOutgoingID: 0, IncomingWindow: 1000, OutgoingWindow: 1000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
17:19:17.920440 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 2147483647, OutgoingWindow: 2147483647, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
17:19:17.920544 TX: Attach{Name: FT9c0Xi77G5fj9oC5n0vGwwFkGldvtzMriwiT7Ty9tTq18_ikrJ2OA, Handle: 0, Role: Sender, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: /examples, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
17:19:17.921451 RX(Session): Attach{Name: FT9c0Xi77G5fj9oC5n0vGwwFkGldvtzMriwiT7Ty9tTq18_ikrJ2OA, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
17:19:17.921482 RX: Attach{Name: FT9c0Xi77G5fj9oC5n0vGwwFkGldvtzMriwiT7Ty9tTq18_ikrJ2OA, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: source{Address: /examples, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
2022/11/16 17:19:22 Sending message:awaiting send: context deadline exceeded
router-n2r5m:/amqp #

@jhendrixMSFT
Member

Thanks for trying, was skeptical it would help but was a simple test.

@nmathew
Author

nmathew commented Nov 16, 2022

@jhendrixMSFT
Here is the pcap
https://www.dropbox.com/transfer/AAAAAKHWQQpW0ZnQ_q3d8Kytnhyx8zd1Btx_ZRS86k3MuI9MSkkUFHg

@jhendrixMSFT
Member

Thanks for the trace.

I'm currently at a loss to explain the behavior. You can see the proper flow of AMQP frames, just like in the text logs. Four seconds later, an empty keep-alive frame is sent (as expected). At five seconds you can see the beginnings of closing the TCP connection, presumably due to the context timing out and the app exiting (I assume this is where the framing error is coming from, but not 100% sure yet).

@jhendrixMSFT
Member

The Python example you cited runs the sender and receiver separately?

@nmathew
Author

nmathew commented Nov 17, 2022

Yes, those Python examples are from qpid-proton, and they run separately.
https://gitbox.apache.org/repos/asf?p=qpid-proton.git;a=tree;hb=HEAD

@jhendrixMSFT
Member

jhendrixMSFT commented Nov 17, 2022

I've been rereading the investigation thus far and there are a few points that don't add up which makes me wonder if there's another piece to the puzzle that we're overlooking.

Earlier you mentioned that "through another program I can able to see the data reaching the other side" (I assume this is the "Hello!" message). When you reran the test with debug logging enabled (this one), did you still see data reaching the other side? If so then I can only guess that the data is coming from elsewhere as there was no transfer frame sent in that log, i.e. there would be an entry that contains "TX(link): Transfer".

@nmathew
Author

nmathew commented Nov 17, 2022

The whole problem started here: redhat-cne/cloud-event-proxy#150
cloud-event-proxy uses go-amqp and reports:
"failed to send(TO): /cluster/node/cp-baremetal-ptpstack-1/mock/RRyIz result context deadline exceeded "
These are two different executables talking to qpid-dispatcher.
From the logs I can see data reaching the other side, but send(TO) (which internally calls amqp.send()) fails with context deadline exceeded.
While debugging that, I tried testing go-amqp with the current sample program.

@jhendrixMSFT
Member

In the sample program, it's just the one sender program talking to the qpid-dispatcher? What's on the receiving side of the dispatcher?

@nmathew
Author

nmathew commented Nov 17, 2022

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/Azure/go-amqp"
)

func main() {
        // Create client
        client, err := amqp.Dial("amqp://127.0.0.1:5672")
        if err != nil {
                log.Fatal("Dialing AMQP server:", err)
        }
        defer client.Close()

        // Open a session
        session, err := client.NewSession()
        if err != nil {
                log.Fatal("Creating AMQP session:", err)
        }

        ctx := context.Background()

        // Send a message
        {
                // Create a sender
                sender, err := session.NewSender(
                        amqp.LinkTargetAddress("/queue-name"),
                )
                if err != nil {
                        log.Fatal("Creating sender link:", err)
                }

                ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

                // Send message
                err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
                if err != nil {
                        log.Fatal("Sending message:", err)
                }

                sender.Close(ctx)
                cancel()
        }

        // Continuously read messages
        {
                // Create a receiver
                receiver, err := session.NewReceiver(
                        amqp.LinkSourceAddress("/queue-name"),
                        amqp.LinkCredit(10),
                )
                if err != nil {
                        log.Fatal("Creating receiver link:", err)
                }
                defer func() {
                        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
                        receiver.Close(ctx)
                        cancel()
                }()

                for {
                        // Receive next message
                        msg, err := receiver.Receive(ctx)
                        if err != nil {
                                log.Fatal("Reading message from AMQP:", err)
                        }

                        // Accept message
                        msg.Accept(context.Background())

                        fmt.Printf("Message received: %s\n", msg.GetData())
                }
        }
}

This is the sample program, nothing else.

@nmathew
Author

nmathew commented Nov 17, 2022

This example is a direct copy of the go-amqp example.

@nmathew
Author

nmathew commented Nov 17, 2022

You should be able to reproduce the problem by creating the qpid Docker container I mentioned earlier and then running this sample program.

@nmathew
Author

nmathew commented Nov 17, 2022

Is this sample program correct? The sender blocks in Send() on a single thread before the receiver, which is what grants link credit, has been created.

I am able to reproduce the same problem with a separate receiver and sender by giving zero credits during attach.

@jhendrixMSFT
Member

This SDK only supports the creation of AMQP clients. You can't send a message to "yourself" using this SDK. When you create a client, it must connect to a broker process (ActiveMQ, Azure Service Bus/Event Hubs, etc). Please see the conceptual model in the spec for more info on this.

@jhendrixMSFT
Member

To clarify the above, if qpid-dispatcher removes the need for a dedicated broker (I skimmed some of the docs but know very little about it), then you are correct that you can't have the sender and receiver on the same goroutine.
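
A rough sketch of why the ordering matters, again using only the Go standard library rather than go-amqp. It assumes a router that lets a sender proceed only once a receiver is attached to the address, which is what the logs in this thread suggest but which is an assumption of the model. `toyRouter`, `attachReceiver`, `send`, `sequential`, and `concurrent` are hypothetical names made up for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// toyRouter is a hypothetical in-memory router: senders may proceed only
// after at least one receiver has attached.
type toyRouter struct {
	once     sync.Once
	attached chan struct{} // closed when the first receiver attaches
	msgs     chan string
}

func newToyRouter() *toyRouter {
	return &toyRouter{attached: make(chan struct{}), msgs: make(chan string, 1)}
}

// attachReceiver registers a receiver and returns the channel it reads from.
func (r *toyRouter) attachReceiver() <-chan string {
	r.once.Do(func() { close(r.attached) })
	return r.msgs
}

// send waits for a receiver to exist, then delivers body, or gives up when ctx is done.
func (r *toyRouter) send(ctx context.Context, body string) error {
	select {
	case <-r.attached:
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case r.msgs <- body:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sequential mirrors the sample program: send first, attach the receiver afterwards.
func sequential() error {
	r := newToyRouter()
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := r.send(ctx, "Hello!"); err != nil {
		return err // the receiver below is never attached
	}
	<-r.attachReceiver()
	return nil
}

// concurrent attaches the receiver in its own goroutine, then sends.
func concurrent() (string, error) {
	r := newToyRouter()
	got := make(chan string, 1)
	go func() { got <- <-r.attachReceiver() }()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := r.send(ctx, "Hello!"); err != nil {
		return "", err
	}
	select {
	case m := <-got:
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	fmt.Println("sequential:", sequential())
	msg, err := concurrent()
	fmt.Println("concurrent:", msg, err)
}
```

In the model, `sequential` times out because nothing attaches a receiver while the send is blocked, and `concurrent` succeeds because the receiver runs on its own goroutine. With the real SDK, the analogous change would be to create the receiver and run its receive loop in a separate goroutine (or a separate program) before calling Send().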

@jhendrixMSFT
Member

Closing as I believe we've found the solution. Please ping back if I'm mistaken.
