Skip to content

Commit

Permalink
Improve response matching with CoapMessageIdentifier
Browse files Browse the repository at this point in the history
  • Loading branch information
NZSmartie committed Dec 19, 2017
1 parent 768f50f commit f3019ff
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
29 changes: 15 additions & 14 deletions src/CoAPNet/CoapClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ public void Dispose()
/// <param name="token">Token to cancel the blocking Receive operation</param>
/// <returns>Valid result if a result is received, <c>null</c> if canceled.</returns>
/// <exception cref="CoapClientException">If the timeout period * maximum retransmission attempts was reached.</exception>
public Task<CoapMessage> GetResponseAsync(CoapMessage request, ICoapEndpoint endpoint = null, CancellationToken token = default(CancellationToken), bool dequeue = true)
=> GetResponseAsyncInternal(request.GetIdentifier(endpoint), token, dequeue);
public Task<CoapMessage> GetResponseAsync(CoapMessage request, ICoapEndpoint endpoint = null, bool isRequest = false, CancellationToken token = default(CancellationToken), bool dequeue = true)
=> GetResponseAsync(request.GetIdentifier(endpoint, isRequest), token, dequeue);


/// <summary>
Expand All @@ -344,10 +344,11 @@ public void Dispose()
public Task<CoapMessage> GetResponseAsync(int messageId, CancellationToken token = default(CancellationToken), bool dequeue = true)
{
// Assume message was Confirmable
return GetResponseAsyncInternal(new CoapMessageIdentifier(messageId, CoapMessageType.Confirmable), token, dequeue);
return GetResponseAsync(new CoapMessageIdentifier(messageId, CoapMessageType.Confirmable), token, dequeue);
}

