Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqttnet4 #588

Merged
merged 2 commits into from Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj
Expand Up @@ -14,9 +14,9 @@
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.1" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.1" />
<PackageReference Include="EFCore.Sharding.MySql" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion IoTSharp.TaskAction/DeviceActionExcutor.cs
Expand Up @@ -26,7 +26,7 @@ public override async Task<TaskActionOutput> ExecuteAsync(TaskActionInput input)
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);

var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
Expand Down
2 changes: 1 addition & 1 deletion IoTSharp.TaskAction/IoTSharp.TaskAction.csproj
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RestSharp" Version="107.1.1" />
<PackageReference Include="RestSharp" Version="106.15.0" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions IoTSharp.TaskAction/MessagePullExcutor.cs
Expand Up @@ -64,7 +64,7 @@ private async Task<TaskActionOutput> SendData(TaskActionInput input)
var dd = o.Properties().Select(c => new ParamObject { keyName = c.Name, value = JPropertyToObject(c.Value.First as JProperty) }).ToList();
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
Expand Down Expand Up @@ -98,7 +98,7 @@ private async Task<TaskActionOutput> SendData(TaskActionInput input)
var dd = o.Properties().Select(c => new ParamObject { keyName = c.Name, value = JPropertyToObject(c) }).ToList();
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
Expand Down
23 changes: 10 additions & 13 deletions IoTSharp/Clients/RpcClient.cs
@@ -1,8 +1,6 @@
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using System;
Expand Down Expand Up @@ -30,9 +28,11 @@ public RpcClient(IMqttClient mqttClient, ILogger logger)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
_logger = logger;
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args => OnApplicationMessageReceived(mqttClient, args) );
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
}



public RpcClient(IMqttClientOptions mqtt, Microsoft.Extensions.Logging.ILogger _logger) :this (new MQTTnet.MqttFactory().CreateMqttClient(), _logger)
{
_mqtt = mqtt;
Expand Down Expand Up @@ -121,19 +121,16 @@ public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string deviceid, string
}
}

private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{
return;
}

if (tcs.Task.IsCompleted || tcs.Task.IsCanceled)
if (_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{
return;
if (!tcs.Task.IsCompleted && !tcs.Task.IsCanceled)
{
tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
}
}

tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
return Task.CompletedTask;
}


Expand Down
20 changes: 8 additions & 12 deletions IoTSharp/Controllers/DevicesController.cs
Expand Up @@ -19,21 +19,17 @@
using IoTSharp.Extensions;
using IoTSharp.Models;
using MQTTnet.Exceptions;
using MQTTnet.Client.Options;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.Logging;
using IoTSharp.Storage;
using k8s.Models;
using Newtonsoft.Json.Linq;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server.Status;
using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Options;
using IoTSharp.X509Extensions;
using System.IO;
using System.IO.Compression;
using DotNetCore.CAP;
using LinqKit;
using MQTTnet.Server;

