Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Aug 11, 2019
2 parents 01aa754 + 2300e68 commit f54598c
Show file tree
Hide file tree
Showing 43 changed files with 760 additions and 169 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,5 @@ __pycache__/
*.map

/Tests/MQTTnet.TestApp.NetCore/RetainedMessages.json

Build/NuGet/
11 changes: 10 additions & 1 deletion Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [Server] Moved new socket options to TCP options to avoid incompatibility with Linux hosts.
* [Core] Nuget packages with symbols are now also published to improve debugging.
* [Core] Improve task handling (thanks to @mwinterb)
* [ManagedClient] Fix a race condition in the message storage (thanks to @PaulFake).
* [Server] Added items dictionary to client session in order to share data across interceptors as along as the session exists.
* [Server] Exposed CONNECT packet properties in Application Message and Subscription interceptor.
* [Server] Fixed: Sending Large packets with AspnetCore based connection throws System.ArgumentException.
* [Server] Fixed wrong usage of socket option _NoDelay_.
* [Server] Added remote certificate validation callback (thanks to @rudacs).
* [Server] Add support for certificate passwords (thanks to @cslutgen).
* [MQTTnet.Server] Added REST API for publishing basic messages.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
Expand Down
12 changes: 6 additions & 6 deletions Build/build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ Copy-Item MQTTnet.Extensions.WebSocket4Net.nuspec -Destination MQTTnet.Extension
(Get-Content MQTTnet.Extensions.WebSocket4Net.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.WebSocket4Net.nuspec

New-Item -ItemType Directory -Force -Path .\NuGet
.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.NETStandard.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.NETStandard.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -SymbolPackageFormat snupkg -OutputDir "NuGet" -Version $nugetVersion

Move-Item MQTTnet.AspNetCore.nuspec.old -Destination MQTTnet.AspNetCore.nuspec -Force
Move-Item MQTTnet.Extensions.Rpc.nuspec.old -Destination MQTTnet.Extensions.Rpc.nuspec -Force
Expand Down
2 changes: 1 addition & 1 deletion Build/upload.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ foreach ($file in $files)
{
Write-Host "Uploading: " $file

.\nuget.exe push $file.Fullname $apiKey -NoSymbols -Source https://api.nuget.org/v3/index.json
.\nuget.exe push $file.Fullname $apiKey -Source https://api.nuget.org/v3/index.json
}

Remove-Item "nuget.exe" -Force -Recurse -ErrorAction SilentlyContinue
9 changes: 5 additions & 4 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, Cance
var buffer = formatter.Encode(packet);
var msg = buffer.AsMemory();
var output = _output;
msg.CopyTo(output.GetMemory(msg.Length));
BytesSent += msg.Length;
var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
{
BytesSent += msg.Length;
}
PacketFormatterAdapter.FreeBuffer();
output.Advance(msg.Length);
await output.FlushAsync().ConfigureAwait(false);
}
finally
{
Expand Down
54 changes: 28 additions & 26 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class ManagedMqttClient : IManagedMqttClient

private readonly IMqttClient _mqttClient;
private readonly IMqttNetChildLogger _logger;

private readonly AsyncLock _messageQueueLock = new AsyncLock();

private CancellationTokenSource _connectionCancellationToken;
private CancellationTokenSource _publishingCancellationToken;
Expand Down Expand Up @@ -147,7 +149,7 @@ public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)

try
{
lock (_messageQueue)
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
if (_messageQueue.Count >= Options.MaxPendingMessages)
{
Expand All @@ -167,6 +169,16 @@ public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
}

_messageQueue.Enqueue(applicationMessage);

if (_storageManager != null)
{
if (removedMessage != null)
{
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
}

await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
}
}
}
finally
Expand All @@ -181,16 +193,6 @@ public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
}

}

if (_storageManager != null)
{
if (removedMessage != null)
{
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
}

await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
}
}

