Skip to content

Commit

Permalink
Merge branch 'master' into feature/stabilize_connectivity
Browse files Browse the repository at this point in the history
# Conflicts:
#	services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/MessageMappingProcessorActor.java
#	services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ReconnectActor.java
  • Loading branch information
ffendt committed Sep 6, 2018
2 parents b75c56d + ca34ffa commit c08d26b
Show file tree
Hide file tree
Showing 88 changed files with 4,887 additions and 466 deletions.
11 changes: 10 additions & 1 deletion bom/pom.xml
Expand Up @@ -26,7 +26,9 @@

<properties>
<!-- globally set version for checking binary compatibility against -->
<binary-compatibility-check.version>0.8.0-M1</binary-compatibility-check.version>
<!-- TODO: set to latest release -->
<!--<binary-compatibility-check.version>0.8.0-M1</binary-compatibility-check.version>-->
<binary-compatibility-check.version>${revision}</binary-compatibility-check.version>

<scala.version>2.12</scala.version> <!-- for scala libraries the scala version is used in their artifactId -->
<scala.full.version>2.12.6</scala.full.version>
Expand Down Expand Up @@ -96,6 +98,7 @@

<scalatest.version>3.0.0</scalatest.version>
<flapdoodle.version>2.0.0</flapdoodle.version>
<alpakka-mqtt.version>0.20</alpakka-mqtt.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -305,6 +308,12 @@
<version>${amqp-client.version}</version>
</dependency>