namespace IoTSharp.Controllers
{
Expand All @@ -51,12 +47,12 @@ public class DevicesController : ControllerBase
private readonly SignInManager<IdentityUser> _signInManager;
private readonly ILogger _logger;
private readonly IStorage _storage;
private readonly IMqttServerEx _serverEx;
private readonly MqttServer _serverEx;
private readonly AppSettings _setting;
private readonly ICapPublisher _queue;

public DevicesController(UserManager<IdentityUser> userManager,
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, IMqttServerEx serverEx, ApplicationDbContext context, IMqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, ICapPublisher queue)
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, MqttServer serverEx, ApplicationDbContext context, IMqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, ICapPublisher queue)
{
_context = context;
_mqtt = mqtt;
Expand Down Expand Up @@ -944,9 +940,9 @@ public async Task<ApiResult<Dic>> EditAttribute(Guid devid, DeviceAttrEditDto at
[HttpGet("SessionStatus")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<ApiResult<IList<IMqttSessionStatus>>> GetSessionStatus()
public async Task<ApiResult<IList<MqttSessionStatus>>> GetSessionStatus()
{
return new ApiResult<IList<IMqttSessionStatus>>(ApiCode.Success, "OK", await _serverEx.GetSessionStatusAsync());
return new ApiResult<IList<MqttSessionStatus>>(ApiCode.Success, "OK", await _serverEx.GetSessionsAsync());
}
/// <summary>
/// SessionStatus
Expand All @@ -956,9 +952,9 @@ public async Task<ApiResult<IList<IMqttSessionStatus>>> GetSessionStatus()
[HttpGet("ClientStatus")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<ApiResult<IList<IMqttClientStatus>>> GetClientStatus()
public async Task<ApiResult<IList<MqttClientStatus>>> GetClientStatus()
{
return new ApiResult<IList<IMqttClientStatus>>(ApiCode.Success, "OK", await _serverEx.GetClientStatusAsync());
return new ApiResult<IList<MqttClientStatus>>(ApiCode.Success, "OK", await _serverEx.GetClientsAsync());
}

[Authorize(Roles = nameof(UserRole.NormalUser))]
Expand All @@ -967,7 +963,7 @@ public async Task<ApiResult<IList<IMqttClientStatus>>> GetClientStatus()
[ProducesDefaultResponseType]
public async Task<ApiResult<int>> GetSessionsCount()
{
return new ApiResult<int>(ApiCode.Success, "OK", (await _serverEx.GetClientStatusAsync()).Count);
return new ApiResult<int>(ApiCode.Success, "OK", (await _serverEx.GetClientsAsync()).Count);
}
}
}
2 changes: 0 additions & 2 deletions IoTSharp/Controllers/SubscriptionController.cs
Expand Up @@ -23,8 +23,6 @@ namespace IoTSharp.Controllers
public class SubscriptionEventController : Controller
{
private ApplicationDbContext _context;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly TaskExecutorHelper _helper;
private UserManager<IdentityUser> _userManager;

// GET: SubscriptionEventController
Expand Down
6 changes: 2 additions & 4 deletions IoTSharp/Dockerfile
Expand Up @@ -48,6 +48,8 @@ RUN KEYRING=/usr/share/keyrings/nodesource.gpg && curl -fsSL https://deb.nodesou


WORKDIR /src
COPY ["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN npm install --prefix ./IoTSharp/ClientApp/
COPY ["IoTSharp/IoTSharp.csproj", "IoTSharp/"]
COPY ["IoTSharp.Data/IoTSharp.Data.csproj", "IoTSharp.Data/"]
COPY ["IoTSharp.Interpreter/IoTSharp.Interpreter.csproj", "IoTSharp.Interpreter/"]
Expand All @@ -59,15 +61,11 @@ COPY ["IoTSharp.Data.Oracle/IoTSharp.Data.Oracle.csproj", "IoTSharp.Data.Oracle/
COPY ["IoTSharp.Data.PostgreSQL/IoTSharp.Data.PostgreSQL.csproj", "IoTSharp.Data.PostgreSQL/"]
COPY ["IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj", "IoTSharp.Data.MySQL/"]
RUN dotnet restore "IoTSharp/IoTSharp.csproj"
COPY ["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN npm install --prefix ./IoTSharp/ClientApp/
COPY . .
WORKDIR "/src/IoTSharp"
RUN dotnet build "IoTSharp.csproj" -c Release -o /app/build




FROM build AS publish
RUN dotnet publish "IoTSharp.csproj" -c Release -o /app/publish

Expand Down
45 changes: 30 additions & 15 deletions IoTSharp/Extensions/MqttExtension.cs
Expand Up @@ -8,18 +8,15 @@
using Microsoft.AspNetCore.Builder;
using MQTTnet.AspNetCore;
using MQTTnet.Diagnostics;
using MQTTnet.AspNetCoreEx;
using IoTSharp.Handlers;
using IoTSharp.Services;
using MQTTnet.Server;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Options;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using MQTTnet.AspNetCore.Extensions;
using MQTTnet.Diagnostics.Logger;
using System.Security.Cryptography.X509Certificates;
using MQTTnet;

namespace IoTSharp
{
Expand All @@ -29,7 +26,7 @@ public static class MqttExtension
public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttBrokerSetting broker)
{
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServerEx(options =>
services.AddHostedMqttServer(options =>
{
options.WithDefaultEndpointPort(broker.Port).WithDefaultEndpoint();
if (broker.EnableTls)
Expand Down Expand Up @@ -59,19 +56,37 @@ public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttB
public static void UseIotSharpMqttServer(this IApplicationBuilder app)
{
var mqttEvents = app.ApplicationServices.CreateScope().ServiceProvider.GetService<MQTTServerHandler>();
IMqttServerStorage storage = app.ApplicationServices.CreateScope().ServiceProvider.GetService<IMqttServerStorage>();
app.UseMqttServerEx(server =>
app.UseMqttServer(server =>
{
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args => mqttEvents.Server_ClientConnected(server, args));
server.StartedHandler = new MqttServerStartedHandlerDelegate(args => mqttEvents.Server_Started(server, args));
server.StoppedHandler = new MqttServerStoppedHandlerDelegate(args => mqttEvents.Server_Stopped(server, args));
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args => mqttEvents.Server_ApplicationMessageReceived(server, args));
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate( args => mqttEvents.Server_ClientSubscribedTopic(server, args));
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args => mqttEvents.Server_ClientUnsubscribedTopic(server, args));
server.ClientConnectionValidatorHandler = new MqttServerClientConnectionValidatorHandlerDelegate(args => mqttEvents.Server_ClientConnectionValidator(server, args));
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(args => mqttEvents.Server_ClientDisconnected(server, args));
server.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
server.StartedAsync += mqttEvents.Server_Started ;
server.StoppedAsync += mqttEvents.Server_Stopped ;
server.ApplicationMessageNotConsumedAsync += mqttEvents.Server_ApplicationMessageReceived ;
server .ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopic;
server.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopic;
server.ValidatingConnectionAsync += mqttEvents.Server_ClientConnectionValidator;
server.ClientDisconnectedAsync +=mqttEvents.Server_ClientDisconnected;
});
}
public static async Task PublishAsync<T>(this MqttServer mqtt, string SenderClientId, string topic, T _payload) where T : class
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(_payload) });
}
public static async Task PublishAsync(this MqttServer mqtt, string SenderClientId, string topic, string _payload)
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = System.Text.Encoding.Default.GetBytes(_payload) });
}
public static async Task PublishAsync(this MqttServer mqtt, string SenderClientId, string topic, byte[] _payload)
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = _payload });
}

public static async Task PublishAsync ( this MqttServer mqtt, string SenderClientId ,MqttApplicationMessage message)
{
var clients = await mqtt.GetClientsAsync();
var client= clients.FirstOrDefault(c => c.Id == SenderClientId);
await client.Session.EnqueueApplicationMessageAsync(message);
}


public static void AddMqttClient(this IServiceCollection services, MqttClientSetting setting)
Expand Down