Skip to content

Pigeon-MQTT-Nest is a lightweight and easy-to-use library that provides a simple MQTT broker for your NestJS applications. With this library, you can easily integrate MQTT messaging into your NestJS application and communicate with MQTT clients using topics and messages.

behnamnasehi/pigeon-mqtt-nest

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

82 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Nest Mqtt Server Logo

npm version npm download npm lisence github star github fork

Nest.js MQTT broker that can run on any stream server

Topics

What is MQTT ?

MQTT is a lightweight IoT messaging protocol based on the publish/subscribe model. It can provide real-time and reliable messaging services for networked devices with very little code and bandwidth. It is widely used in the industries such as the IoT, mobile Internet, smart hardware, Internet of Vehicles and power energy.

MQTT (Message Queuing Telemetry Transport) is an open source, lightweight messaging protocol, optimized for low latency. This protocol provides a callable and cost-efficient way to connect devices using a publish/subscribe model. A communication system built on MQTT consists of the publishing server, a broker and one or more clients. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks.

What is Pigeon MQTT ?

Pigeon-MQTT-Nest is a lightweight and easy-to-use library that provides a simple MQTT broker for your NestJS applications. With this library, you can easily integrate MQTT messaging into your NestJS application and communicate with MQTT clients using topics and messages.

This library provides a customizable MQTT broker that can be easily integrated into your NestJS project. It uses the popular Eclipse Mosquitto library as the underlying MQTT broker engine and supports the MQTT 3.1, 3.1.1, and 5.0 protocol versions.

Pigeon-MQTT-Nest is designed to be flexible and easy to use. It provides a simple API that allows you to create topics, subscribe to topics, and publish messages to topics with just a few lines of code. Additionally, it provides advanced features such as message persistence and quality of service (QoS) levels.

Whether you're building a real-time chat application, IoT device management system, or any other kind of application that requires messaging, Pigeon-MQTT-Nest makes it easy to add MQTT messaging to your NestJS project.

Installation

Install From NPM :

$ npm i pigeon-mqtt-nest

https://nodei.co/npm/pigeon-mqtt-nest.png?downloads=true&downloadRank=true&stars=true


Usage

Pigeon-Mqtt-Nestjs will register as a global module. You can import with configuration

@Module({
  imports: [

    PigeonModule.forRoot({
      port:1884, // Port MQTT TCP Server
      transport:Transport.TCP,
      id: "binarybeast",
      concurrency:100,
      queueLimit:42,
      maxClientsIdLength:23,
      connectTimeout:30000,
      heartbeatInterval:60000,
     })

  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
}
  • options <object>
    • mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is used also to share messages between brokers instances. Default: mqemitter
    • concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
    • persistence <Persistence> middleware that stores QoS > 0, retained, will packets and subscriptions. Default: aedes-persistence (in memory)
    • queueLimit <number> maximum number of queued messages before client session is established. If number of queued items exceeds, connectionError throws an error Client queue limit reached. Default: 42
    • maxClientsIdLength option to override MQTT 3.1.0 clients Id length limit. Default: 23
    • heartbeatInterval <number> an interval in millisconds at which server beats its health signal in $SYS/<aedes.id>/heartbeat topic. Default: 60000
    • id <string> aedes broker unique identifier. Default: uuidv4()
    • connectTimeout <number> maximum waiting time in milliseconds waiting for a CONNECT packet. Default: 30000
  • Returns <Aedes>

Handlers

Handler Emitted When
preConnect Invoked when server receives a valid CONNECT packet.
authenticate Invoked after preConnect.
authorizePublish publish LWT to all online clients,incoming client publish
authorizeSubscribe restore subscriptions in non-clean session.,incoming client SUBSCRIBE
published same as Event: publish, but provides a backpressure functionality.

Handler: preConnect

  • client: <Client>
  • packet: <object> CONNECT
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked when server receives a valid CONNECT packet. The packet can be modified.

client object is in default state. If invoked callback with no errors and successful be true, server will continue to establish a session.

Any error will be raised in connectionError event.

Some Use Cases:

  1. Rate Limit / Throttle by client.conn.remoteAddress
  2. Check aedes.connectedClient to limit maximum connections
  3. IP blacklisting
@Injectable()
export class TestService {

  @onPreConnect()
  onPreConnect(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onPreConnect()");
    return done(null, true);
  }

}

Handler: authenticate

  • client: <Client>
  • credential: <string>
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked after preConnect.

Server parses the CONNECT packet, initializes client object which set client.id to match the one in CONNECT packet and extract username and password as parameters for user-defined authentication flow.

If invoked callback with no errors and successful be true, server authenticates client and continues to setup the client session.

If authenticated, server acknowledges a CONNACK with returnCode=0, otherwise returnCode=5. Users could define the value between 2 and 5 by defining a returnCode property in error object.

@Injectable()
export class TestService {

  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    return done(null, true);
  }

}
@Injectable()
export class TestService {

  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    var error = new Error('Auth error')
    error.returnCode = 4
    return done(error, false);
  }

}

