Skip to content
Permalink
Browse files

Merge pull request #6 from FirstLegoLeague/new_architecture

New architecture
  • Loading branch information
idanstark42 committed Dec 17, 2019
2 parents e2ec7d0 + 6104e1e commit 10de6644b4c3ddbd489a01ef93f855fef303d0a0
@@ -1,20 +1,40 @@
[![npm](https://img.shields.io/npm/v/@first-lego-league/ms-messenger.svg)](https://www.npmjs.com/package/@first-lego-league/ms-messenger)
[![codecov](https://codecov.io/gh/FirstLegoLeague/ms-messenger/branch/master/graph/badge.svg)](https://codecov.io/gh/FirstLegoLeague/ms-messenger)
[![Build status](https://ci.appveyor.com/api/projects/status/65scfycp2uyg83ri/branch/master?svg=true)](https://ci.appveyor.com/project/2roy999/ms-messenger/branch/master)
[![GitHub](https://img.shields.io/github/license/FirstLegoLeague/ms-messenger.svg)](https://github.com/FirstLegoLeague/ms-messenger/blob/master/LICENSE)

[![David Dependency Status](https://david-dm.org/FirstLegoLeague/ms-messenger.svg)](https://david-dm.org/FirstLegoLeague/ms-messenger)
[![David Dev Dependency Status](https://david-dm.org/FirstLegoLeague/ms-messenger/dev-status.svg)](https://david-dm.org/FirstLegoLeague/ms-messenger#info=devDependencies)
[![David Peer Dependencies Status](https://david-dm.org/FirstLegoLeague/ms-messenger/peer-status.svg)](https://david-dm.org/FirstLegoLeague/ms-messenger?type=peer)
[![Build status](https://ci.appveyor.com/api/projects/status/0ya8dl62755nn01g/branch/master?svg=true)](https://ci.appveyor.com/project/2roy999/ms-messenger/branch/master)
[![GitHub](https://img.shields.io/github/license/FirstLegoLeague/ms-messenger.svg)](https://github.com/FirstLegoLeague/ms-messenger/blob/master/LICENSE)

## FIRST LEGO Leage messenger
A MHub client working by the _FIRST_ LEGO League TMS [Module Standard](https://github.com/FirstLegoLeagueIL/architecture/blob/master/module-standard/v1.0-SNAPSHOT.md#log-messages).
# FIRST LEGO Leage messenger
A [MHub](https://www.npmjs.com/package/mhub) client working by the _FIRST_ LEGO League TMS [Module Standard](https://github.com/FirstLegoLeague/architecture/blob/master/module-standard/v1.0-SNAPSHOT.md#log-messages).

## The logic of this module
This module was meant to serve as an extendable messenger which already works by the Module Standard and allows you to easily listen to and send messages. It gives you all the functionality needed for a Mhub messenger running in node or in browser.

### In node
The messenger givven in node is fully [correlated](https://github.com/FirstLegoLeague/architecture/blob/master/module-standard/v1.0-SNAPSHOT.md#cross-module-correlations) and logged using [ms-logger](https://www.npmjs.com/package/@first-lego-league/ms-logger). It also has a client-id which recognizes it against other clients, and is sent in the headers for recognition.

### In browser
In broswer many of these feature are not required or needed. So the client only has a client-id, and isn't correlated or logged.

### Authentication
All messengers have the ability to be authenticated using MHub credentials.

### Ignoring
All messengers have the `ignoreNextMessageOfTopic` method, which tells them to ignore the next message of a givven topic.

### Stay Alive
All messengers have a failsafe mechanism that makes sure they restart when they get disconnected.

### Usage
## Usage

```javascript
const { Messenger } = require('@first-lego-league/ms-messenger')
const { createMessenger } = require('@first-lego-league/ms-messenger')
// Create a new object
const messenger = new Messenger({/* options... */})
const messenger = createMessenger({/* options... */})
messenger.listen('some:topic', (messageData, message) => {
// do something
@@ -23,13 +43,18 @@ messenger.listen('some:topic', (messageData, message) => {
messenger.send('some:topic', { data: {/* data... */} })
```

#### options
### Options

| **option** | **meaning** | **default** |
| **option** | **meaning** | **options** | **default** |
|--|--|--|
| mhubURI | The URI of the MHub server | `process.env.MHUB_URI` |
| node | The node to which the messenger is connected | `'default'` |
| clientId| An ID for the client | Random base64 text |
| reconnectTimeout | Time between connection attempts | 10 seconds |
| logger | A logger object of the pattern `{ debug, info, warn, error, fatal }`, can axcept `ms-logger` | empty logger |
| credentials | An object of the pattern `{ username, password }` to be used to login to the node. If none is specified, no login will be performed | none |
| mhubURI | The URI of the MHub server | Any valid WebSockets URI | `process.env.MHUB_URI` |
| node | The node to which the messenger is connected | String | `'default'` |
| clientId | An ID for the client | String | Random base64 string |
| reconnectTimeout | Time between connection attempts | Any number | 10 seconds |
| credentials | An object of the pattern `{ username, password }` to be used to login to the node. If none is specified, no login will be performed | Object with fields `username` and `password` | undefined |

## Contribution
To contribute to this repository, simply create a PR and set one of the Code Owners to be a reviewer.
Please notice the linting and UT, because they block merge.
Keep the package lightweight and easy to use.
Thank you for contributing!
@@ -13,7 +13,7 @@ install:
test_script:
# run tests
- yarn lint
- yarn test
- yarn test:ci

# Don't actually build.
build: off
@@ -0,0 +1,14 @@
const { MClient } = require('mhub/dist/src/browserclient')

const { createMessenger } = require('./lib/messenger_factory')

const DEFAULT_OPTIONS = {
mhubURI: process.env.MHUB_URI,
node: 'default',
reconnectTimeout: 10 * 1000 // 10 seconds
}

exports.createMessenger = options => {
options = Object.assign({ }, DEFAULT_OPTIONS, options)
return createMessenger(new MClient(options.mhubURI), options)
}
109 index.js
@@ -1,100 +1,21 @@
const MClient = require('./lib/mclient')
const { MClient } = require('mhub/dist/src/nodeclient')

class Messenger {
constructor (options = {}) {
this._Promise = options.promise || global.Promise
this.options = Object.assign({}, Messenger.DEFAULT_OPTIONS, options)
const { createMessenger } = require('./lib/messenger_factory')
const { correlateMesseger } = require('./lib/correlation')
const { logMessengerEvents } = require('./lib/logging')

this._logger = this.options.logger
this._client = new MClient(this.options.mhubURI)
this._topics = []
this._topicsToIgnore = []

this._client.on('error', msg => this._logger.error(`Unable to connect to mhub: ${msg}`))
this._client.on('close', msg => {
this._logger.error(`Disconnected from MHub: ${msg}`)
this._setTimeoutToReconnect()
})
}

listen (topic, callback) {
this._client.on('message', message => {
if (message.topic === topic) {
if (this._topicsToIgnore.includes(topic)) {
this._topicsToIgnore = this._topicsToIgnore.filter(t => t !== topic)
} else {
callback(message.data, message)
}
}
})
this._topics.push(topic)
return this.connect()
.then(() => this._client.subscribe(this.options.node, topic))
}

on (topic, callback) {
return this.listen(topic, callback)
}

send (topic, data) {
return this.connect()
.then(() => this._client.publish(this.options.node, topic, data, {
'client-id': this._clientId
}))
}

connect () {
if (!this._connectionPromise) {
this._logger.debug('Connecting to MHub')
this._connectionPromise = this._Promise.resolve(this._client.connect())
.then(() => this._logger.debug('Conneted to MHub'))

if (this.options.credentials) {
this._connectionPromise = this._connectionPromise
.then(() => this._client.login(this.options.credentials.username, this.options.credentials.password))
.then(() => this._logger.debug('Logged into MHub'))
}

if (this._topics.length) {
this._connectionPromise = this._connectionPromise
.then(() => this._Promise.all(this._topics.map(topic => this._client.subscribe(this.options.node, topic))))
}

this._connectionPromise = this._connectionPromise
.catch(msg => {
this._logger.error(`Could not connect to MHub ${msg}`)
this._setTimeoutToReconnect()
})
}
return this._connectionPromise
}

ignoreNextMessage (topic) {
this._topicsToIgnore.push(topic)
}

_setTimeoutToReconnect () {
this._connectionPromise = undefined
return new this._Promise(resolve => {
setTimeout(() => this.connect().then(resolve), this.options.reconnectTimeout)
})
}
}

const DO_NOTHING = () => { }

Messenger.DEFAULT_OPTIONS = {
const DEFAULT_OPTIONS = {
mhubURI: process.env.MHUB_URI,
node: 'default',
clientId: 'unknown',
reconnectTimeout: 10 * 1000, // 10 seconds
logger: {
debug: DO_NOTHING,
info: DO_NOTHING,
warn: DO_NOTHING,
error: DO_NOTHING,
fatal: DO_NOTHING
}
reconnectTimeout: 10 * 1000 // 10 seconds
}

exports.Messenger = Messenger
exports.createMessenger = options => {
options = Object.assign({ }, DEFAULT_OPTIONS, options)
const messenger = createMessenger(new MClient(options.mhubURI), options)

correlateMesseger(messenger)
logMessengerEvents(messenger)

return messenger
}
@@ -0,0 +1,11 @@
exports.authenticateMesseger = messenger => {
messenger._connectionPostactions.push(() => {
return messenger.client.login(messenger.options.credentials.username, messenger.options.credentials.password)
})

if (messenger.logger) {
messenger._connectionPostactions.push(() => {
messenger.logger.debug('Logged into MHub')
})
}
}
@@ -0,0 +1,10 @@
const randomize = require('randomatic')

function generateRandomClientId () {
return randomize('Aa0?', 16, { chars: '+/' })
}

exports.identifyMesseger = messenger => {
messenger.clientId = messenger.options.clientId || generateRandomClientId()
messenger._headersProviders.push(() => ({ 'client-id': messenger.clientId }))
}
@@ -0,0 +1,5 @@
const { getCorrelationId } = require('@first-lego-league/ms-correlation')

exports.correlateMesseger = messenger => {
messenger._headersProviders.push(() => ({ 'correlation-id': getCorrelationId() }))
}
@@ -0,0 +1,13 @@
exports.ignoreByTopic = messenger => {
messenger._handlingFilters.push((message, topic) => {
if (messenger._topicsToIgnore.includes(topic)) {
messenger._topicsToIgnore = messenger._topicsToIgnore.filter(topicToIgnore => topicToIgnore !== topic)
return false
} else {
return true
}
})

messenger._topicsToIgnore = []
messenger.ignoreNextMessageOfTopic = topic => messenger._topicsToIgnore.push(topic)
}
@@ -0,0 +1,11 @@
const { Logger } = require('@first-lego-league/ms-logger')

exports.logMessengerEvents = messenger => {
messenger.logger = new Logger()

messenger._connectionPreactions.push(() => messenger.logger.debug('Connecting to MHub'))
messenger._connectionPostactions.push(() => messenger.logger.debug('Connected to MHub'))

messenger.client.on('error', msg => messenger.logger.error(`Unable to connect to MHub: ${msg}`))
messenger.client.on('close', msg => messenger.logger.error(`Disconnected from MHub: ${msg}`))
}

This file was deleted.

@@ -0,0 +1,91 @@
class Messenger {
constructor (mhubClient, options) {
this.client = mhubClient
this.options = options
this.Promise = this.options.promise

this._handlingFilters = [(messege, topic) => messege.topic === topic]

this._connectionPreactions = []
this._listenPreactions = []
this._sendPreactions = []

this._connectionPostactions = []
this._listenPostactions = []
this._sendPostactions = []

this._connectionErroractions = []
this._sendErroractions = []

this._headerProviders = []
}

listen (topic, callback) {
this._listenPreactions.forEach(action => { action(topic, callback, this) })

this.client.on('message', message => {
if (this._shouldHandleMessage(message, topic)) {
callback(message.data, message)
}
})

let promise = this.connect()
.then(() => this.client.subscribe(this.options.node, topic))

this._listenPostactions.forEach(action => {
promise = promise.then(result => action(result, topic, callback, this))
})

return promise
}

on (topic, callback) {
return this.listen(topic, callback)
}

send (topic, data) {
this._sendPreactions.forEach(action => { action(topic, data, this) })

let promise = this.connect()
.then(() => this.client.publish(this.options.node, topic, data, this._headers()))

this._sendPostactions.forEach(action => {
promise = promise.then(result => action(result, topic, data, this))
})

promise.catch(error => this._sendErroractions.forEach(action => action(error)))

return promise
}

connect () {
if (!this._connectionPromise) {
this._connectionPreactions.forEach(action => { action(this) })

this._connectionPromise = this.Promise.resolve(this.client.connect())

this._connectionPostactions.forEach(action => {
this._connectionPromise = this._connectionPromise.then(result => action(result, this))
})

this._connectionPromise.catch(error => this._connectionErroractions.forEach(action => action(error)))
}

return this._connectionPromise
}

_headers () {
return this._headerProviders.reduce((headers, headerProvider) => {
Object.assign(headers, headerProvider())
return headers
}, { })
}

_shouldHandleMessage (message, topic) {
return this._handlingFilters.reduce((shouldHandleMessege, filter) => {
return shouldHandleMessege && filter(message, topic)
}, true)
}
}

exports.Messenger = Messenger
@@ -0,0 +1,19 @@
const { Messenger } = require('./messenger')
const { identifyMesseger } = require('./client_identity')
const { ignoreByTopic } = require('./ignoring')
const { keepAlive } = require('./stay_alive')
const { authenticateMesseger } = require('./authentication')

exports.createMessenger = (client, options) => {
const messenger = new Messenger(client, options)

identifyMesseger(messenger)
ignoreByTopic(messenger)
keepAlive(messenger)

if (options.credentials) {
authenticateMesseger(messenger)
}

return messenger
}

0 comments on commit 10de664

Please sign in to comment.
You can’t perform that action at this time.