Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iot-dev): Add more verbose logging around link credit in amqp links #1435

Merged
merged 8 commits into from Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -4,6 +4,7 @@
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.deps.transport.amqp.AmqpsMessage;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageProperty;
import com.microsoft.azure.sdk.iot.device.MessageType;
Expand All @@ -22,6 +23,7 @@

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -58,20 +60,33 @@ public abstract class AmqpsReceiverLinkHandler extends BaseHandler
BaseHandler.setHandler(receiver, this);

//This flow controller handles all link credit handling on our behalf
add(new FlowController());
add(new LoggingFlowController(this.linkCorrelationId));
}

@Override
public void onLinkRemoteOpen(Event event)
{
log.debug("{} receiver link with link correlation id {} was successfully opened", getLinkInstanceType(), this.linkCorrelationId);
log.debug("{} receiver link with address {} and link correlation id {} was successfully opened", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
this.amqpsLinkStateCallback.onLinkOpened(this);

boolean hasFlowController = false;
Iterator<Handler> children = children();
while (children.hasNext())
{
hasFlowController |= children.next() instanceof LoggingFlowController;
}

if (!hasFlowController)
{
log.trace("No flow controller detected in {} link with address {} and link correlation id {}. Adding a new flow controller.", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
add(new LoggingFlowController(this.linkCorrelationId));
}
}

@Override
public void onLinkLocalOpen(Event event)
{
log.trace("{} receiver link with link correlation id {} opened locally", getLinkInstanceType(), this.linkCorrelationId);
log.trace("{} receiver link with address {} and link correlation id {} opened locally", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
}

@Override
Expand All @@ -92,6 +107,8 @@ public void onDelivery(Event event)
IotHubTransportMessage iotHubMessage = this.protonMessageToIoTHubMessage(amqpsMessage);
this.receivedMessagesMap.put(iotHubMessage, amqpsMessage);
this.amqpsLinkStateCallback.onMessageReceived(iotHubMessage);

log.trace("Current link credit on {} receiver link with address {} and link correlation id {} is {}", this.getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId, receiverLink.getCredit());
}

@Override
Expand All @@ -109,7 +126,7 @@ public void onLinkInit(Event event)
link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
link.setProperties(this.amqpProperties);
link.open();
log.trace("Opening {} receiver link with correlation id {}", this.getLinkInstanceType(), this.linkCorrelationId);
log.trace("Opening {} receiver link with address {} and link correlation id {}", this.getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
}

@Override
Expand All @@ -118,13 +135,13 @@ public void onLinkRemoteClose(Event event)
Link link = event.getLink();
if (link.getLocalState() == EndpointState.ACTIVE)
{
log.debug("{} receiver link with link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.linkCorrelationId);
log.debug("{} receiver link with address {} and link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
link.close();
this.amqpsLinkStateCallback.onLinkClosedUnexpectedly(link.getRemoteCondition());
}
else
{
log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), linkCorrelationId);
log.trace("Closing amqp session now that {} receiver link with address {} and link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.receiverLinkAddress, linkCorrelationId);
event.getSession().close();
}
}
Expand All @@ -135,12 +152,12 @@ public void onLinkLocalClose(Event event)
Link link = event.getLink();
if (link.getRemoteState() == EndpointState.CLOSED)
{
log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), linkCorrelationId);
log.trace("Closing amqp session now that {} receiver link with address {} and link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.receiverLinkAddress, linkCorrelationId);
event.getSession().close();
}
else
{
log.trace("{} receiver link with correlation id {} was closed locally", this.getLinkInstanceType(), this.linkCorrelationId);
log.trace("{} receiver link with address {} and link correlation id {} was closed locally", this.getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
}
}

Expand All @@ -161,13 +178,18 @@ AmqpsMessage getMessageFromReceiverLink(Receiver receiver)
{
Delivery delivery = receiver.current();

if ((delivery != null) && delivery.isReadable() && !delivery.isPartial())
if (delivery == null)
{
return null;
}

if (delivery.isReadable() && !delivery.isPartial())
{
int size = delivery.pending();
byte[] buffer = new byte[size];
int bytesRead = receiver.recv(buffer, 0, buffer.length);

log.trace("read {} bytes from receiver link {}", bytesRead, receiver.getName());
log.trace("read {} bytes from {} receiver link with address {} and link correlation id {}", bytesRead, getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);

boolean receiverLinkAdvanced = receiver.advance();

Expand All @@ -178,7 +200,7 @@ AmqpsMessage getMessageFromReceiverLink(Receiver receiver)

if (size != bytesRead)
{
log.warn("Amqp read from {} receiver link with link correlation id {} did not read the expected amount of bytes. Read {} but expected {}", getLinkInstanceType(), this.linkCorrelationId, bytesRead, size);
log.warn("Amqp read operation on {} receiver link with link correlation id {} did not read the expected amount of bytes. Read {} but expected {}", getLinkInstanceType(), this.linkCorrelationId, bytesRead, size);
}

AmqpsMessage amqpsMessage = new AmqpsMessage();
Expand All @@ -188,12 +210,22 @@ AmqpsMessage getMessageFromReceiverLink(Receiver receiver)
return amqpsMessage;
}

if (delivery.isPartial())
{
log.trace("Partial delivery received on {} receiver link with address {} and link correlation id {}.", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
}
else
{
// not partial, but not readable either
log.warn("Unreadable delivery received on {} receiver link with address {} and link correlation id {}.", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
}

return null;
}

IotHubTransportMessage protonMessageToIoTHubMessage(AmqpsMessage protonMsg)
{
log.trace("Converting proton message to iot hub message for {} receiver link with link correlation id {}. Proton message correlation id {}", getLinkInstanceType(), this.linkCorrelationId, protonMsg.getCorrelationId());
log.trace("Converting proton message to iot hub message for {} receiver link with address {} and link correlation id {}. Proton message correlation id {}", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId, protonMsg.getCorrelationId());
byte[] msgBody;
Data d = (Data) protonMsg.getBody();
if (d != null)
Expand Down Expand Up @@ -272,7 +304,7 @@ void close()
{
if (this.receiverLink.getLocalState() != EndpointState.CLOSED)
{
log.debug("Closing {} receiver link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
log.debug("Closing {} receiver link with address {} and link correlation id {}", getLinkInstanceType(), this.receiverLinkAddress, this.linkCorrelationId);
this.receiverLink.close();
}
}
Expand Down
Expand Up @@ -11,15 +11,26 @@
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.*;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.apache.qpid.proton.reactor.FlowController;

import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -53,24 +64,34 @@ public abstract class AmqpsSenderLinkHandler extends BaseHandler

//All events that happen to this sender link will be handled in this class (onLinkRemoteOpen, for instance)
BaseHandler.setHandler(sender, this);

//This flow controller handles all link credit handling on our behalf
add(new FlowController());
}

protected abstract String getLinkInstanceType();

@Override
public void onLinkRemoteOpen(Event event)
{
log.debug("{} sender link with link correlation id {} was successfully opened", getLinkInstanceType(), this.linkCorrelationId);
log.debug("{} sender link with address {} and link correlation id {} was successfully opened", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
this.amqpsLinkStateCallback.onLinkOpened(this);

boolean hasFlowController = false;
Iterator<Handler> children = children();
while (children.hasNext())
{
hasFlowController |= children.next() instanceof LoggingFlowController;
}

if (!hasFlowController)
{
log.warn("No flow controller detected in {} link with address {} and link correlation id {}. Adding a new flow controller.", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
add(new LoggingFlowController(this.linkCorrelationId));
}
}

@Override
public void onLinkLocalOpen(Event event)
{
log.trace("{} sender link with link correlation id {} opened locally", getLinkInstanceType(), this.linkCorrelationId);
log.trace("{} sender link with address {} and link correlation id {} opened locally", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
}

@Override
Expand All @@ -84,7 +105,7 @@ public void onDelivery(Event event)
Message acknowledgedIotHubMessage = this.inProgressMessages.remove(deliveryTag);
if (acknowledgedIotHubMessage == null)
{
log.warn("Received acknowledgement for a message with delivery tag {} that this sender did not send", deliveryTag);
log.warn("Received acknowledgement for a message with delivery tag {} that this sender did not send on {} link with address {}", deliveryTag, getLinkInstanceType(), this.senderLinkAddress);
}
else
{
Expand All @@ -109,7 +130,7 @@ public void onLinkInit(Event event)
link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
link.setProperties(this.amqpProperties);
link.open();
log.trace("Opening {} sender link with correlation id {}", this.getLinkInstanceType(), this.linkCorrelationId);
log.trace("Opening {} sender link with address {} and link correlation id {}", this.getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
}

@Override
Expand All @@ -118,13 +139,13 @@ public void onLinkRemoteClose(Event event)
Link link = event.getLink();
if (link.getLocalState() == EndpointState.ACTIVE)
{
log.debug("{} sender link with link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.linkCorrelationId);
log.debug("{} sender link with address {} and link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
link.close();
this.amqpsLinkStateCallback.onLinkClosedUnexpectedly(link.getRemoteCondition());
}
else
{
log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), linkCorrelationId);
log.trace("Closing amqp session now that its {} sender link with address {} and link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.senderLinkAddress, linkCorrelationId);
event.getSession().close();
}
}
Expand All @@ -135,20 +156,26 @@ public void onLinkLocalClose(Event event)
Link link = event.getLink();
if (link.getRemoteState() == EndpointState.CLOSED)
{
log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), linkCorrelationId);
log.trace("Closing amqp session now that its {} sender link with address {} and link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.senderLinkAddress, linkCorrelationId);
event.getSession().close();
}
else
{
log.trace("{} sender link with correlation id {} was closed locally", this.getLinkInstanceType(), this.linkCorrelationId);
log.trace("{} sender link with address {} and link correlation id {} was closed locally", this.getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
}
}

@Override
public void onLinkFlow(Event event)
{
log.trace("Link flow received on {} sender link with address {} and link correlation id {}. Current link credit is now {}.", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, event.getSender().getCredit());
}

void close()
{
if (this.senderLink.getLocalState() != EndpointState.CLOSED)
{
log.debug("Closing {} sender link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
log.debug("Closing {} sender link with address {} and link correlation id {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId);
this.senderLink.close();
}
}
Expand Down Expand Up @@ -197,9 +224,8 @@ AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage)
Delivery delivery = this.senderLink.delivery(deliveryTag);
try
{
log.trace("Sending {} bytes over the amqp {} sender link with link correlation id {}", length, getLinkInstanceType(), this.linkCorrelationId);
log.trace("Sending {} bytes over the amqp {} sender link with address {} and link correlation id {} with link credit", length, getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, this.senderLink.getCredit());
int bytesSent = this.senderLink.send(msgData, 0, length);
log.trace("{} bytes sent over the amqp {} sender link with link correlation id {}", bytesSent, getLinkInstanceType(), this.linkCorrelationId);

if (bytesSent != length)
{
Expand All @@ -213,12 +239,13 @@ AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage)
throw new ProtocolException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
}

log.trace("Message was sent over {} sender link with delivery tag {} and hash {}", getLinkInstanceType(), new String(deliveryTag, StandardCharsets.UTF_8), delivery.hashCode());
log.trace("Message was sent over {} sender link with address {} and link correlation id {} with delivery tag {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, new String(deliveryTag, StandardCharsets.UTF_8));
log.trace("Current link credit on {} sender link with address {} and link correlation id {} is {}", this.getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, senderLink.getCredit());
return new AmqpsSendResult(deliveryTag);
}
catch (Exception e)
{
log.warn("Encountered a problem while sending a message on {} sender link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId, e);
log.warn("Encountered a problem while sending a message on {} sender link with address {} and link correlation id {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, e);
this.senderLink.advance();
delivery.free();
return new AmqpsSendResult();
Expand All @@ -227,7 +254,7 @@ AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage)

MessageImpl iotHubMessageToProtonMessage(Message message)
{
log.trace("Converting IoT Hub message to proton message for {} sender link with link correlation id {}. IoT Hub message correlationId {}", getLinkInstanceType(), this.linkCorrelationId, message.getCorrelationId());
log.trace("Converting IoT Hub message to proton message for {} sender link with address {} and link correlation id {}. IoT Hub message correlationId {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, message.getCorrelationId());
MessageImpl outgoingMessage = (MessageImpl) Proton.message();

Properties properties = new Properties();
Expand Down