Please refer to Connect Return Code to see their meanings.

Handler: authorizePublish

  • client: <Client> | null
  • packet: <object> PUBLISH
  • callback: <Function> (error) => void
    • error <Error> | null

Invoked when

  1. publish LWT to all online clients
  2. incoming client publish

client is null when aedes publishes obsolete LWT without connected clients

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error. If an error occurs the client connection will be closed, but no error is returned to the client (MQTT-3.3.5-2)

@Injectable()
export class TestService {

  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onAuthorizePublish()");
    if (packet.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (packet.topic === 'bbb') {
      packet.payload = Buffer.from('overwrite packet payload')
    }
    return done(null);
  }

}

By default authorizePublish throws error in case a client publish to topics with $SYS/ prefix to prevent possible DoS (see #597). If you write your own implementation of authorizePublish we suggest you to add a check for this. Default implementation:

@Injectable()
export class TestService {

  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    if (packet.topic.startsWith($SYS_PREFIX)) {
      return done(new Error($SYS_PREFIX + ' topic is reserved'))
    }
    return done(null);
  }

}

Handler: authorizeSubscribe

  • client: <Client>
  • subscription: <object>
  • callback: <Function> (error) => void
    • error <Error> | null
    • subscription: <object> | null

Invoked when

  1. restore subscriptions in non-clean session.
  2. incoming client SUBSCRIBE

subscription is a dictionary object like { topic: hello, qos: 0 }.

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error.

In general user should not touch the subscription and pass to callback, but server gives an option to change the subscription on-the-fly.

@Injectable()
export class TestService {

  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    console.log("Function: @onAuthorizeSubscribe()");
    if (subscription.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (subscription.topic === 'bbb') {
      // overwrites subscription
      subscription.topic = 'foo'
      subscription.qos = 1
    }
    return done(null, subscription);
  }

}

To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and the qos in SubAck is set to 128 based on MQTT 3.11 spec:

@Injectable()
export class TestService {

  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    done(null, subscription.topic === 'aaaa' ? null : sub)
  }

}

Handler: published

same as Event: publish, but provides a backpressure functionality. TLDR; If you are doing operations on packets that MUST require finishing operations on a packet before handling the next one use this otherwise, especially for long-running operations, you should use Event: publish instead.

@Injectable()
export class TestService {

  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }

}

Events

Name Emitted When
Client client registers itself to server
Client Ready client has received all its offline messages and be initialized.
Client Disconnect client disconnects.
Client Error an error occurs.
Connection Error an error occurs.
Keep Alive Timeout timeout happes in the client keepalive.
Publish servers delivers the packet to subscribed client.
Ack packet successfully delivered to the client.
Subscribe client successfully subscribe the subscriptions in server.
Unsubscribe client successfully unsubscribe the subscriptions in server.
Connack Sent server sends an acknowledge to client.
Closed server is closed.

Event: client

Emitted when the client registers itself to server. The client is not ready yet. Its connecting state equals to true.

Server publishes a SYS topic $SYS/<aedes.id>/new/clients to inform it registers the client into its registration pool. client.id is the payload.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClient()
  OnNewClient(@Client() client) {
    console.log("Function: @onClient()");
  }

}

Event: clientReady

Emitted when the client has received all its offline messages and be initialized. The client connected state equals to true and is ready for processing incoming messages.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientReady()
  async onClientReady(@Client() client) {
    console.log("Function: @onClientReady()");
  }

}

Event: clientDisconnect

Emitted when a client disconnects.

Server publishes a SYS topic $SYS/<aedes.id>/disconnect/clients to inform it deregisters the client. client.id is the payload.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientDisconnect()
  OnClientDisconnect(@Client() client) {
    console.log("Function: @OnClientDisconnect()");
  }

}

Event: clientError

Emitted when an error occurs.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientError()
  OnClientError(@Client() client, @Error() error) {
    console.log("Function: @onClientError()");
  }

}

Event: connectionError

Emitted when an error occurs. Unlike clientError it raises only when client is uninitialized.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnectionError()
  OnConnectionError(@Client() client, @Error() error) {
    console.log("Function: @OnConnectionError()");
  }

}

