Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 34 additions & 43 deletions ClientSampleBackgroundService/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,60 +1,47 @@
using Coreflux.API.Networking.MQTT;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Coreflux.API.Networking.MQTT;

namespace ClientSampleBackgroundService
{
public class Worker : BackgroundService
{
private bool AlreadyConnectedOneTime;
private readonly ILogger<Worker> _logger;
private MQTTControllerInstance MQTTControllerInstance;
private MQTTControllerInstance mqttInstance;
public bool isConnected;
public int Teste;
public Coreflux.API.Client API = new Coreflux.API.Client("localhost", Coreflux.API.Client.Version.LegacyHTTPS);
public Worker(ILogger<Worker> logger)
{
_logger = logger;
MQTTControllerInstance = new MQTTControllerInstance();

MQTTControllerInstance.OnConnect += MQTTController_OnConnect;
MQTTControllerInstance.OnDisconnect += MQTTController_OnDisconnect;
MQTTControllerInstance.NewPayload += MQTTController_NewPayload;
MQTTControllerInstance.PersistentConnection = true;
mqttInstance = new MQTTControllerInstance();
mqttInstance.OnConnect += MQTTController_OnConnect;
mqttInstance.OnDisconnect += MQTTController_OnDisconnect;
mqttInstance.NewPayload += MQTTController_NewPayload;
mqttInstance.PersistentConnection = true;
AlreadyConnectedOneTime = false;
isConnected = false;
}


private void MQTTController_NewPayload(MQTTNewPayload obj)
{
_logger.LogInformation("received" + obj.topic + " , " + obj.payload + " @ {time} ", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Message received on topic {0} , paylod: {1} @ Time: {2}"), obj.topic, obj.payload, DateTimeOffset.Now);
}

private void MQTTController_OnDisconnect()
{

// MQTTController.StartAsync("127.0.0.1", timeOut: 5, keepAlive: 1).Wait();
_logger.LogInformation("Disconnected of broker {time}", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Disconnected of broker {0}", DateTimeOffset.Now));
isConnected = false;
// ReConnect();

}

private void MQTTController_OnConnect()
{
_logger.LogInformation("Connected to broker {time}", DateTimeOffset.Now);
if (!AlreadyConnectedOneTime)
{

_logger.LogInformation(string.Format("Connected to broker {0}", DateTimeOffset.Now));
if (!AlreadyConnectedOneTime)
_logger.LogInformation("Get inside MQTT_Controller");

}
AlreadyConnectedOneTime = true;
isConnected = true;
}
Expand All @@ -63,56 +50,60 @@ private async void ReConnect()
{
try
{
await MQTTControllerInstance.StartAsync("127.0.0.1", timeOut: 5, keepAlive: 1);
await mqttInstance.StartAsync("127.0.0.1", timeOut: 5, keepAlive: 1);
}
catch
catch (Exception ex)
{
_logger.LogInformation("Failed to find the broker {time}", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Failed to find the broker {0}. Exception: {1}", DateTimeOffset.Now, ex.ToString()));
}
}


protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
MQTTControllerInstance.ClientName = "Test";
await MQTTControllerInstance.StartAsync("127.0.0.1", port: 1883, timeOut: 5, keepAlive: 5, mqttSecure: false);
}
catch
{
_logger.LogInformation("Failed to find the broker {time}", DateTimeOffset.Now);
}
await ConnectToBroker();

while (!stoppingToken.IsCancellationRequested)
{
if (isConnected)
{
//Get the last value of the topic received
var t = MQTTControllerInstance.GetDataAsync("CF/GetTest").GetAwaiter();
var t = mqttInstance.GetDataAsync("CF/GetTest").GetAwaiter();
_logger.LogInformation("Was able to subscribe CF/GetTest with {string} ", t.GetResult());
Task.Delay(100).Wait();
//Set the new value of topic CF/Test
var t1 = MQTTControllerInstance.SetDataAsync("CF/Test", "test", qoslevel: 1);
var t1 = mqttInstance.SetDataAsync("CF/Test", "test", qoslevel: 1);
t1.Wait(100);
var result = t1.GetAwaiter().GetResult();
if (result != null)
{
if (result.ReasonFeedback == MQTTPublishFeedback.FeedbackType.PublishSucess)
{
_logger.LogInformation("Was able to publish CF/Test by publish sucess @ {time} ", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Message publish to CF/Test with success @ {0} ", DateTimeOffset.Now));
}
else
{
_logger.LogInformation("Failed to publish CF/Test by publish failed @ {time} ", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Failed to publish message to CF/Test @ {0} ", DateTimeOffset.Now));
}
}
}
else
{
_logger.LogInformation("Failed to publish CF/Test by null @ {time} ", DateTimeOffset.Now);
_logger.LogInformation(string.Format("Failed to publish message to CF/Test. Isn't connected. @ {0} ", DateTimeOffset.Now));
}


}
}

private async Task ConnectToBroker()
{
try
{
mqttInstance.ClientName = "Test";
await mqttInstance.StartAsync("127.0.0.1", port: 1883, timeOut: 5, keepAlive: 5, mqttSecure: false);
}
catch (Exception ex)
{
_logger.LogInformation(string.Format("Failed to find the broker {0}. Exception {1}", DateTimeOffset.Now, ex.ToString()));
}
}
}
Expand Down