Skip to content

Commit

Permalink
Improve JsonDecoder StreamWriter dispose (#1448)
Browse files Browse the repository at this point in the history
- only dispose StreamWriter if owned by JsonEncoder
- only dispose if dispose is called
- on external stream writer, return text by using reflection on basestream, if memorystream is used
- improve benchmark common code
- fixes #1444
  • Loading branch information
mregen committed Jul 9, 2021
1 parent 08164fe commit c435e58
Show file tree
Hide file tree
Showing 32 changed files with 564 additions and 312 deletions.
19 changes: 7 additions & 12 deletions Libraries/Opc.Ua.PubSub/Encoding/JsonNetworkMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ public override byte[] Encode()
};

using (MemoryStream stream = new MemoryStream())
using (var writer = new StreamWriter(stream, new UTF8Encoding(false)))
{
Encode(messageContext, writer);
Encode(messageContext, stream);
return stream.ToArray();
}
}
Expand All @@ -196,12 +195,12 @@ public override byte[] Encode()
/// Encodes the object in the specified stream.
/// </summary>
/// <param name="messageContext">The context.</param>
/// <param name="writer">The stream to use.</param>
public override void Encode(IServiceMessageContext messageContext, StreamWriter writer)
/// <param name="stream">The stream to use.</param>
public override void Encode(IServiceMessageContext messageContext, Stream stream)
{
bool topLevelIsArray = !HasNetworkMessageHeader && !HasSingleDataSetMessage && !IsMetaDataMessage;

using (JsonEncoder encoder = new JsonEncoder(messageContext, true, writer, topLevelIsArray))
using (JsonEncoder encoder = new JsonEncoder(messageContext, true, topLevelIsArray, stream))
{
if (IsMetaDataMessage)
{
Expand Down Expand Up @@ -261,22 +260,18 @@ public override void Encode(IServiceMessageContext messageContext, StreamWriter
/// <summary>
/// Decodes the message
/// </summary>
/// <param name="context"></param>
/// <param name="message"></param>
/// <param name="dataSetReaders"></param>
public override void Decode(byte[] message, IList<DataSetReaderDataType> dataSetReaders)
public override void Decode(IServiceMessageContext context, byte[] message, IList<DataSetReaderDataType> dataSetReaders)
{
if (dataSetReaders == null || dataSetReaders.Count == 0)
{
return;
}

IServiceMessageContext messageContext = new ServiceMessageContext() {
NamespaceUris = ServiceMessageContext.GlobalContext.NamespaceUris,
ServerUris = ServiceMessageContext.GlobalContext.ServerUris
};

string json = System.Text.Encoding.ASCII.GetString(message);
using (JsonDecoder decoder = new JsonDecoder(json, messageContext))
using (JsonDecoder decoder = new JsonDecoder(json, context))
{
//decode bytes using dataset reader information
DecodeSubscribedDataSets(decoder, dataSetReaders);
Expand Down
17 changes: 7 additions & 10 deletions Libraries/Opc.Ua.PubSub/Encoding/UadpNetworkMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,8 @@ public override byte[] Encode()
};

using (MemoryStream stream = new MemoryStream())
using (var writer = new StreamWriter(stream, new UTF8Encoding(false)))
{
Encode(messageContext, writer);
Encode(messageContext, stream);
return stream.ToArray();
}
}
Expand All @@ -278,10 +277,10 @@ public override byte[] Encode()
/// Encodes the object in the specified stream.
/// </summary>
/// <param name="messageContext">The system context.</param>
/// <param name="writer">The stream to use.</param>
public override void Encode(IServiceMessageContext messageContext, StreamWriter writer)
/// <param name="stream">The stream to use.</param>
public override void Encode(IServiceMessageContext messageContext, Stream stream)
{
using (BinaryEncoder encoder = new BinaryEncoder(writer.BaseStream, messageContext))
using (BinaryEncoder encoder = new BinaryEncoder(stream, messageContext))
{
Encode(encoder);
}
Expand All @@ -290,24 +289,22 @@ public override void Encode(IServiceMessageContext messageContext, StreamWriter
/// <summary>
/// Decodes the message
/// </summary>
/// <param name="context"></param>
/// <param name="message"></param>
/// <param name="dataSetReaders"></param>
public override void Decode(byte[] message, IList<DataSetReaderDataType> dataSetReaders)
public override void Decode(IServiceMessageContext context, byte[] message, IList<DataSetReaderDataType> dataSetReaders)
{
if (dataSetReaders == null || dataSetReaders.Count == 0)
{
return;
}

IServiceMessageContext messageContext = new ServiceMessageContext();

using (BinaryDecoder decoder = new BinaryDecoder(message, messageContext))
using (BinaryDecoder decoder = new BinaryDecoder(message, context))
{
//decode bytes using dataset reader information
DecodeSubscribedDataSets(decoder, dataSetReaders);
}
}

#endregion

#region Private Methods - Encoding
Expand Down
4 changes: 2 additions & 2 deletions Libraries/Opc.Ua.PubSub/Transport/MqttPubSubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ private void ProcessMqttMessage(MqttApplicationMessageReceivedEventArgs eventArg

if (dataSetReaders.Count > 0)
{
// iniaialize the expected NetworkMessage
// initialize the expected NetworkMessage
UaNetworkMessage networkMessage = null;
if (m_messageMapping == MessageMapping.Uadp)
{
Expand All @@ -645,7 +645,7 @@ private void ProcessMqttMessage(MqttApplicationMessageReceivedEventArgs eventArg
// trigger message decoding
if (networkMessage != null)
{
networkMessage.Decode(eventArgs.ApplicationMessage.Payload, dataSetReaders);
networkMessage.Decode(m_context, eventArgs.ApplicationMessage.Payload, dataSetReaders);

// Raise the DataReceived event
RaiseNetworkMessageDataReceivedEvent(networkMessage, topic);
Expand Down
4 changes: 2 additions & 2 deletions Libraries/Opc.Ua.PubSub/Transport/UdpPubSubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ internal class UdpPubSubConnection : UaPubSubConnection
#region Private Fields
private List<UdpClient> m_publisherUdpClients = new List<UdpClient>();
private List<UdpClient> m_subscriberUdpClients = new List<UdpClient>();

private static int m_sequenceNumber = 0;
private static int m_dataSetSequenceNumber = 0;

Expand All @@ -63,6 +62,7 @@ internal class UdpPubSubConnection : UaPubSubConnection
public UdpPubSubConnection(UaPubSubApplication uaPubSubApplication, PubSubConnectionDataType pubSubConnectionDataType)
: base(uaPubSubApplication, pubSubConnectionDataType)
{

m_transportProtocol = TransportProtocol.UADP;

Utils.Trace("UdpPubSubConnection with name '{0}' was created.", pubSubConnectionDataType.Name);
Expand Down Expand Up @@ -302,7 +302,7 @@ private void ProcessReceivedMessage(byte[] message, IPEndPoint source)
Utils.Trace(Utils.TraceMasks.Information, "UdpPubSubConnection.ProcessReceivedMessage from source={0}", source);

UadpNetworkMessage networkMessage = new UadpNetworkMessage();
networkMessage.Decode(message, GetOperationalDataSetReaders());
networkMessage.Decode(m_context, message, GetOperationalDataSetReaders());

// Raise rhe DataReceived event
RaiseNetworkMessageDataReceivedEvent(networkMessage, source.ToString());
Expand Down
8 changes: 4 additions & 4 deletions Libraries/Opc.Ua.PubSub/UaNetworkMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,22 @@ public bool IsMetaDataMessage
/// <summary>
/// Encodes the object and returns the resulting byte array.
/// </summary>
/// <returns></returns>
public abstract byte[] Encode();

/// <summary>
/// Encodes the object in the specified stream.
/// </summary>
/// <param name="messageContext">The context.</param>
/// <param name="writer">The stream to use.</param>
public abstract void Encode(IServiceMessageContext messageContext, StreamWriter writer);
/// <param name="stream">The stream to use.</param>
public abstract void Encode(IServiceMessageContext messageContext, Stream stream);

/// <summary>
/// Decodes the message
/// </summary>
/// <param name="messageContext"></param>
/// <param name="message"></param>
/// <param name="dataSetReaders"></param>
public abstract void Decode(byte[] message, IList<DataSetReaderDataType> dataSetReaders);
public abstract void Decode(IServiceMessageContext messageContext, byte[] message, IList<DataSetReaderDataType> dataSetReaders);
#endregion
}
}
3 changes: 3 additions & 0 deletions Libraries/Opc.Ua.PubSub/UaPubSubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal abstract class UaPubSubConnection : IUaPubSubConnection
{
#region Fields
protected object m_lock = new object();
protected IServiceMessageContext m_context;
private bool m_isRunning;
private List<IUaPublisher> m_publishers;
private PubSubConnectionDataType m_pubSubConnectionDataType;
Expand All @@ -54,6 +55,8 @@ internal abstract class UaPubSubConnection : IUaPubSubConnection
/// </summary>
public UaPubSubConnection(UaPubSubApplication parentUaPubSubApplication, PubSubConnectionDataType pubSubConnectionDataType)
{
m_context = new ServiceMessageContext();

if (parentUaPubSubApplication == null)
{
throw new ArgumentNullException(nameof(parentUaPubSubApplication));
Expand Down
19 changes: 9 additions & 10 deletions Stack/Opc.Ua.Core/Types/Encoders/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public BinaryEncoder(IServiceMessageContext context)
m_ostrm = new MemoryStream();
m_writer = new BinaryWriter(m_ostrm);
m_context = context;
m_leaveOpen = false;
m_nestingLevel = 0;
}

Expand All @@ -46,19 +47,21 @@ public BinaryEncoder(byte[] buffer, int start, int count, IServiceMessageContext
m_ostrm = new MemoryStream(buffer, start, count);
m_writer = new BinaryWriter(m_ostrm);
m_context = context;
m_leaveOpen = false;
m_nestingLevel = 0;
}

/// <summary>
/// Creates an encoder that writes to the stream.
/// </summary>
public BinaryEncoder(Stream stream, IServiceMessageContext context)
public BinaryEncoder(Stream stream, IServiceMessageContext context, bool leaveOpen = false)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));

m_ostrm = stream;
m_writer = new BinaryWriter(m_ostrm);
m_writer = new BinaryWriter(m_ostrm, Encoding.UTF8, leaveOpen);
m_context = context;
m_leaveOpen = leaveOpen;
m_nestingLevel = 0;
}
#endregion
Expand Down Expand Up @@ -121,9 +124,9 @@ public byte[] CloseAndReturnBuffer()
m_writer.Flush();
m_writer.Dispose();

if (m_ostrm is MemoryStream)
if (m_ostrm is MemoryStream memoryStream)
{
return ((MemoryStream)m_ostrm).ToArray();
return memoryStream.ToArray();
}

return null;
Expand Down Expand Up @@ -156,11 +159,6 @@ public int Position
}
}

/// <summary>
/// Gets the stream that the encoder is writing to.
/// </summary>
public Stream BaseStream => m_writer.BaseStream;

/// <summary>
/// Writes raw bytes to the stream.
/// </summary>
Expand Down Expand Up @@ -1760,7 +1758,7 @@ public void WriteArray(string fieldName, object array, int valueRank, BuiltInTyp
}
break;
}
case BuiltInType.Enumeration:
case BuiltInType.Enumeration:
case BuiltInType.Int32:
{
Int32[] values = (Int32[])matrix.Elements;
Expand Down Expand Up @@ -2353,6 +2351,7 @@ private void WriteVariantValue(string fieldName, Variant value)
#region Private Fields
private Stream m_ostrm;
private BinaryWriter m_writer;
private bool m_leaveOpen;
private IServiceMessageContext m_context;
private ushort[] m_namespaceMappings;
private ushort[] m_serverMappings;
Expand Down
7 changes: 3 additions & 4 deletions Stack/Opc.Ua.Core/Types/Encoders/JsonDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1448,13 +1448,12 @@ public ExtensionObject ReadExtensionObject(string fieldName)
}

var ostrm = new MemoryStream();

using (JsonTextWriter writer = new JsonTextWriter(new StreamWriter(ostrm)))
using (var stream = new StreamWriter(ostrm))
using (JsonTextWriter writer = new JsonTextWriter(stream))
{
EncodeAsJson(writer, token);
return new ExtensionObject(typeId, ostrm.ToArray());
}

return new ExtensionObject(typeId, ostrm.ToArray());
}
finally
{
Expand Down

0 comments on commit c435e58

Please sign in to comment.