Event: keepaliveTimeout

Emitted when timeout happes in the client keepalive.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onKeepLiveTimeout()
  onKeepLiveTimeout(@Client() client) {
    console.log("Function: @onKeepLiveTimeout()");
  }

}

Event: publish

Emitted when servers delivers the packet to subscribed client. If there are no clients subscribed to the packet topic, server still publish the packet and emit the event. client is null when packet is an internal message like aedes heartbeat message and LWT.

Note! packet belongs aedes-packet type. Some properties belongs to aedes internal, any changes on them will break aedes internal flow.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }

}

Event: ack

Emitted an QoS 1 or 2 acknowledgement when the packet successfully delivered to the client.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }


  @onAck()
  onAck(@Client() client, @Packet() packet) {
    console.log("Function: @onAck()");
  }

}

Event: subscribe

  • subscriptions <object>
  • client <Client>

Emitted when client successfully subscribe the subscriptions in server.

subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated subscriptions where qos equals to 128. See more on authorizeSubscribe

Server publishes a SYS topic $SYS/<aedes.id>/new/subscribers to inform a client successfully subscribed to one or more topics. The payload is a JSON that has clientId and subs props, subs equals to subscriptions array.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onSubscribe()
  OnSubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnSubscribe()");
  }

}

Event: unsubscribe

  • unsubscriptions Array<string>
  • client <Client>

Emitted when client successfully unsubscribe the subscriptions in server.

unsubscriptions are an array of unsubscribed topics.

Server publishes a SYS topic $SYS/<aedes.id>/new/unsubscribers to inform a client successfully unsubscribed to one or more topics. The payload is a JSON that has clientId and subs props, subs equals to unsubscriptions array.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onUnsubscribe()
  OnUnsubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnUnsubscribe()");
  }

}

Event: connackSent

Emitted when server sends an acknowledge to client. Please refer to the MQTT specification for the explanation of returnCode object property in CONNACK.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnackSent()
  onConnackSent(@Client() client, @Packet() packet) {
    console.log("Function: @onConnackSent()");
  }

}

Event: closed

Emitted when server is closed.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClosed()
  onClosed(@Client() client, @Packet() packet) {
    console.log("Function: @onClosed()");
  }

}

Methods

Method Emitted When
Publish Directly deliver packet on behalf of server to subscribed clients.
Close Close aedes server and disconnects all clients.

Method: Publish

Directly deliver packet on behalf of server to subscribed clients. Bypass authorizePublish.

callback will be invoked with error arugments after finish.

@Injectable()
export class TestService {

  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {

    //use this method to publish
    await this.pigeonService.publish({
      topic: "test2", qos: 0, cmd: "publish", payload: "", dup: false, retain: false
    });

  }

}

Method: Close

  • callback: <Function>

Close aedes server and disconnects all clients.

callback will be invoked when server is closed.

@Injectable()
export class TestService {

  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {

    //use this method to publish
    await this.pigeonService.close();

  }

}

Packets

This section describes the format of all packets

Packet -
Connect ---
Connack ---
Subscribe ---
Suback ---
Unsubscribe ---
Unsuback ---
Publish ---
Puback ---
Pubrec ---
Pubrel ---
Pubcomp ---
Pingreq ---
Pingresp ---
Disconnect ---

Packet: Connect

{
  cmd: 'connect',
  protocolId: 'MQTT', // Or 'MQIsdp' in MQTT 3.1 and 5.0
  protocolVersion: 4, // Or 3 in MQTT 3.1, or 5 in MQTT 5.0
  clean: true, // Can also be false
  clientId: 'my-device',
  keepalive: 0, // Seconds which can be any positive number, with 0 as the default setting
  username: 'matteo',
  password: Buffer.from('collina'), // Passwords are buffers
  will: {
    topic: 'mydevice/status',
    payload: Buffer.from('dead'), // Payloads are buffers
    properties: { // MQTT 5.0
      willDelayInterval: 1234,
      payloadFormatIndicator: false,
      messageExpiryInterval: 4321,
      contentType: 'test',
      responseTopic: 'topic',
      correlationData: Buffer.from([1, 2, 3, 4]),
      userProperties: {
        'test': 'test'
      }
    }
  },
  properties: { // MQTT 5.0 properties
      sessionExpiryInterval: 1234,
      receiveMaximum: 432,
      maximumPacketSize: 100,
      topicAliasMaximum: 456,
      requestResponseInformation: true,
      requestProblemInformation: true,
      userProperties: {
        'test': 'test'
      },
      authenticationMethod: 'test',
      authenticationData: Buffer.from([1, 2, 3, 4])
  }
}