public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
Expand Down Expand Up @@ -362,7 +364,7 @@ private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToke
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while publishing queued application messages.");
_logger.Error(exception, "Error while publishing queued application messages.");
}
finally
{
Expand All @@ -377,7 +379,7 @@ private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage me
{
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
{
// While publishing this message, this.PublishAsync could have booted this
// message off the queue to make room for another (when using a cap
Expand All @@ -386,11 +388,11 @@ private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage me
// it from the queue. If not, that means this.PublishAsync has already
// removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}

if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
}
catch (MqttCommunicationException exception)
Expand All @@ -408,21 +410,21 @@ private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage me
//contradict the expected behavior of QoS 1 and 2, that's also true
//for the usage of a message queue cap, so it's still consistent
//with prior behavior in that way.
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
{
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}

if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
}
}
catch (Exception exception)
{
transmitException = exception;
_logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id}).");
_logger.Error(exception, $"Error while publishing application message ({message.Id}).");
}
finally
{
Expand Down Expand Up @@ -533,4 +535,4 @@ private void StopMaintainingConnection()
_connectionCancellationToken = null;
}
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Server/Controllers/ClientsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace MQTTnet.Server.Controllers
{
[Authorize]
[ApiController]
public class ClientsController : ControllerBase
public class ClientsController : Controller
{
private readonly MqttServerService _mqttServerService;

Expand Down
51 changes: 51 additions & 0 deletions Source/MQTTnet.Server/Controllers/MessagesController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using MQTTnet.Protocol;
using MQTTnet.Server.Mqtt;

namespace MQTTnet.Server.Controllers
{
[Authorize]
[ApiController]
public class MessagesController : Controller
{
private readonly MqttServerService _mqttServerService;

public MessagesController(MqttServerService mqttServerService)
{
_mqttServerService = mqttServerService ?? throw new ArgumentNullException(nameof(mqttServerService));
}

[Route("api/v1/messages")]
[HttpPost]
public async Task<ActionResult> PostMessage(MqttApplicationMessage message)
{
await _mqttServerService.PublishAsync(message);
return Ok();
}

[Route("api/v1/messages/{*topic}")]
[HttpPost]
public async Task<ActionResult> PostMessage(string topic, int qosLevel = 0)
{
byte[] payload;

using (var memoryStream = new MemoryStream())
{
await HttpContext.Request.Body.CopyToAsync(memoryStream);
payload = memoryStream.ToArray();
}

var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qosLevel)
.Build();

return await PostMessage(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace MQTTnet.Server.Controllers
{
[Authorize]
[ApiController]
public class RetainedApplicationMessagesController : ControllerBase
public class RetainedApplicationMessagesController : Controller
{
private readonly MqttServerService _mqttServerService;

Expand Down
18 changes: 18 additions & 0 deletions Source/MQTTnet.Server/Controllers/ServerController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Reflection;

namespace MQTTnet.Server.Controllers
{
[Authorize]
[ApiController]
public class ServerController : Controller
{
[Route("api/v1/server/version")]
[HttpGet]
public ActionResult<string> GetVersion()
{
return Assembly.GetExecutingAssembly().GetCustomAttribute<AssemblyInformationalVersionAttribute>().InformationalVersion;
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Server/Controllers/SessionsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace MQTTnet.Server.Controllers
{
[Authorize]
[ApiController]
public class SessionsController : ControllerBase
public class SessionsController : Controller
{
private readonly MqttServerService _mqttServerService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ public Task InterceptApplicationMessagePublishAsync(MqttApplicationMessageInterc
{
try
{
var sessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey];

var pythonContext = new PythonDictionary
{
{ "client_id", context.ClientId },
{ "session_items", sessionItems },
{ "retain", context.ApplicationMessage.Retain },
{ "accept_publish", context.AcceptPublish },
{ "close_connection", context.CloseConnection },
{ "client_id", context.ClientId },
{ "topic", context.ApplicationMessage.Topic },
{ "qos", (int)context.ApplicationMessage.QualityOfServiceLevel },
{ "retain", context.ApplicationMessage.Retain }
{ "qos", (int)context.ApplicationMessage.QualityOfServiceLevel }
};

_pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext);
Expand Down
7 changes: 7 additions & 0 deletions Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace MQTTnet.Server.Mqtt
{
public class MqttServerConnectionValidator : IMqttServerConnectionValidator
{
public const string WrappedSessionItemsKey = "WRAPPED_ITEMS";

private readonly PythonScriptHostService _pythonScriptHostService;
private readonly ILogger _logger;

Expand All @@ -22,6 +24,8 @@ public Task ValidateConnectionAsync(MqttConnectionValidatorContext context)
{
try
{
var sessionItems = new PythonDictionary();

var pythonContext = new PythonDictionary
{
{ "endpoint", context.Endpoint },
Expand All @@ -33,13 +37,16 @@ public Task ValidateConnectionAsync(MqttConnectionValidatorContext context)
{ "clean_session", context.CleanSession},
{ "authentication_method", context.AuthenticationMethod},
{ "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) },
{ "session_items", sessionItems },

{ "result", PythonConvert.Pythonfy(context.ReasonCode) }
};

_pythonScriptHostService.InvokeOptionalFunction("on_validate_client_connection", pythonContext);

context.ReasonCode = PythonConvert.ParseEnum<MqttConnectReasonCode>((string)pythonContext["result"]);

context.SessionItems[WrappedSessionItemsKey] = sessionItems;
}
catch (Exception exception)
{
Expand Down
10 changes: 6 additions & 4 deletions Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ public Task InterceptSubscriptionAsync(MqttSubscriptionInterceptorContext contex
{
try
{
var sessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey];

var pythonContext = new PythonDictionary
{
{ "accept_subscription", context.AcceptSubscription },
{ "close_connection", context.CloseConnection },

{ "client_id", context.ClientId },
{ "session_items", sessionItems },
{ "topic", context.TopicFilter.Topic },
{ "qos", (int)context.TopicFilter.QualityOfServiceLevel }
{ "qos", (int)context.TopicFilter.QualityOfServiceLevel },
{ "accept_subscription", context.AcceptSubscription },
{ "close_connection", context.CloseConnection }
};

_pythonScriptHostService.InvokeOptionalFunction("on_intercept_subscription", pythonContext);
Expand Down
8 changes: 4 additions & 4 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception, Mqt
{
var clientWasConnected = IsConnected;

InitiateDisconnect();

IsConnected = false;
TryInitiateDisconnect();

try
{
IsConnected = false;

if (_adapter != null)
{
_logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout);
Expand Down Expand Up @@ -295,7 +295,7 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception, Mqt
}
}

private void InitiateDisconnect()
private void TryInitiateDisconnect()
{
lock (_disconnectLock)
{
Expand Down
Loading

0 comments on commit f54598c

Please sign in to comment.