@emqplus emqplus released this Jul 9, 2015 · 2322 commits to emqx30 since this release

Assets 2

Release Highlights

Session

Every client will start_link a session process, whether or not the client is persistent.

Client could resume a persistent session on other clustered node.

Session State in the broker consists of:

  1. The Client’s subscriptions.
  2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
  3. inflight qos2 messages received from client and waiting for pubrel. QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
  4. all qos1, qos2 messages published to when client is disconnected. QoS 1 and QoS 2 messages pending transmission to the Client.
  5. Optionally, QoS 0 messages pending transmission to the Client.

MQueue and Inflight Window

Each session has a simple in-memory message queue.

Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
should be online in most of the time.

This module implements a simple in-memory queue for MQTT persistent session.

If the broker restarted or crashed, all the messages queued will be gone.

Desgin of The Queue:

      |<----------------- Max Len ----------------->|
      -----------------------------------------------
IN -> |       Pending Messages   | Inflight Window  | -> Out
      -----------------------------------------------
                                 |<--- Win Size --->|
  1. Inflight Window to store the messages awaiting for ack.
  2. IN messages when the session is offline, or inflight window is full.
  3. If the queue is full, dropped qos0 messages if store_qos0 is true, otherwise dropped the oldest pending one.

Hooks

Name Type Description
client.connected foreach Run when client connected successfully
client.subscribe foldl Run when client subscribe topics
client.unsubscribe foldl Run when client unsubscribe topics
message.publish foldl Run when message is published
message.acked foldl Run when message is acked
client.disconnected foreach Run when client is disconnnected

Global Unique Message ID

End-to-End Message Route:

 PktId <-- --> MsgId <-- --> MsgId <-- --> PktId
     |<--- Qos --->|<---PubSub--->|<-- Qos -->|

Global unique id for mqtt message:

--------------------------------------------------------
|        Timestamp       |  NodeID + PID  |  Sequence  |
|<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
--------------------------------------------------------
  1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
  2. NodeId: encode node() to 2 bytes integer
  3. Pid: encode pid to 4 bytes integer
  4. Sequence: 2 bytes sequence in one process

Protocol Compliant

MQTT v3.1.1 protocol specification:

4.4 Message delivery retry(#166)
4.6 Message ordering(#167)

Alarm Management

Add emqttd_alarm module and publish json format alarms to '$SYS/brokers/+/alarms/#' topics.

Project Structure

Merge emqtt and emqttd apps, and change the project structure. You could embed the emqttd broker into your project now.