If password or will.payload are passed as strings, they will automatically be converted into a Buffer.

Packet: Connack

{
  cmd: 'connack',
  returnCode: 0, // Or whatever else you see fit MQTT < 5.0
  sessionPresent: false, // Can also be true.
  reasonCode: 0, // reason code MQTT 5.0
  properties: { // MQTT 5.0 properties
      sessionExpiryInterval: 1234,
      receiveMaximum: 432,
      maximumQoS: 2,
      retainAvailable: true,
      maximumPacketSize: 100,
      assignedClientIdentifier: 'test',
      topicAliasMaximum: 456,
      reasonString: 'test',
      userProperties: {
        'test': 'test'
      },
      wildcardSubscriptionAvailable: true,
      subscriptionIdentifiersAvailable: true,
      sharedSubscriptionAvailable: false,
      serverKeepAlive: 1234,
      responseInformation: 'test',
      serverReference: 'test',
      authenticationMethod: 'test',
      authenticationData: Buffer.from([1, 2, 3, 4])
  }
}

Packet: Subscribe

{
  cmd: 'subscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    subscriptionIdentifier: 145,
    userProperties: {
      test: 'test'
    }
  }
  subscriptions: [{
    topic: 'test',
    qos: 0,
    nl: false, // no Local MQTT 5.0 flag
    rap: true, // Retain as Published MQTT 5.0 flag
    rh: 1 // Retain Handling MQTT 5.0
  }]
}

Packet: Suback

{
  cmd: 'suback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
  granted: [0, 1, 2, 128]
}

Packet: Unsubscribe

{
  cmd: 'unsubscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    userProperties: {
      'test': 'test'
    }
  }
  unsubscriptions: [
    'test',
    'a/topic'
  ]
}

Packet: Unsuback

{
  cmd: 'unsuback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}

Packet: Publish

{
  cmd: 'publish',
          messageId: 42,
          qos: 2,
          dup: false,
          topic: 'test',
          payload: Buffer.from('test'),
          retain: false,
          properties: { // optional properties MQTT 5.0
    payloadFormatIndicator: true,
            messageExpiryInterval: 4321,
            topicAlias: 100,
            responseTopic: 'topic',
            correlationData: Buffer.from([1, 2, 3, 4]),
            userProperties: {
      'test': 'test'
    },
    subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions
            contentType: 'test'
  }
}

Packet: Puback

{
  cmd: 'puback',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // MQTT 5.0 properties
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}

Packet: Pubrec

{
  cmd: 'pubrec',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}

Packet: Pubrel

{
  cmd: 'pubrel',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}

Packet: Pubcomp

{
  cmd: 'pubcomp',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}

Packet: Pingreq

{
  cmd: 'pingreq'
}

Packet: Pingresp

{
  cmd: 'pingresp'
}

Packet: Disconnect

{
  cmd: 'disconnect',
          reasonCode: 0, // MQTT 5.0 code
          properties: { // properties MQTT 5.0
    sessionExpiryInterval: 145,
            reasonString: 'test',
            userProperties: {
      'test': 'test'
    },
    serverReference: 'test'
  }
}

Middleware Plugins

Persistence

  • [aedes-persistence]: In-memory implementation of an Aedes persistence
  • [aedes-persistence-mongodb]: MongoDB persistence for Aedes
  • [aedes-persistence-redis]: Redis persistence for Aedes
  • [aedes-persistence-level]: LevelDB persistence for Aedes
  • [aedes-persistence-nedb]: NeDB persistence for Aedes

MQEmitter

  • [mqemitter]: An opinionated memory Message Queue with an emitter-style API
  • [mqemitter-redis]: Redis-powered mqemitter
  • [mqemitter-mongodb]: Mongodb based mqemitter
  • [mqemitter-child-process]: Share the same mqemitter between a hierarchy of child processes
  • [mqemitter-cs]: Expose a MQEmitter via a simple client/server protocol
  • [mqemitter-p2p]: A P2P implementation of MQEmitter, based on HyperEmitter and a Merkle DAG
  • [mqemitter-aerospike]: Aerospike mqemitter

Dependencies


Stay in touch


License

MIT License

Copyright (c) 2021 behnamnasehi

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

GO TO TOP

About

Pigeon-MQTT-Nest is a lightweight and easy-to-use library that provides a simple MQTT broker for your NestJS applications. With this library, you can easily integrate MQTT messaging into your NestJS application and communicate with MQTT clients using topics and messages.

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published