<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-mqtt_2.12</artifactId>
<version>${alpakka-mqtt.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
Expand Down
Expand Up @@ -194,6 +194,9 @@ entries:
- title: AMQP 1.0 protocol binding
url: /connectivity-protocol-bindings-amqp10.html
output: web
- title: MQTT 3.1.1 protocol binding
url: /connectivity-protocol-bindings-mqtt.html
output: web
- title: Payload mapping
url: /connectivity-mapping.html
output: web
Expand Down
6 changes: 3 additions & 3 deletions documentation/src/main/resources/jsonschema/connection.json
Expand Up @@ -15,7 +15,7 @@
"connectionType": {
"$id": "/properties/connectionType",
"type": "string",
"enum": [ "amqp-091", "amqp-10" ],
"enum": [ "amqp-091", "amqp-10", "mqtt" ],
"title": "Connection type",
"description": "The type determining the connection's underlying transport protocol",
"examples": [
Expand Down Expand Up @@ -77,7 +77,7 @@
"authorizationContext": {
"$id": "/properties/sources/items/properties/authorizationContext",
"type": "array",
"title": "The authorisation context",
"title": "The authorization context",
"description": "The authorization context defines all authorization subjects associated for this source. ",
"uniqueItems": true,
"items": {
Expand Down Expand Up @@ -133,7 +133,7 @@
"authorizationContext": {
"$id": "/properties/targets/items/properties/authorizationContext",
"type": "array",
"title": "The authorisation context",
"title": "The authorization context",
"description": "The authorization context defines all authorization subjects associated for this target. ",
"uniqueItems": true,
"items": {
Expand Down
Expand Up @@ -23,9 +23,10 @@ The components have the following tasks:
updating an optimized search index + executes queries on this search index
* [Concierge](architecture-services-concierge.html): orchestrates and authorizes the backing persistence services
* [Gateway](architecture-services-gateway.html): provides HTTP and WebSocket API
* [Connectivity](architecture-services-connectivity.html): connects to AMQP 1.0 endpoints (e.g. [Eclipse Hono](https://eclipse.org/hono/))
or AMQP 0.9.1 endpoints (e.g. RabbitMQ) and consumes messages in [Ditto Protocol](protocol-overview.html) from it or transforms
messages into Ditto Protocol
* [Connectivity](architecture-services-connectivity.html):
sends [Ditto Protocol](protocol-overview.html) messages to external message brokers and receives messages from them. <br>
Supported transport protocols are AMQP 1.0 (e.g. [Eclipse Hono](https://eclipse.org/hono/)),
AMQP 0.9.1 (e.g. RabbitMQ) or MQTT 3.1.1 (e.g. [Eclipse Mosquitto](https://www.eclipse.org/mosquitto/)).

## Components

Expand Down
22 changes: 13 additions & 9 deletions documentation/src/main/resources/pages/ditto/basic-connections.md
Expand Up @@ -8,8 +8,9 @@ permalink: basic-connections.html
## Connection model

{%
include note.html content="To get started with connections right away, consult the [Manage connections]
(connectivity-manage-connections.html) page. "
include note.html content="To get started with connections right away, consult the
[Manage connections](connectivity-manage-connections.html)
page. "
%}

You can integrate your Ditto instance with external messaging services such as
Expand All @@ -33,15 +34,16 @@ for custom payload formats. Currently the following connection types are support

* [AMQP 0.9.1](connectivity-protocol-bindings-amqp091.html)
* [AMQP 1.0](connectivity-protocol-bindings-amqp10.html)
* [MQTT 3.1.1](connectivity-protocol-bindings-mqtt.html)


The `sources` and `targets` identifier format depends on the `connectionType` and has therefore `connectionType`
specific limitations. Those are documented with the corresponding protocol bindings.

A connection is initiated by the connectivity service. This obviates the need for client authorization, because
Ditto becomes the client in this case. Nevertheless, to access resources within Ditto, the connection must know on
whose behalf it is acting. This is controlled via the configured `authorisationContext`, which holds a list of
self-assigned authorization subjects. Before a connection can access a Ditto resource, one of its
whose behalf it is acting. This is controlled via the configured `authorizationContext`, which holds a list of
self-assigned authorization subjects. Before a connection can access a Ditto resource, one of its
`authorizationSubject`s must be granted the access rights by an authorization mechanism such as
[ACLs](basic-acl.html) or [Policies](basic-policy.html).

Expand All @@ -56,9 +58,9 @@ over how messages are consumed or where they are published to. The general synta

### Placeholder for source authorization subjects
Processing the messages received via a source using the _same fixed authorization subject_ may not be
suitable for every scenario. For example you want to declare fine-grained write permissions per device which would not
be possible with a fixed global subject. For this use case we introduced placeholder substitution for authorization subjects of
source addresses that are resolved when processing messages from a source. Of course this requires the sender of the
suitable for every scenario. For example, if you want to declare fine-grained write permissions per device, this would not
be possible with a fixed global subject. For this use case we have introduced placeholder substitution for authorization subjects of
source addresses that are resolved when processing messages from a source. Of course, this requires the sender of the
message to provide necessary information about the original issuer of the message.

{%
Expand All @@ -68,6 +70,7 @@ message to provide necessary information about the original issuer of the messag
You can access any header value of the incoming message by using a placeholder like `{% raw %}{{ header:name }}{% endraw %}`.

Example:

Assuming the messages received from the source _telemetry_ contain a `device_id` header (e.g. _sensor-123_),
you may configure your source's authorization subject as follows:
```json
Expand All @@ -87,19 +90,20 @@ In case the header cannot be resolved or the header contains unexpected characte
back to the sender as an error message, if a valid _reply-to_ header was provided, otherwise the message is dropped.

### Placeholder for target addresses
Another use case for placeholders may be to publish thing events or live commands and events to a target address
Another use case for placeholders may be to publish Thing events or live commands and events to a target address
containing Thing-specific information e.g. you can distribute Things from different namespaces to different target addresses.
You can use the placeholders `{% raw %}{{ thing:id }}{% endraw %}`, `{% raw %}{{ thing:namespace }}{% endraw %}` and `{% raw %}{{ thing:name }}{% endraw %}` in the target address for this purpose.
For a Thing with the ID _org.eclipse.ditto:device-123_ these placeholders are resolved as follows:

| Placeholder | Description | Resolved value |
|--------|------------|------------|
| `thing:id` | Full ID composed of _namespace_ + _:_ as a separator + _name_ | org.eclipse.ditto:device-123 |
| `thing:id` | Full ID composed of _namespace_ `:` (as a separator), and _name_ | org.eclipse.ditto:device-123 |
| `thing:namespace` | Namespace (i.e. first part of an ID) | org.eclipse.ditto |
| `thing:name` | Name (i.e. second part of an ID ) | device-123 |


Example:

Sending live commands and events to a target address that contains the Things' namespace.
```json
{
Expand Down
5 changes: 5 additions & 0 deletions documentation/src/main/resources/pages/ditto/basic-signals.md
Expand Up @@ -50,3 +50,8 @@ other parts of the system.
3. In addition an **event** is both persisted into the datastore and published.<br/>
The event describes that the change was applied to an entity (e.g. a `Thing`).<br/>
Interested parties can subscribe for such **events** and follow the evolving entity.

{% include note.html
content="Events caused by commands from a **[connection](basic-connections.html)** are not published
**to the same origin**. The connection can receive a response, but will not additionally get an event."
%}
Expand Up @@ -41,8 +41,9 @@ Create a new connection by sending the following DevOps command:
```

The content of the connection configuration object is specified in the [Connections section](basic-connections.html).
For protocol specific examples consolidate the [AMQP-0.9.1 binding](connectivity-protocol-bindings-amqp091.html) and
the [AMQP-1.0 binding](connectivity-protocol-bindings-amqp10.html) respectively.
For protocol specific examples, consult the [AMQP-0.9.1 binding](connectivity-protocol-bindings-amqp091.html),
[AMQP-1.0 binding](connectivity-protocol-bindings-amqp10.html) or
[MQTT-3.1.1 binding](connectivity-protocol-bindings-mqtt.html) respectively.

### Modify connection

Expand All @@ -59,12 +60,12 @@ Modify an existing connection by sending the following DevOps command:
}
```

The connection with the specified id has to be created before being able to modify it.
The connection with the specified ID needs to be created before one can modify it.


### Retrieve connection

The only parameter necessary for connection retrieval is the `connectionId`:
The only parameter necessary for retrieving a connection is the `connectionId`:

```json
{
Expand Down Expand Up @@ -109,7 +110,7 @@ The only parameter necessary for closing a connection is the `connectionId`:

### Delete connection

The only parameter necessary for connection deletion is the `connectionId`:
The only parameter necessary for deleting a connection is the `connectionId`:

```json
{
Expand Down
Expand Up @@ -20,3 +20,4 @@ The following connection types are supported:

* [AMQP 0.9.1](connectivity-protocol-bindings-amqp091.html)
* [AMQP 1.0](connectivity-protocol-bindings-amqp10.html)
* [MQTT 3.1.1](connectivity-protocol-bindings-mqtt.html)
Expand Up @@ -45,12 +45,14 @@ incoming messages are processed. These subjects may contain placeholders, see
### Target format

An AMQP 0.9.1 connection requires the protocol configuration target object to have an `address` property with a combined
value of the `exchange_name` and `routing_key`. The target address may contain placeholders, see
[placeholders](basic-connections.html#placeholder-for-target-addresses) section for more information.
It is continued with a list of topic strings, each representing a
subscription of a Ditto [protocol topic](protocol-specification-topic.html) and an array of authorization subjects.
Outbound messages are published to the configured target address if one of these subjects have READ permission
on the Thing that is associated with a message.
value of the `exchange_name` and `routing_key`. The target address may contain placeholders; see
[placeholders](basic-connections.html#placeholder-for-target-addresses) section for more information.

Further, `"topics"` is a list of strings, each list entry representing a subscription of
[Ditto protocol topics](protocol-specification-topic.html).

Outbound messages are published to the configured target address if one of the subjects in `"authorizationContext"`
have READ permission on the Thing that is associated with a message.


```json
Expand Down
Expand Up @@ -45,12 +45,15 @@ inbound messages are processed. These subjects may contain placeholders, see
### Target format

An AMQP 1.0 connection requires the protocol configuration target object to have an `address` property with a source
identifier. The target address may contain placeholders, see
identifier. The target address may contain placeholders; see
[placeholders](basic-connections.html#placeholder-for-target-addresses) section for more
information. It is continued with a list of topic strings, each representing a subscription of a Ditto
[protocol topic](protocol-specification-topic.html) and an array of authorization subjects. Outbound messages
are published to the configured target address if one of these subjects have READ permission on the Thing that is
associated with a message.
information.

Further, `"topics"` is a list of strings, each list entry representing a subscription of
[Ditto protocol topics](protocol-specification-topic.html).

Outbound messages are published to the configured target address if one of the subjects in `"authorizationContext"`
have READ permission on the Thing that is associated with a message.

```json
{
Expand Down
@@ -0,0 +1,142 @@
---
title: MQTT 3.1.1 protocol binding
keywords: binding, protocol, mqtt
tags: [protocol, connectivity]
permalink: connectivity-protocol-bindings-mqtt.html
---

When MQTT messages are sent in [Ditto Protocol](protocol-overview.html),
the payload should be `UTF-8` encoded strings.

If messages which are not in Ditto Protocol should be processed, a [payload mapping](connectivity-mapping.html) must
be configured for the connection in order to transform the messages.

## MQTT 3.1.1 properties

MQTT 3.1.1 messages have no application headers. Transmission-relevant properties are set in the
`"headers"` field as a part of [Ditto protocol messages](protocol-specification.html#dittoProtocolEnvelope) in the
payload.

These properties are supported:

* `correlation-id`: For correlating request messages and events. Twin events have the correlation IDs of
[Twin commands](protocol-twinlive.html#twin) that produced them.
* `reply-to`: The value should be an MQTT topic.
If a command sets the header `reply-to`, then its response is published at the topic equal to the header value.

## Specific connection configuration

### Source format

For an MQTT connection:

* Source `"addresses"` are MQTT topics to subscribe to. Wildcards `+` and `#` are allowed.
* `"authorizationContext"` may _not_ contain placeholders `{%raw%}{{ header:<header-name> }}{%endraw%}` as MQTT 3.1.1
has no application headers.
* The additional field `"filters"` defines filters of MQTT messages by checking their topics against their payload.
If at least one filter is defined, then messages are dropped if their topics do not match any of the filters.
Filters can be specified using placeholders `{%raw%}{{ thing:id }}{%endraw%}`,
`{%raw%}{{ thing:namespace }}{%endraw%}` or `{%raw%}{{ thing:name }}{%endraw%}`.
* The additional field `"qos"` sets the maximum Quality of Service to request when subscribing for messages. Its value
can be `0` for at-most-once delivery, `1` for at-least-once delivery and `2` for exactly-once delivery.
Support of any Quality of Service depends on the external MQTT broker.
The default value is `0` (at-most-once).


```json
{
"addresses": [
"<mqtt_topic>",
"..."
],
"authorizationContext": ["ditto:inbound-auth-subject", "..."],
"filters": [
"{%raw%}telemetry/{{ thing:id }}{%endraw%}",
"{%raw%}device/{{ thing:namespace }}/{{ thing:name }}{%endraw%}",
"..."
],
"qos": 0
}
```

### Target format

For an MQTT connection, the target address is the MQTT topic to publish events and messages to.
The target address may contain placeholders; see
[placeholders](basic-connections.html#placeholder-for-target-addresses) section for more information.

Further, `"topics"` is a list of strings, each list entry representing a subscription of
[Ditto protocol topics](protocol-specification-topic.html).

Outbound messages are published to the configured target address if one of the subjects in `"authorizationContext"`
have READ permission on the Thing that is associated with a message.

The additional field `"qos"` sets the Quality of Service with which messages are published.
Its value can be `0` for at-most-once delivery, `1` for at-least-once delivery and `2` for exactly-once delivery.
Support of any Quality of Service depends on the external MQTT broker.
The default value is `0` (at-most-once).


```json
{
"address": "mqtt/topic/of/my/device/{%raw%}{{ thing:id }}{%endraw%}",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject", "..."],
"qos": 0
}
```


## Establishing a connection to an MQTT 3.1.1 endpoint

Ditto's [Connectivity service](architecture-services-connectivity.html) is responsible for creating new and managing
existing connections.

This can be done dynamically at runtime without the need to restart any microservice using a
[Ditto DevOps command](installation-operating.html#devops-commands).

Example

Connection configuration to create a new MQTT connection:

```json
{
"id": "mqtt-example-connection-123",
"connectionType": "mqtt",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://test.mosquitto.org:1883",
"sources": [
{
"addresses": [
"eclipse-ditto-sandbox/#"
],
"authorizationContext": ["ditto:inbound-auth-subject"],
"qos": 0,
"filters": []
}
],
"targets": [
{
"address": "eclipse-ditto-sandbox/{%raw%}{{ thing:id }}{%endraw%}",
"topics": [
"_/_/things/twin/events"
],
"authorizationContext": ["ditto:outbound-auth-subject"],
"qos": 0
}
]
}
```

## Messages

Messages consumed via the MQTT binding are treated similar to the
[WebSocket binding](httpapi-protocol-bindings-websocket.html),
meaning that the messages are expected to be [Ditto Protocol](protocol-overview.html) messages serialized as
UTF-8-coded JSON (as shown for example in the [protocol examples](protocol-examples.html)).
If your payload does not conform to the [Ditto Protocol](protocol-overview.html) or uses any character set other
than UTF-8, you can configure a custom [payload mapping](connectivity-mapping.html).

0 comments on commit c08d26b

Please sign in to comment.