private async Task<CoapMessage> GetResponseAsyncInternal(CoapMessageIdentifier messageId, CancellationToken token = default(CancellationToken), bool dequeue = true)
// TODO: Ignore Acks, we're actually interested in a response.
public async Task<CoapMessage> GetResponseAsync(CoapMessageIdentifier messageId, CancellationToken token = default(CancellationToken), bool dequeue = true)
{
TaskCompletionSource<CoapMessage> responseTask = null;

Expand All @@ -356,7 +357,7 @@ public void Dispose()

if (responseTask.Task.IsCompleted)
return _recentMessages.FirstOrDefault(m => m.Item3.GetIdentifier() == messageId)?.Item3
?? throw new CoapClientException($"No recent message found for {messageId}. This may happen when {nameof(MessageCacheTimeSpan)} is set too short");
?? throw new CoapClientException($"No recent message found for {messageId}. This may happen when {nameof(MessageCacheTimeSpan)} is too short");

if (Endpoint == null)
throw new CoapEndpointException($"{nameof(CoapClient)} has an invalid {nameof(Endpoint)}");
Expand Down Expand Up @@ -389,7 +390,7 @@ public void Dispose()
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public virtual async Task<int> SendAsync(CoapMessage message)
public virtual async Task<CoapMessageIdentifier> SendAsync(CoapMessage message)
=> await SendAsync(message, null, CancellationToken.None);

/// <summary>
Expand All @@ -398,7 +399,7 @@ public virtual async Task<int> SendAsync(CoapMessage message)
/// <param name="message"></param>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task<int> SendAsync(CoapMessage message, CancellationToken token)
public virtual async Task<CoapMessageIdentifier> SendAsync(CoapMessage message, CancellationToken token)
=> await SendAsync(message, null, token);

/// <summary>
Expand All @@ -407,7 +408,7 @@ public virtual async Task<int> SendAsync(CoapMessage message, CancellationToken
/// <param name="message"></param>
/// <param name="endpoint"></param>
/// <returns></returns>
public virtual async Task<int> SendAsync(CoapMessage message, ICoapEndpoint endpoint)
public virtual async Task<CoapMessageIdentifier> SendAsync(CoapMessage message, ICoapEndpoint endpoint)
=> await SendAsync(message, endpoint, CancellationToken.None);

private int GetNextMessageId()
Expand All @@ -424,7 +425,7 @@ private int GetNextMessageId()
/// <param name="token">A token used to cancel the blocking Send operation or retransmission attempts.</param>
/// <returns>The message Id</returns>
/// <exception cref="CoapClientException">If the timeout period * maximum retransmission attempts was reached.</exception>
public virtual async Task<int> SendAsync(CoapMessage message, ICoapEndpoint endpoint, CancellationToken token)
public virtual async Task<CoapMessageIdentifier> SendAsync(CoapMessage message, ICoapEndpoint endpoint, CancellationToken token)
{
if (message == null)
throw new ArgumentNullException(nameof(message));
Expand All @@ -438,14 +439,14 @@ public virtual async Task<int> SendAsync(CoapMessage message, ICoapEndpoint endp
if (message.IsMulticast && message.Type != CoapMessageType.NonConfirmable)
throw new CoapClientException("Can not send confirmable (CON) CoAP message to a multicast endpoint");

var messageId = message.GetIdentifier(endpoint);
var messageId = message.GetIdentifier(endpoint, message.Type == CoapMessageType.Confirmable || message.Type == CoapMessageType.NonConfirmable);

_messageResponses.TryAdd(messageId, new TaskCompletionSource<CoapMessage>(TaskCreationOptions.RunContinuationsAsynchronously));

if (message.Type != CoapMessageType.Confirmable)
{
await SendAsyncInternal(message, endpoint, token).ConfigureAwait(false);
return message.Id;
return messageId;
}

Debug.Assert(_messageResponses.TryGetValue(messageId, out var responseTaskSource), "Race condition?");
Expand All @@ -464,7 +465,7 @@ public virtual async Task<int> SendAsync(CoapMessage message, ICoapEndpoint endp


if (responseTaskSource.Task.IsCompleted)
return message.Id;
return messageId;
}
throw new CoapClientException($"Max retransmission attempts reached for Message Id: {message.Id}");
}
Expand Down Expand Up @@ -508,7 +509,7 @@ internal void SetNextMessageId(int value)
/// <param name="uri"></param>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task<int> GetAsync(string uri, CancellationToken token = default(CancellationToken))
public virtual async Task<CoapMessageIdentifier> GetAsync(string uri, CancellationToken token = default(CancellationToken))
{
return await GetAsync(uri, null, token);
}
Expand All @@ -520,7 +521,7 @@ internal void SetNextMessageId(int value)
/// <param name="endpoint"></param>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task<int> GetAsync(string uri, ICoapEndpoint endpoint, CancellationToken token = default(CancellationToken))
public virtual async Task<CoapMessageIdentifier> GetAsync(string uri, ICoapEndpoint endpoint, CancellationToken token = default(CancellationToken))
{
var message = new CoapMessage
{
Expand Down
28 changes: 20 additions & 8 deletions src/CoAPNet/CoapMessageIdentifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace CoAPNet
{
public static class CoapMessageIdentifierExtensions
{
public static CoapMessageIdentifier GetIdentifier(this CoapMessage message, ICoapEndpoint endpoint = null)
=> new CoapMessageIdentifier(message, endpoint);
public static CoapMessageIdentifier GetIdentifier(this CoapMessage message, ICoapEndpoint endpoint = null, bool isRequest = false)
=> new CoapMessageIdentifier(message, endpoint, isRequest);
}

public struct CoapMessageIdentifier
Expand All @@ -20,11 +20,17 @@ public struct CoapMessageIdentifier

public readonly ICoapEndpoint Endpoint;

public CoapMessageIdentifier(CoapMessage message, ICoapEndpoint endpoint = null)
/// <summary>
/// When <c>true</c>, the associated identifer is for a requesting <see cref="CoapMessage"/>.
/// </summary>
public readonly bool IsRequest;

public CoapMessageIdentifier(CoapMessage message, ICoapEndpoint endpoint = null, bool isRequest = false)
{
Id = message.Id;
MessageType = message.Type;
Endpoint = endpoint;
IsRequest = isRequest;

// Even no tokens are treated as a zero-length token representing 0x0
Token = new byte[message.Token?.Length ?? 0];
Expand All @@ -33,11 +39,12 @@ public CoapMessageIdentifier(CoapMessage message, ICoapEndpoint endpoint = null)
Array.Copy(message.Token, Token, message.Token.Length);
}

public CoapMessageIdentifier(int id, CoapMessageType messageType, byte[] token = null, ICoapEndpoint endpoint = null)
public CoapMessageIdentifier(int id, CoapMessageType messageType, byte[] token = null, ICoapEndpoint endpoint = null, bool isRequest = false)
{
Id = id;
MessageType = messageType;
Endpoint = endpoint;
IsRequest = isRequest;

// Even no tokens are treated as a zero-length token representing 0x0
Token = new byte[token?.Length ?? 0];
Expand All @@ -56,14 +63,14 @@ public CoapMessageIdentifier(int id, CoapMessageType messageType, byte[] token =
return false;

// Only check the ID on a piggypacked response. Reponses that arrive after a an Acknowledge have a new ID
if (((A.MessageType == CoapMessageType.Confirmable && B.MessageType == CoapMessageType.Acknowledgement) ||
(B.MessageType == CoapMessageType.Confirmable && A.MessageType == CoapMessageType.Acknowledgement)) &&
if (((A.IsRequest && B.MessageType == CoapMessageType.Acknowledgement) ||
(B.IsRequest && A.MessageType == CoapMessageType.Acknowledgement)) &&
A.Id != B.Id)
return false;

return true;
}

/// <inheritdoc />
public override string ToString()
{
Expand Down Expand Up @@ -101,7 +108,12 @@ public override int GetHashCode()
{
unchecked
{
return Token.Aggregate(270, (a, t) => a + t.GetHashCode() * 21890);
var idHash = IsRequest || MessageType == CoapMessageType.Acknowledgement
? Id * 55865
: 0;
var tokenHash = Token.Aggregate(270, (a, t) => a + t.GetHashCode() * 21890);

return tokenHash ^ idHash;
}
}
}
Expand Down

0 comments on commit f3019ff

Please sign in to comment.