Skip to content

Commit

Permalink
Accept multiple SubscriptionIdentifiers for PUBLISH packets.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed May 12, 2019
1 parent 27476a9 commit c22b603
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 50 deletions.
5 changes: 3 additions & 2 deletions Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public MqttPublishPacket CreatePublishPacket(MqttApplicationMessage applicationM
MessageExpiryInterval = applicationMessage.MessageExpiryInterval,
PayloadFormatIndicator = applicationMessage.PayloadFormatIndicator,
ResponseTopic = applicationMessage.ResponseTopic,
SubscriptionIdentifier = applicationMessage.SubscriptionIdentifier,
SubscriptionIdentifiers = applicationMessage.SubscriptionIdentifiers,
TopicAlias = applicationMessage.TopicAlias
}
};

if (applicationMessage.UserProperties != null)
{
packet.Properties.UserProperties = new List<MqttUserProperty>();
packet.Properties.UserProperties.AddRange(applicationMessage.UserProperties);
}

Expand Down Expand Up @@ -67,7 +68,7 @@ public MqttApplicationMessage CreateApplicationMessage(MqttPublishPacket publish
ContentType = publishPacket.Properties?.ContentType,
CorrelationData = publishPacket.Properties?.CorrelationData,
MessageExpiryInterval = publishPacket.Properties?.MessageExpiryInterval,
SubscriptionIdentifier = publishPacket.Properties?.SubscriptionIdentifier,
SubscriptionIdentifiers = publishPacket.Properties?.SubscriptionIdentifiers,
TopicAlias = publishPacket.Properties?.TopicAlias,
PayloadFormatIndicator = publishPacket.Properties?.PayloadFormatIndicator,
UserProperties = publishPacket.Properties?.UserProperties ?? new List<MqttUserProperty>()
Expand Down
53 changes: 37 additions & 16 deletions Source/MQTTnet/Formatter/V5/MqttV500PacketDecoder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using MQTTnet.Adapter;
using MQTTnet.Exceptions;
Expand Down Expand Up @@ -117,7 +118,7 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -155,7 +156,12 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
}
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
{
packet.WillMessage.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
if (packet.WillMessage.SubscriptionIdentifiers == null)
{
packet.WillMessage.SubscriptionIdentifiers = new List<uint>();
}

packet.WillMessage.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
}
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
{
Expand All @@ -168,7 +174,12 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
}
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
if (packet.WillMessage.UserProperties == null)
{
packet.WillMessage.UserProperties = new List<MqttUserProperty>();
}

propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -271,7 +282,7 @@ private static MqttBasePacket DecodeConnAckPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -309,7 +320,7 @@ private static MqttBasePacket DecodeDisconnectPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -339,7 +350,7 @@ private static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -389,7 +400,7 @@ private static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -421,7 +432,7 @@ private static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body
{
if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -456,7 +467,7 @@ private static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -530,15 +541,25 @@ private static MqttBasePacket DecodePublishPacket(byte header, IMqttPacketBodyRe
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
{
packet.Properties.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
if (packet.Properties.SubscriptionIdentifiers == null)
{
packet.Properties.SubscriptionIdentifiers = new List<uint>();
}

packet.Properties.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
{
packet.Properties.ContentType = propertiesReader.ReadContentType();
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
if (packet.Properties.UserProperties == null)
{
packet.Properties.UserProperties = new List<MqttUserProperty>();
}

propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -581,7 +602,7 @@ private static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -619,7 +640,7 @@ private static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -657,7 +678,7 @@ private static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -695,7 +716,7 @@ private static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down Expand Up @@ -740,7 +761,7 @@ private static MqttBasePacket DecodeAuthPacket(IMqttPacketBodyReader body)
}
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
{
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private static byte EncodeConnectPacket(MqttConnectPacket packet, IMqttPacketWri
willPropertiesWriter.WriteTopicAlias(packet.WillMessage.TopicAlias);
willPropertiesWriter.WriteResponseTopic(packet.WillMessage.ResponseTopic);
willPropertiesWriter.WriteCorrelationData(packet.WillMessage.CorrelationData);
willPropertiesWriter.WriteSubscriptionIdentifier(packet.WillMessage.SubscriptionIdentifier);
willPropertiesWriter.WriteSubscriptionIdentifiers(packet.WillMessage.SubscriptionIdentifiers);
willPropertiesWriter.WriteContentType(packet.WillMessage.ContentType);
willPropertiesWriter.WriteUserProperties(packet.WillMessage.UserProperties);

Expand Down Expand Up @@ -258,7 +258,7 @@ private static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWri
propertiesWriter.WriteTopicAlias(packet.Properties.TopicAlias);
propertiesWriter.WriteResponseTopic(packet.Properties.ResponseTopic);
propertiesWriter.WriteCorrelationData(packet.Properties.CorrelationData);
propertiesWriter.WriteSubscriptionIdentifier(packet.Properties.SubscriptionIdentifier);
propertiesWriter.WriteSubscriptionIdentifiers(packet.Properties.SubscriptionIdentifiers);
propertiesWriter.WriteContentType(packet.Properties.ContentType);
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
}
Expand Down
32 changes: 16 additions & 16 deletions Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public bool MoveNext()
return true;
}

public void FillUserProperties(List<MqttUserProperty> userProperties)
public void AddUserPropertyTo(List<MqttUserProperty> userProperties)
{
if (userProperties == null) throw new ArgumentNullException(nameof(userProperties));

Expand All @@ -67,17 +67,17 @@ public byte[] ReadAuthenticationData()
return _body.ReadWithLengthPrefix();
}

public bool? ReadRetainAvailable()
public bool ReadRetainAvailable()
{
return _body.ReadBoolean();
}

public uint? ReadSessionExpiryInterval()
public uint ReadSessionExpiryInterval()
{
return _body.ReadFourByteInteger();
}

public ushort? ReadReceiveMaximum()
public ushort ReadReceiveMaximum()
{
return _body.ReadTwoByteInteger();
}
Expand All @@ -92,17 +92,17 @@ public string ReadServerReference()
return _body.ReadStringWithLengthPrefix();
}

public ushort? ReadTopicAliasMaximum()
public ushort ReadTopicAliasMaximum()
{
return _body.ReadTwoByteInteger();
}

public uint? ReadMaximumPacketSize()
public uint ReadMaximumPacketSize()
{
return _body.ReadFourByteInteger();
}

public ushort? ReadServerKeepAlive()
public ushort ReadServerKeepAlive()
{
return _body.ReadTwoByteInteger();
}
Expand All @@ -112,22 +112,22 @@ public string ReadResponseInformation()
return _body.ReadStringWithLengthPrefix();
}

public bool? ReadSharedSubscriptionAvailable()
public bool ReadSharedSubscriptionAvailable()
{
return _body.ReadBoolean();
}

public bool? ReadSubscriptionIdentifiersAvailable()
public bool ReadSubscriptionIdentifiersAvailable()
{
return _body.ReadBoolean();
}

public bool? ReadWildcardSubscriptionAvailable()
public bool ReadWildcardSubscriptionAvailable()
{
return _body.ReadBoolean();
}

public uint? ReadSubscriptionIdentifier()
public uint ReadSubscriptionIdentifier()
{
return _body.ReadVariableLengthInteger();
}
Expand All @@ -137,12 +137,12 @@ public string ReadResponseInformation()
return (MqttPayloadFormatIndicator)_body.ReadByte();
}

public uint? ReadMessageExpiryInterval()
public uint ReadMessageExpiryInterval()
{
return _body.ReadFourByteInteger();
}

public ushort? ReadTopicAlias()
public ushort ReadTopicAlias()
{
return _body.ReadTwoByteInteger();
}
Expand All @@ -162,17 +162,17 @@ public string ReadContentType()
return _body.ReadStringWithLengthPrefix();
}

public uint? ReadWillDelayInterval()
public uint ReadWillDelayInterval()
{
return _body.ReadFourByteInteger();
}

public bool? RequestResponseInformation()
public bool RequestResponseInformation()
{
return _body.ReadBoolean();
}

public bool? RequestProblemInformation()
public bool RequestProblemInformation()
{
return _body.ReadBoolean();
}
Expand Down
13 changes: 13 additions & 0 deletions Source/MQTTnet/Formatter/V5/MqttV500PropertiesWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ public void WriteSubscriptionIdentifier(uint? value)
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, value);
}

public void WriteSubscriptionIdentifiers(IEnumerable<uint> value)
{
if (value == null)
{
return;
}

foreach (var subscriptionIdentifier in value)
{
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, subscriptionIdentifier);
}
}

public void WriteTopicAlias(ushort? value)
{
Write(MqttPropertyId.TopicAlias, value);
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet/MqttApplicationMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ public class MqttApplicationMessage

public byte[] CorrelationData { get; set; }

public uint? SubscriptionIdentifier { get; set; }
public List<uint> SubscriptionIdentifiers { get; set; }
}
}
Loading

0 comments on commit c22b603

Please sign in to comment.