This repository has been archived by the owner on Sep 13, 2023. It is now read-only.

Internal communication

This page describes how the services in dojot communicate with each other.


Current dojot components are shown in dojot_components.

[Auth]
[DeviceManager]
[Persister]
[History]
[DataBroker]
[FlowBroker]

package "Databases" {
  [mongodb]
  [postgreSQL]
}

package "IoT agents" {
  [IoT MQTT]
  [IoT LoRa]
  [IoT sigfox]
  [IoT RabbitMQ]
}

[postgreSQL] <-- [Auth]
[postgreSQL] <-- [DeviceManager]
[postgreSQL] <- [Kong]
[mongodb] <- [Persister]
[mongodb] <-- [FlowBroker]
[mongodb] <-- [History]

They are:

  • Auth: authentication mechanism.
  • DeviceManager: device and template storage.
  • Persister: component that stores all device-generated data.
  • History: component that exposes all device-generated data.
  • DataBroker: deals with subjects and Kafka topics, as well as connections.
  • FlowBroker: handles flows (both CRUD and flow execution).
  • IoT agents: agents for different protocols.

Each service is briefly described on this page. More information can be found in each component's documentation.

Messaging and authentication

There are two methods through which dojot components can talk to each other: via HTTP REST requests and via Kafka. They are intended for different purposes, though.

HTTP requests can be sent at boot time when a component wants, for instance, information about particular resources, such as the list of devices or tenants. For that, a component must know which service holds which resource in order to retrieve it correctly. This means (and this is a very important point that drives architectural choices in dojot) that only a single service is responsible for retrieving data models for a particular resource. Note that a service might have multiple instances, though. For example, DeviceManager is responsible for storing and retrieving the information models for devices and templates, FlowBroker for flow descriptions, History for historical data, and so on.

Kafka, on the other hand, allows loosely coupled communication between instances of services. This means that a producer (whoever sends a message) does not know which components will receive its message. Likewise, a consumer doesn't know who generated the message that is being ingested. This allows data to be transmitted based on "interests": a consumer is interested in ingesting messages with a particular subject (more on that later) and producers will send messages to all components that are interested in it. Note that this mechanism allows multiple services to emit messages with the same "subject", as well as multiple services to ingest messages with the same "subject", with no tricky workarounds whatsoever.
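The decoupling described above can be illustrated with a small in-memory sketch. It does not use Kafka at all: `SubjectBus`, `subscribe` and `publish` are hypothetical names chosen for this example, and the producer names in the messages are placeholders. The point is only that producers and consumers share nothing but the subject name.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class SubjectBus:
    """In-memory stand-in for Kafka (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, subject: str, handler: Handler) -> None:
        # A consumer only declares which subject it is interested in.
        self._handlers[subject].append(handler)

    def publish(self, subject: str, message: dict) -> int:
        # The producer never learns who the consumers are; it only gets
        # back how many handlers were invoked.
        handlers = self._handlers[subject]
        for handler in handlers:
            handler(message)
        return len(handlers)


bus = SubjectBus()
persisted: List[dict] = []
displayed: List[dict] = []

# Two independent consumers of the same subject.
bus.subscribe("device-data", persisted.append)
bus.subscribe("device-data", displayed.append)

# Two independent producers emitting on the same subject.
bus.publish("device-data", {"source": "agent-a", "temperature": 23})
bus.publish("device-data", {"source": "agent-b", "temperature": 25})
```

Both consumers end up with both messages, and neither producer had to be configured with a list of recipients.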

Sending HTTP requests

In order to send requests via HTTP, a service must create an access token, described here. There are no further considerations beyond following the API description associated with each service. This can be seen in figure initial_authentication. Note that all interactions depicted here are abstractions of the actual ones. Also, these interactions are valid only for internal components. Any external service should use Kong as its entry point.


actor Client
boundary Kong
control Auth

Client -> Kong: POST /auth \nBody={"admin", "p4ssw0rD"}
activate Kong
Kong -> Auth: POST /user \nBody={"admin", "p4ssw0rD"}
Auth --> Kong: JWT="873927dab"
Kong --> Client: JWT="873927dab"
deactivate Kong

In this figure, a client retrieves an access token for user admin, whose password is p4ssw0rD. After that, the client can use the token to send requests to the HTTP APIs. This is shown in sending_requests. Note: the actual authorization mechanism is detailed in Auth + API gateway (Kong).

actor Client
boundary Kong
control Auth
control DeviceManager
database PostgreSQL

Client -> Kong: POST /device \nHeaders="Authorization: Bearer JWT"\nBody={ device }
activate Kong
Kong -> Auth: POST /pep \nBody={"admin", "/device"}
Auth --> Kong: OK 200
Kong -> DeviceManager: POST /device \nHeaders="Authorization: JWT" \nBody={ "device" : "XYZ" }
activate DeviceManager
DeviceManager -> PostgreSQL: INSERT INTO ....
PostgreSQL --> DeviceManager: OK
DeviceManager --> Kong: OK 200
deactivate DeviceManager
Kong --> Client: OK 200
deactivate Kong

In this figure, a client creates a new device using the token retrieved in initial_authentication. This request is analyzed by Kong, which invokes Auth to check whether the user set in the token is allowed to POST to the /device endpoint. Only after the request is approved will Kong forward it to DeviceManager.
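As a rough illustration of the client side of this exchange, the sketch below builds (but does not send) a request that carries the token. The URL is a placeholder assumed for this example, not a documented address, and `build_create_device_request` is a hypothetical helper; the token and body values are the ones shown in the figures.

```python
import json
import urllib.request

# Assumed placeholder URL; real deployments define their own host and port.
DEVICE_ENDPOINT = "http://kong.example/device"
JWT = "873927dab"  # token value shown in the figures


def build_create_device_request(
    url: str, token: str, device: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying the access token."""
    body = json.dumps(device).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


request = build_create_device_request(DEVICE_ENDPOINT, JWT, {"device": "XYZ"})
```

Passing `request` to `urllib.request.urlopen` would actually send it; that step is left out so the sketch runs without a dojot deployment.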

Sending Kafka messages

Kafka uses quite a different approach. Each message should be associated with a subject and a tenant. This is shown in retrieving_topics.

control DeviceManager
control DataBroker
database Redis
control Kafka

DeviceManager -> DataBroker: GET /topic/dojot.device-manager.devices \nHeaders="Authorization: Bearer JWT"
note left
  JWT contains the service associated to the subject
  (admin, for instance).
end note
activate DataBroker
DataBroker -> Redis: GET KEY\n"admin:dojot.device-manager.devices "
note left
  If the key does not exist, then it will be created.
end note
Redis --> DataBroker: 9d0352b7-d195-4852...
DataBroker -> Redis: GET KEY\n"profile-admin:dojot.device-manager.devices "
Redis --> DataBroker: { "topic-profile": { ... } }
DataBroker -> Kafka: CREATE TOPIC \n9d0352b7-d195-4852...\n{ "topic-profile": { ... } }
note left
  There's no need to recreate this topic if it is already created.
end note
Kafka -> DataBroker: OK
DataBroker --> DeviceManager: { "topic" : "9d0352b7-d195-4852..." }
deactivate DataBroker
DeviceManager -> Kafka: SEND MESSAGE\n topic:9d0352b7-d195-4852...\ndata: {"device": "XYZ", "event": "CREATE", ...}
Kafka --> DeviceManager: OK

In this example, DeviceManager needs to publish a message about a new device. In order to do so, it sends a request to DataBroker, indicating which tenant (carried in the JWT) and which subject (dojot.device-manager.devices) it wants to use to send the message. DataBroker queries Redis to check whether this topic has already been created and whether the dojot administrator has created a profile for this particular {tenant, subject} pair.
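The lookup performed for each {tenant, subject} pair can be sketched as a get-or-create operation. This is not DataBroker's code: `TopicRegistry` is a hypothetical class, plain dictionaries stand in for Redis, and generating a UUID as the topic name is an assumption based on the shape of the identifiers in the figure. The key formats (`tenant:subject` and `profile-tenant:subject`) follow the figure.

```python
import uuid
from typing import Dict, Optional, Tuple


class TopicRegistry:
    """Hypothetical in-memory model of the lookup DataBroker performs."""

    def __init__(self) -> None:
        self._topics: Dict[str, str] = {}     # plays the role of Redis
        self._profiles: Dict[str, dict] = {}  # optional per-pair profiles

    def set_profile(self, tenant: str, subject: str, profile: dict) -> None:
        self._profiles[f"profile-{tenant}:{subject}"] = profile

    def get_topic(self, tenant: str, subject: str) -> Tuple[str, Optional[dict]]:
        key = f"{tenant}:{subject}"
        # Create the topic name on first use and reuse it afterwards.
        if key not in self._topics:
            self._topics[key] = str(uuid.uuid4())
        profile = self._profiles.get(f"profile-{tenant}:{subject}")
        return self._topics[key], profile


registry = TopicRegistry()
first, profile = registry.get_topic("admin", "dojot.device-manager.devices")
second, _ = registry.get_topic("admin", "dojot.device-manager.devices")
other, _ = registry.get_topic("tenant1", "dojot.device-manager.devices")
```

The same pair always maps to the same topic, while a different tenant gets a different topic for the same subject.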

The two profile schemes available are shown in automatic_scheme and assigned_scheme.

class IAutoScheme <<interface>> {
  + num_partitions: number;
  + replication_factor: number;
}


The automatic scheme sets the number of Kafka partitions to be used for the topic being created, as well as the replication factor (how many replicas there will be for each topic partition). It's up to Kafka to decide which partition and replica will be assigned to which broker instance. You can check Kafka partitions and replicas to learn a bit more about them, or refer to Kafka's official documentation.

class IAssignedScheme <<interface>> {
  + replica_assignment: Map<number, number[]>;
}


The assigned scheme indicates which partition will be allocated to which Kafka instance. This also covers replicas (partitions with more than one associated Kafka instance).
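To make the difference between the two schemes concrete, here is a minimal sketch that mirrors the two interfaces as Python dataclasses. The class and function names are hypothetical, and reading `replica_assignment` as "partition number to list of broker ids" is an interpretation of the description above rather than something the interfaces state.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass(frozen=True)
class AutoScheme:
    num_partitions: int
    replication_factor: int


@dataclass(frozen=True)
class AssignedScheme:
    # Assumed reading: partition number -> ids of the brokers holding it.
    replica_assignment: Dict[int, List[int]]


Profile = Union[AutoScheme, AssignedScheme]


def partition_count(profile: Profile) -> int:
    """Number of partitions a topic created with this profile would have."""
    if isinstance(profile, AutoScheme):
        return profile.num_partitions
    return len(profile.replica_assignment)


def replica_counts(profile: Profile) -> List[int]:
    """Replica count of every partition, in partition order."""
    if isinstance(profile, AutoScheme):
        # Kafka picks the brokers; every partition gets the same factor.
        return [profile.replication_factor] * profile.num_partitions
    return [len(brokers) for _, brokers in sorted(profile.replica_assignment.items())]


auto = AutoScheme(num_partitions=3, replication_factor=2)
assigned = AssignedScheme(replica_assignment={0: [1, 2], 1: [2, 3]})
```

With the automatic scheme only the counts are fixed, whereas the assigned scheme pins each partition to specific brokers.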

Bootstrapping tenants

All components are interested in a set of subjects, which will be used to either send messages to or receive messages from Kafka. As dojot groups Kafka topics and tenants into subjects (a subject is composed of one or more Kafka topics, each one transmitting messages for a particular tenant), a component must bootstrap each tenant before sending or receiving messages. This is done in two phases: component boot time and component runtime.

In the first phase, a component asks Auth for all currently configured tenants. Suppose it is interested in consuming messages from the device-data and dojot.device-manager.devices subjects. It will then request from DataBroker one topic per tenant for each subject. With that list of topics, it can create Producers and Consumers to send and receive messages through those topics. This is shown by Tenant bootstrapping startup.

control Component
control Auth
control DataBroker
control Kafka

Component -> DataBroker: GET /topic/dojot.tenancy \nHeaders="Authorization: JWT"
DataBroker --> Component: {"topic" : "eca098e7f..."}
Component-> Auth: GET /tenants
Auth --> Component: {"tenants" : ["admin", "tenant1"]}
loop each tenant
  Component -> DataBroker: GET /topic/device-data \nHeaders="Authorization: JWT[tenant]"
  DataBroker --> Component: {"topic" : "890874987ef..."}
  Component -> Kafka: SUBSCRIBE\ntopic: 890874987ef...
  Kafka --> Component: OK
  Component -> DataBroker: GET /topic/dojot.device-manager.devices \nHeaders="Authorization: JWT[tenant]"
  DataBroker --> Component: {"topic" : "890874987ef..."}
  Component -> Kafka: SUBSCRIBE\ntopic: 890874987ef...
  Kafka --> Component: OK
end

The second phase starts after startup and its purpose is to process all messages received through Kafka. This will include any tenant that is created after all services are up and running. Tenant bootstrapping shows how to deal with these messages.

control Kafka
control Component
control DataBroker

Kafka -> Component: MESSAGE\ntopic:98797ce98af...\nmessage: {"tenant" : "new-tenant"}
Component -> DataBroker: GET /topic/device-data\nHeaders: "Authorization: Bearer JWT"
note left
  JWT contains new tenant
end note
DataBroker --> Component: OK {"topic" : "876ca876g7..."}
Component -> Kafka: SUBSCRIBE\ntopic: 876ca876g7...
Kafka --> Component: OK
Component -> DataBroker: GET /topic/dojot.device-manager.devices\nHeaders: "Authorization: Bearer JWT"
note left
  JWT contains new tenant
end note
DataBroker --> Component: OK {"topic" : "22432c4a..."}
Component -> Kafka: SUBSCRIBE\ntopic: 22432c4a...
Kafka --> Component: OK

All services that are somehow interested in using subjects should execute this procedure in order to correctly receive all messages.
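The two phases can be summarised in a short sketch. Everything here is hypothetical scaffolding: `TenantBootstrapper` is not a dojot class, and the three callables stand in for the calls to Auth, DataBroker and Kafka. Skipping tenants that were already bootstrapped is a design choice of this sketch, not documented behaviour.

```python
from typing import Callable, Iterable, List, Set

SUBJECTS = ["device-data", "dojot.device-manager.devices"]


class TenantBootstrapper:
    """Hypothetical helper; the callables stand in for Auth, DataBroker and Kafka."""

    def __init__(
        self,
        list_tenants: Callable[[], Iterable[str]],
        get_topic: Callable[[str, str], str],
        subscribe: Callable[[str], None],
    ) -> None:
        self._list_tenants = list_tenants
        self._get_topic = get_topic
        self._subscribe = subscribe
        self._known: Set[str] = set()

    def _bootstrap(self, tenant: str) -> None:
        if tenant in self._known:
            return  # already subscribed, nothing to do
        for subject in SUBJECTS:
            self._subscribe(self._get_topic(tenant, subject))
        self._known.add(tenant)

    def at_boot(self) -> None:
        # Phase 1: bootstrap every tenant that already exists.
        for tenant in self._list_tenants():
            self._bootstrap(tenant)

    def on_tenancy_message(self, message: dict) -> None:
        # Phase 2: a tenant announced through Kafka at runtime.
        self._bootstrap(message["tenant"])


subscribed: List[str] = []
bootstrapper = TenantBootstrapper(
    list_tenants=lambda: ["admin", "tenant1"],
    get_topic=lambda tenant, subject: f"{tenant}:{subject}",
    subscribe=subscribed.append,
)
bootstrapper.at_boot()
bootstrapper.on_tenancy_message({"tenant": "new-tenant"})
bootstrapper.on_tenancy_message({"tenant": "admin"})  # duplicate, ignored
```

After these calls the component holds one subscription per tenant per subject, including the tenant that appeared at runtime.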

Auth + API gateway (Kong)

Auth is a service deeply connected to Kong. It is responsible for user management, authentication and authorization. As such, it is invoked by Kong whenever a request is received by one of its registered endpoints. This section details how this is performed and how the two services work together.

Kong configuration

There are two configuration procedures when starting Kong within dojot:

  1. Migrating existing data
  2. Registering API endpoints and plugins.

The first task is performed by simply invoking Kong with a special flag.

The second one is performed by executing a configuration script. Its only purpose is to register endpoints in Kong, such as:

(curl -o /dev/null ${kong}/apis -sS -X POST \
    --header "Content-Type: application/json" \
    -d @- ) <<PAYLOAD
{
    "name": "data-broker",
    "uris": ["/device/(.*)/latest", "/subscription"],
    "strip_uri": false,
    "upstream_url": "http://data-broker:80"
}
PAYLOAD

This command registers the endpoints /device/*/latest and /subscription; all requests to them will be forwarded to http://data-broker:80. Kong's documentation describes how to add endpoints in more detail.

For some of the registered endpoints, the configuration script will also add two plugins:

  1. JWT generation. The documentation for this plugin is available at the Kong JWT plugin page.
  2. A plugin that forwards all policy requests to Auth. It invokes Auth in order to authenticate requests. This plugin is available in the PEP-Kong repository.

The following requests install these two plugins in the data-broker API:

curl -o /dev/null -sS -X POST ${kong}/apis/data-broker/plugins -d "name=jwt"
curl -o /dev/null -sS -X POST ${kong}/apis/data-broker/plugins -d "name=pepkong" -d "config.pdpUrl=http://auth:5000/pdp"

Emitted messages

Auth will emit just one message via Kafka for tenant creation:

{
  "type" : "CREATE",
  "tenant" : "XYZ"
}

Device Manager

DeviceManager stores and retrieves information models for devices and templates, as well as some static information about them. Whenever a device is created, removed or edited, it publishes a message through Kafka. It depends only on DataBroker and Kafka, for the reasons already explained in this document.

All messages published by Device Manager to Kafka can be seen in Device Manager messages.

IoT agent

IoT agents receive messages from devices and translate them into a default message format to be published to other components. To do that, they might need to know which devices have been created, so that they can filter out messages that are not allowed into dojot (using, for instance, security information to block messages from unauthorized devices). An IoT agent uses the device-data subject and bootstraps tenants as described in Bootstrapping tenants.

After requesting the topics for all tenants within the device-data subject, an IoT agent starts receiving data from devices. As there is a plethora of ways in which devices can send data, this step won't be detailed in this section (it is highly dependent on how each IoT agent works). The agent must, though, send a message to Kafka to inform other components of all new data that the device has just sent. This is shown in IoT agent - kafka.

control Kafka

IoTAgent -> Kafka: SEND MESSAGE\n topic:890874987ef...\ndata: IoTAgentMessage
Kafka -> IoTAgent: OK

The data sent by IoT agent has the structure shown in IoT agent message.

class Metadata {
  + deviceid: string
  + tenant: string
  + timestamp: long int
}

class IoTAgentMessage {
  + metadata: Metadata
  + attrs: Dict<string, any>
}

IoTAgentMessage::metadata -> Metadata

An example of such a message:

{
    "metadata": {
        "deviceid": "c6ea4b",
        "tenant": "admin",
        "timestamp": 1528226137452
    },
    "attrs": {
        "humidity": 60,
        "temperature" : 23
    }
}
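A message with this layout could be assembled as in the sketch below. `build_iot_agent_message` is a hypothetical helper, not part of any dojot library. Treating the timestamp as milliseconds since the Unix epoch is an assumption based on the magnitude of the example value.

```python
import json
import time
from typing import Any, Dict, Optional


def build_iot_agent_message(
    device_id: str,
    tenant: str,
    attrs: Dict[str, Any],
    timestamp_ms: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble a message with the metadata/attrs layout described above."""
    if timestamp_ms is None:
        # Assumption: timestamps are milliseconds since the Unix epoch.
        timestamp_ms = int(time.time() * 1000)
    return {
        "metadata": {
            "deviceid": device_id,
            "tenant": tenant,
            "timestamp": timestamp_ms,
        },
        "attrs": dict(attrs),  # copy, so later changes do not leak in
    }


message = build_iot_agent_message(
    "c6ea4b", "admin", {"humidity": 60, "temperature": 23}, 1528226137452
)
payload = json.dumps(message)  # serialized form that would be sent to Kafka
```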


Persister

Persister is a very simple service whose only purpose is to receive messages from devices (using the device-data subject) and store them in MongoDB. For that, the bootstrapping procedure (detailed in Bootstrapping tenants) is performed and, whenever a new message is received, Persister creates a new Mongo document and stores it in the device's collection. This is shown in Persister.

control Kafka
control Persister
database MongoDB

Kafka -> Persister: MESSAGE\ntopic:98797ce98af...\nmessage: IoTAgentMessage
Persister -> MongoDB: NEW DOC { IoTAgentMessage }
MongoDB --> Persister: OK
Persister --> Kafka: COMMIT

This service is simple by design.
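The store-per-device behaviour can be sketched without MongoDB. `InMemoryPersister` is a hypothetical stand-in in which a dictionary of lists plays the role of the database. Splitting each message into one document per attribute is an assumption made for this sketch; the actual document layout is defined by the Persister component.

```python
from collections import defaultdict
from typing import Any, Dict, List


class InMemoryPersister:
    """Hypothetical stand-in: a dict of lists plays the role of MongoDB."""

    def __init__(self) -> None:
        # One "collection" per device id.
        self.collections: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def handle(self, message: Dict[str, Any]) -> int:
        """Store one document per attribute; return how many were stored."""
        device_id = message["metadata"]["deviceid"]
        timestamp = message["metadata"]["timestamp"]
        for attr, value in message["attrs"].items():
            self.collections[device_id].append(
                {"attr": attr, "value": value, "ts": timestamp}
            )
        return len(message["attrs"])


persister = InMemoryPersister()
stored = persister.handle(
    {
        "metadata": {"deviceid": "c6ea4b", "tenant": "admin", "timestamp": 1528226137452},
        "attrs": {"humidity": 60, "temperature": 23},
    }
)
```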


History

History is also a very simple service: whenever a user or application sends a request to it, it queries MongoDB and builds a proper message to send back to the user or application. This is shown in History.

actor User
boundary Kong
control History
database MongoDB

User -> Kong: GET /device/history/efac?attr=temperature\nHeaders="Authorization: JWT"
activate Kong
Kong -> Kong: authorize
Kong -> History: GET /history/efac?attr=temperature\nHeaders="Authorization: JWT"
activate History
History -> MongoDB: db.efac.find({attr=temperature})
MongoDB --> History: doc1, doc2
History -> History: processDocs([doc1, doc2])
History --> Kong: OK\n{"efac":[\n\t{"temperature" : 10},\n\t{"temperature": 20}\n]}
deactivate History
Kong -> User: OK\n{"efac":[\n\t{"temperature" : 10},\n\t{"temperature": 20}\n]}
deactivate Kong
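The query-then-format step in the figure can be approximated as follows. The stored document layout (`attr` and `value` fields) and the function name `history_response` are assumptions for illustration. Only the response shape is taken from the figure.

```python
from typing import Any, Dict, List

# Hypothetical documents, as the collection of device "efac" might hold them.
EFAC_COLLECTION: List[Dict[str, Any]] = [
    {"attr": "temperature", "value": 10},
    {"attr": "humidity", "value": 60},
    {"attr": "temperature", "value": 20},
]


def history_response(
    device_id: str, attr: str, docs: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
    """Filter documents by attribute and shape them like the response in the figure."""
    # Equivalent of the find() call: keep documents for the requested attribute.
    matching = [doc for doc in docs if doc["attr"] == attr]
    # Equivalent of processDocs(): one {attr: value} entry per document.
    return {device_id: [{attr: doc["value"]} for doc in matching]}


response = history_response("efac", "temperature", EFAC_COLLECTION)
```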

Data Broker

DataBroker has a few more functionalities than only generating topics for {tenant, subject} pairs. It will also serve connections to emit messages in real time. In order to do so, it retrieves all topics for the device-data subject, just like any other component interested in data received from devices. As soon as it receives a message, it forwards it to a 'room' (using vocabulary) associated with the device and its tenant. Thus, all clients connected to it (such as graphical user interfaces) will receive a new message containing all the received data. For more information about how to open a connection with DataBroker, check the DataBroker documentation.
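The room-based forwarding can be pictured with the sketch below. It is not DataBroker's implementation: `RoomRouter` is a hypothetical class, clients are modelled as plain callables, and keying rooms by the (tenant, device id) pair is an assumption derived from the description above.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Room = Tuple[str, str]  # (tenant, device id); the key layout is an assumption
Client = Callable[[dict], None]


class RoomRouter:
    """Hypothetical model of forwarding device data to connected clients."""

    def __init__(self) -> None:
        self._rooms: Dict[Room, List[Client]] = defaultdict(list)

    def join(self, tenant: str, device_id: str, client: Client) -> None:
        self._rooms[(tenant, device_id)].append(client)

    def forward(self, message: dict) -> int:
        """Deliver a device-data message to its room; return the client count."""
        meta = message["metadata"]
        clients = self._rooms.get((meta["tenant"], meta["deviceid"]), [])
        for client in clients:
            client(message)
        return len(clients)


router = RoomRouter()
gui_inbox: List[dict] = []
router.join("admin", "c6ea4b", gui_inbox.append)

delivered = router.forward(
    {"metadata": {"deviceid": "c6ea4b", "tenant": "admin"}, "attrs": {"temperature": 23}}
)
ignored = router.forward(
    {"metadata": {"deviceid": "c6ea4b", "tenant": "tenant1"}, "attrs": {"temperature": 5}}
)
```

A message for the same device id under another tenant reaches nobody, which reflects the tenant isolation implied by the text.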
