Skip to content
Permalink
Browse files

progress on the java mqtt component

  • Loading branch information...
ffleurey committed Nov 22, 2018
1 parent ebed0ae commit 8afc24ce8389fa1bd8d65b7d81eedf502e246c81
@@ -13,6 +13,7 @@ xtend-gen/
**/xtend-gen/
language/thingml.web/WebRoot/xtext-resources/generated/mode-thingml.js
language/thingml/model/
doc/examples/thingml-gen/
tmp/
*/tmp/
**/tmp/
@@ -6,7 +6,7 @@ object Buffer
@c_type "uint8_t *"
@c_byte_size "*"

thing MQTTAdapterMsgs {
thing fragment MQTTAdapterMsgs {

message mqtt_set_credentials(usr : String, pwd : String)

@@ -19,17 +19,51 @@ thing MQTTAdapterMsgs {
message mqtt_disconnected()

message mqtt_publish(topic : String, payload: Buffer, size : UInt32)
message mqtt_publish_with_qos(topic : String, payload: Buffer, size : UInt32, qos : UInt8, retain : Boolean)


message mqtt_subscribe(topic : String)
message mqtt_set_prefix(prefix : String)

message mqtt_message(topic : String, payload: Buffer, size : UInt32)

message mqtt_error()
message mqtt_message_published()
}

thing MQTTAdapterCtrlPort includes MQTTAdapterMsgs {
thing fragment MQTTAdapterCtrlPort includes MQTTAdapterMsgs {

required port mqtt
@sync_send "true" // Some of the messages pass pointers (Strings) which should not be put on the FIFO
{
sends mqtt_connect, mqtt_disconnect, mqtt_set_prefix
receives mqtt_connected, mqtt_disconnected
sends mqtt_connect, mqtt_disconnect, mqtt_set_credentials, mqtt_set_tls_certificates, mqtt_set_prefix
receives mqtt_connected, mqtt_disconnected, mqtt_error, mqtt_message_published
}
}

thing fragment AbstractMQTTAdapter includes MQTTAdapterMsgs {

provided port mqtt
@sync_send "true" // Some of the messages pass pointers (Strings) which should not be put on the FIFO
{
receives mqtt_connect, mqtt_disconnect, mqtt_publish, mqtt_publish_with_qos, mqtt_subscribe, mqtt_set_credentials, mqtt_set_prefix, mqtt_set_tls_certificates
sends mqtt_connected, mqtt_disconnected, mqtt_message, mqtt_error, mqtt_message_published
}

property broker_host : String
property broker_port : UInt16

property client_id : String

property tls : Boolean = false
property username : String
property password : String

property cafile : String
property capath : String
property certfile : String
property keyfile : String

property topic_prefix : String

}
@@ -1,16 +1,16 @@
import "../MQTTAdapterMsgs.thingml"

object MQTTMessage
@java_type "org.eclipse.paho.client.mqttv3.MqttMessage"
@java_type `org.eclipse.paho.client.mqttv3.MqttMessage`

object Throwable
@java_type "Throwable"
@java_type `Throwable`

object IMqttDeliveryToken
@java_type "IMqttDeliveryToken"
@java_type `IMqttDeliveryToken`

thing JavaMQTTAdapter includes MQTTAdapterMsgs
@maven_dep "
thing JavaMQTTAdapter includes AbstractMQTTAdapter
@maven_dep `
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
@@ -21,32 +21,19 @@ thing JavaMQTTAdapter includes MQTTAdapterMsgs
<groupId>org.json</groupId>
<artifactId>org.json</artifactId>
<version>chargebee-1.0</version>
</dependency>"
</dependency>`

@java_interface "org.eclipse.paho.client.mqttv3.MqttCallbackExtended"
@java_import "import java.util.logging.Logger;"
@java_import "import java.util.logging.Level;"
@java_import "import org.eclipse.paho.client.mqttv3.*;"
@java_import "import org.eclipse.paho.client.mqttv3.persist.*;"
@java_features "org.eclipse.paho.client.mqttv3.MqttClient clientPub, clientSub;"
@java_interface `org.eclipse.paho.client.mqttv3.MqttCallbackExtended`
@java_import `import java.util.logging.Logger;`
@java_import `import java.util.logging.Level;`
@java_import `import org.eclipse.paho.client.mqttv3.*;`
@java_import `import org.eclipse.paho.client.mqttv3.persist.*;`
@java_features `org.eclipse.paho.client.mqttv3.MqttAsyncClient mqttAsyncClient;`
{
provided port mqtt
@sync_send "true" // Some of the messages pass pointers (Strings) which should not be put on the FIFO
{
receives mqtt_connect, mqtt_disconnect, mqtt_publish, mqtt_subscribe, mqtt_set_credentials, mqtt_set_prefix
sends mqtt_connected, mqtt_disconnected, mqtt_message
}


property broker_uri : String = "tcp://localhost:1883"


property client_id : String = `null`

property username : String = `null`
property password : String = `null`

property topic_prefix : String = `null`

function setTopicPrefix(src : String) do
topic_prefix = src
end
@@ -58,6 +45,7 @@ thing JavaMQTTAdapter includes MQTTAdapterMsgs
`MqttConnectOptions connOpts = new MqttConnectOptions();`
`connOpts.setCleanSession(true);`
`connOpts.setAutomaticReconnect(true);`
`connOpts.setAutomaticReconnect(true);`

if (username != `null` and password != `null`) do
`connOpts.setUserName(`&username&`);`
@@ -68,14 +56,26 @@ thing JavaMQTTAdapter includes MQTTAdapterMsgs
client_id = `MqttClient.generateClientId()`
end

`clientSub = new MqttClient(`&broker_uri&`, `&client_id&` + "_Sub", persistence);`
`clientPub = new MqttClient(`&broker_uri&`, `&client_id&` + "_Pub", persistence);`
`mqttAsyncClient = new MqttAsyncClient(`&broker_uri&`, `&client_id&` + "_Sub", persistence);`

// For now we are only monitoring the connection on the subscriber client
`clientSub. setCallback(this);`
`mqttAsyncClient.setCallback(this);`

`clientSub.connect(connOpts);`
`clientPub.connect(connOpts);`

// Connect and collect associated callbacks
`IMqttToken mqttConnectToken = mqttAsyncClient.connect(connOpts, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
`mqtt!mqtt_connected()`
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
`mqtt!mqtt_error()`
`mqtt!mqtt_disconnected()`
exception.printStackTrace();
}
});`

`} catch (org.eclipse.paho.client.mqttv3.MqttException ex) {`
`Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);`
@@ -85,12 +85,27 @@ thing JavaMQTTAdapter includes MQTTAdapterMsgs
end

function mqtt_publish(topic : String, payload: Buffer, size : UInt16) : Boolean do
if (`clientPub == null`) return false
if (`mqttAsyncClient == null`) return false
`try {
Logger.getLogger(this.getClass().getName()).log(Level.INFO, "MQTT Publish: " + `&topic&` + " -> " + new String(`&payload&`));
//Logger.getLogger(this.getClass().getName()).log(Level.INFO, "MQTT Publish: " + `&topic&` + " -> " + new String(`&payload&`));

MqttMessage message = new MqttMessage(`&payload&`);
if (`&topic_prefix&` != null) clientPub.publish(`&topic_prefix&` + `&topic&`, message);
else clientPub.publish(`&topic&`, message);
String fulltopic = `&topic&`;
if (`&topic_prefix&` != null) fulltopic = `&topic_prefix&` + fulltopic;

mqttAsyncClient.publish(fulltopic, message, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
`mqtt!mqtt_message_published()`
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
`mqtt!mqtt_error()`
exception.printStackTrace();
}
});

} catch (org.eclipse.paho.client.mqttv3.MqttException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);`
return false
@@ -100,43 +115,46 @@ thing JavaMQTTAdapter includes MQTTAdapterMsgs


function mqtt_subscribe(topic : String) : Boolean do
if (`clientSub == null`) return false
if (`mqttAsyncClient == null`) return false
`try {
Logger.getLogger(this.getClass().getName()).log(Level.INFO, "MQTT Subscribe: " + `&topic&`);
if (`&topic_prefix&` != null) clientSub.subscribe(`&topic_prefix&` + `&topic&`);
else clientSub.subscribe(`&topic&`);
if (`&topic_prefix&` != null) mqttAsyncClient.subscribe(`&topic_prefix&` + `&topic&`, 0);
else mqttAsyncClient.subscribe(`&topic&`, 0);
} catch (org.eclipse.paho.client.mqttv3.MqttException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);`
return false
`}`
return true
end

function messageArrived(topic : String, m : MQTTMessage) @override "true" @java_visibility "public"
function messageArrived(topic : String, m : MQTTMessage)
@override "true"
@java_visibility "public"
do
print "Message arrived on topic "
print topic
print "\n"
mqtt!mqtt_message(topic, ``&m&`.getPayload()`, ``&m&`.getPayload().length`)
end

function connectionLost(t : Throwable) @override "true" @java_visibility "public"
function connectionLost(t : Throwable)
@override "true"
@java_visibility "public"
do
print "connectionLost \n"
mqtt!mqtt_disconnected()
end

function deliveryComplete(imdt : IMqttDeliveryToken) @override "true" @java_visibility "public"
function deliveryComplete(imdt : IMqttDeliveryToken)
@override "true"
@java_visibility "public"
do
print "deliveryComplete \n"
// Handled in IMqttActionListener
end

function connectComplete(reconnect : Boolean, serverURI : String) @override "true" @java_visibility "public"
function connectComplete(reconnect : Boolean, serverURI : String)
@override "true"
@java_visibility "public"
do
print "connectComplete \n"
mqtt!mqtt_connected()
// Handled in IMqttActionListener
end


statechart MQTTAdapterThing init Start {

internal event m: mqtt?mqtt_set_credentials action do
@@ -38,6 +38,7 @@ thing MQTTAdapterTest includes MQTTAdapterMsgs, TimerMsgs {

state DISCONNECTED {
transition -> CONNECTED event mqtt?mqtt_connected action print "Client got mqtt_connected\n"
transition -> DISCONNECTED event mqtt?mqtt_disconnected action print "Client got mqtt_disconnected\n"
}

state CONNECTED {
@@ -55,7 +56,7 @@ thing MQTTAdapterTest includes MQTTAdapterMsgs, TimerMsgs {
end

internal event m:mqtt?mqtt_message action do
`System.out.println("RCV topic:"+`&m.topic&`+" payload:" + `&m.payload&`);`
`System.out.println("RCV topic:"+`&m.topic&`+" payload:" + new String(`&m.payload&`, java.nio.charset.StandardCharsets.UTF_8));`
end

transition -> DISCONNECTED event mqtt?mqtt_disconnected action print "Client got mqtt_disconnected\n"
@@ -5,7 +5,7 @@ object Mosquitto
@c_byte_size "*"


thing PosixMQTTAdapter includes MQTTAdapterMsgs
thing PosixMQTTAdapter includes AbstractMQTTAdapter
@c_header "
#include <errno.h>
#include <stdio.h>
@@ -16,37 +16,19 @@ thing PosixMQTTAdapter includes MQTTAdapterMsgs
#include <math.h>
#include <time.h>
#include <mosquitto.h>"

@c_global ""
{
provided port mqtt
@sync_send "true" // Some of the messages pass pointers (Strings) which should not be put on the FIFO
{
receives mqtt_connect, mqtt_disconnect, mqtt_publish, mqtt_subscribe, mqtt_set_credentials, mqtt_set_prefix, mqtt_set_tls_certificates
sends mqtt_connected, mqtt_disconnected, mqtt_message
}

property client : Mosquitto = `NULL`

property broker_host : String = `NULL`
property broker_port : UInt16 = 1883

property client_id : String = `NULL`

property tls : Boolean = false
property username : String = `NULL`
property password : String = `NULL`

property cafile : String = `NULL`
property capath : String = `NULL`
property certfile : String = `NULL`
property keyfile : String = `NULL`


property topic_prefix : String = `NULL`

//readonly property will_topic : String = `NULL`
//readonly property will_string : String = `NULL`
function init_properties() do
username = `NULL`
password = `NULL`
cafile = `NULL`
capath = `NULL`
certfile = `NULL`
keyfile = `NULL`
end

function setTopicPrefix(src : String) do
`if (`&topic_prefix&` != NULL) free(`&topic_prefix&`);
@@ -241,12 +223,12 @@ thing PosixMQTTAdapter includes MQTTAdapterMsgs
return true
end

function mqtt_publish(topic : String, payload: Buffer, size : UInt16) do
function mqtt_publish(topic : String, payload: Buffer, size : UInt16, qos: UInt8, retain : Boolean) do
`char top[256];`
if (topic_prefix != `NULL`) `snprintf(top, 256, "%s/%s", `&topic_prefix&`, `&topic&`);`
else `snprintf(top, 256, "%s", `&topic&`);`

var status : Integer = `mosquitto_publish(`&client&`, NULL, top, `&size&`, `&payload&`, 1, 0)`
var status : Integer = `mosquitto_publish(`&client&`, NULL, top, `&size&`, `&payload&`, `&qos&`, `&retain&`)`
if (status != `MOSQ_ERR_SUCCESS`) do
error "[MQTT] Error publishing message (mosquitto_publish returned " error status error ")\n"
end
@@ -259,14 +241,16 @@ thing PosixMQTTAdapter includes MQTTAdapterMsgs
if (topic_prefix != `NULL`) `snprintf(top, 256, "%s/%s", `&topic_prefix&`, `&topic&`);`
else `snprintf(top, 256, "%s", `&topic&`);`

var status : Integer = `mosquitto_subscribe(`&client&`, NULL, top, 1)`
var status : Integer = `mosquitto_subscribe(`&client&`, NULL, top, 0)`
if (status != `MOSQ_ERR_SUCCESS`) do
error "[MQTT] Error subscribing to topic (mosquitto_subscribe returned " error status error ")\n"
end
end

statechart MQTTAdapterThing init Start {

on entry init_properties()

internal event m: mqtt?mqtt_set_credentials action do
username = m.usr
password = m.pwd
@@ -292,7 +276,11 @@ thing PosixMQTTAdapter includes MQTTAdapterMsgs
end

internal event m : mqtt?mqtt_publish action do
mqtt_publish(m.topic, m.payload, m.size)
mqtt_publish(m.topic, m.payload, m.size, 0, false)
end

internal event m : mqtt?mqtt_publish_with_qos action do
mqtt_publish(m.topic, m.payload, m.size, m.qos, m.retain)
end

internal event m : mqtt?mqtt_subscribe action do

0 comments on commit 8afc24c

Please sign in to comment.
You can’t perform that action at this time.