devicehive/devicehive-ws-proxy

WebSocket proxy to message broker

The project wraps essential message broker functionality, allowing you to communicate with the following message brokers through WebSockets:

  • Kafka

Start the proxy

Internal mode

Run the following command:

- node ./src/proxy.js

External mode (with enabled PluginManager)

In the configuration file:

[path-to-proxy-project]/src/config.json

set the ENABLE_PLUGIN_MANAGER field to true, or set the environment variable PROXY.ENABLE_PLUGIN_MANAGER to true.
Then run the following command:

- node ./src/proxy.js

By default, the proxy listens for WebSocket connections on ws://localhost:3000, regardless of the mode. To change this, set the WEB_SOCKET_SERVER_HOST and WEB_SOCKET_SERVER_PORT fields in the configuration file, or set the environment variables PROXY.WEB_SOCKET_SERVER_HOST and PROXY.WEB_SOCKET_SERVER_PORT.

Configuration

Proxy

[path-to-proxy-project]/src/config.json
  • WEB_SOCKET_SERVER_HOST - WebSocket server host address (default: "localhost");
  • WEB_SOCKET_SERVER_PORT - WebSocket server port to listen on (default: 3000);
  • WEB_SOCKET_PING_INTERVAL_SEC - Time interval in seconds between ping messages (default: 30);
  • ACK_ON_EVERY_MESSAGE_ENABLED - Enable/disable acknowledgment for every received message (default: false);
  • ENABLE_PLUGIN_MANAGER - Enable plugin manager (default: false);
  • COMMUNICATOR_TYPE - Message broker that will be used internally (default: "kafka");
  • APP_LOG_LEVEL - Proxy logger level: debug, info, warn, error (default: "info");

Each configuration field can be overridden with the corresponding environment variable, prefixed with "PROXY", for example:

PROXY.WEB_SOCKET_SERVER_PORT=6000

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
PROXY_WEB_SOCKET_SERVER_PORT=6000
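
The naming rule above can be illustrated with a small sketch. This is not the proxy's actual configuration loader: the helper name `applyEnvOverrides`, the type coercion, and the reduced defaults object are assumptions made for illustration. Only the prefix, the default "." separator, and the ENVSEPARATOR variable come from this document.

```javascript
// Hypothetical helper, NOT the proxy's real loader: it only illustrates how
// "<PREFIX><separator><FIELD>" variable names map onto configuration fields.
function applyEnvOverrides(defaults, prefix, env) {
  // ENVSEPARATOR replaces the default "." between the prefix and the field name.
  const separator = env.ENVSEPARATOR || ".";
  const result = { ...defaults };
  for (const field of Object.keys(defaults)) {
    const raw = env[`${prefix}${separator}${field}`];
    if (raw === undefined) continue;
    // Assumption: values are coerced to the type of the default value.
    if (typeof defaults[field] === "number") result[field] = Number(raw);
    else if (typeof defaults[field] === "boolean") result[field] = raw === "true";
    else result[field] = raw;
  }
  return result;
}

// A subset of the proxy defaults listed above.
const proxyDefaults = {
  WEB_SOCKET_SERVER_HOST: "localhost",
  WEB_SOCKET_SERVER_PORT: 3000,
  ENABLE_PLUGIN_MANAGER: false,
};

// Default separator (".").
const withDot = applyEnvOverrides(proxyDefaults, "PROXY", {
  "PROXY.WEB_SOCKET_SERVER_PORT": "6000",
});

// Separator overridden with ENVSEPARATOR=_.
const withUnderscore = applyEnvOverrides(proxyDefaults, "PROXY", {
  ENVSEPARATOR: "_",
  PROXY_WEB_SOCKET_SERVER_PORT: "6000",
  PROXY_ENABLE_PLUGIN_MANAGER: "true",
});

console.log(withDot.WEB_SOCKET_SERVER_PORT, withUnderscore.ENABLE_PLUGIN_MANAGER);
```

In a real run the third argument would be `process.env`. Note that many shells reject variable names containing a dot in a plain `NAME=value` assignment, which is one practical reason to switch the separator to an underscore.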

Message Buffer configuration

[path-to-proxy-project]/src/messageBuffer/config.json
  • MAX_SIZE_MB - Maximum Message Buffer size in MB (default: 128);

Each configuration field can be overridden with the corresponding environment variable, prefixed with "MESSAGE_BUFFER", for example:

MESSAGE_BUFFER.MAX_SIZE_MB=256

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
MESSAGE_BUFFER_MAX_SIZE_MB=256

Plugin Manager configuration

[path-to-proxy-project]/src/pluginManager/config.json

Each configuration field can be overridden with the corresponding environment variable, prefixed with "PLUGIN_MANAGER", for example:

PLUGIN_MANAGER.AUTH_SERVICE_ENDPOINT=http://localhost:9090/dh/rest

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
PLUGIN_MANAGER_AUTH_SERVICE_ENDPOINT=http://localhost:9090/dh/rest

Message Brokers

Kafka

[path-to-proxy-project]/src/kafka/config.json
  • KAFKA_HOSTS - Address of the Kafka server (default: "localhost:9092");
  • KAFKA_CLIENT_ID - Kafka client name prefix (default: "ws-proxy-kafka-client");
  • CONSUMER_GROUP_ID - Kafka consumer group prefix (default: "ws-proxy-consumer-group");
  • LOGGER_LEVEL - Kafka logger level (default: 0);
  • METADATA_POLLING_INTERVAL_MS - Kafka metadata polling interval in milliseconds (default: 1000);
  • PRODUCER_MINIMAL_BATCHING_THROUGHPUT_PER_SEC_B - Maximum throughput on the producer side to work without batching (default: 1000);
  • CONSUMER_IDLE_TIMEOUT - Timeout between consumer fetch requests (default: 0);
  • CONSUMER_MAX_WAIT_TIME - Maximum wait time to collect a batch of CONSUMER_MAX_BYTES size (default: 0);
  • CONSUMER_MAX_BYTES - Maximum batch size on the consumer side in bytes (default: 1000000);

Each configuration field can be overridden with the corresponding environment variable, prefixed with "KAFKA", for example:

KAFKA.KAFKA_HOSTS=localhost:9094

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
KAFKA_KAFKA_HOSTS=localhost:9094

Proxy modules logging

Through the "DEBUG" environment variable you can enable loggers for the following modules:

  • websocketserver - WebSocket Server module logging;
  • internalcommunicatorfacade - Facade between WebSocket server and internal message broker module logging;
  • pluginmanager - Plugin Manager module logging;
  • messagebuffer - Message Buffer module logging;
  • kafka - Kafka Client module logging;

Example:

DEBUG=kafka,messagebuffer,websocketserver

Message Structure

General

All messages are JSON-based. The generic message structure looks like this:

{
  "id": "id of original message",
  "t": "message type",
  "a": "action",
  "s": "success",
  "p": "payload"
}

| Param | Type | Description |
|-------|------|-------------|
| id | String or Int | Original message ID |
| t | String | Type: ["topic","notif","health","plugin"] |
| a | String | Action: ["create","list","subscribe","unsubscribe","authenticate","ack"] |
| s | Int | Status returned by the server, 0 if OK |
| p | Object | Payload object |

The server can receive a list of messages in one batch.
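
As a rough sketch of the structure above, the following helpers assemble messages and serialize several of them at once. The function names are hypothetical, and the batch being a JSON array is an assumption based on "a list of messages"; only the field names (id, t, a, p) come from this document.

```javascript
// Hypothetical helper: builds a message using the short field names
// documented above and leaves out fields that were not provided.
function buildMessage({ id, type, action, payload }) {
  const message = { t: type };
  if (id !== undefined) message.id = id;
  if (action !== undefined) message.a = action;
  if (payload !== undefined) message.p = payload;
  return message;
}

// Assumption: a batch is sent as a JSON array of message objects.
function serializeBatch(messages) {
  return JSON.stringify(messages);
}

const batch = serializeBatch([
  buildMessage({ id: 1, type: "topic", action: "list" }),
  buildMessage({ id: 2, type: "health" }),
]);

console.log(batch);
```

The resulting string is what a client would pass to its WebSocket `send` call.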

Ack message:

Success ACK:

{
    "t": "ack",
    "s": 0
}

Failure ACK:

{
    "t": "ack",
    "s": 1,
    "p": { "m": <error message> }
}
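
A client could interpret the two ACK shapes above along these lines. This is a minimal sketch: `interpretAck`, the returned object shape, and the sample error text are hypothetical.

```javascript
// Hypothetical helper: converts a raw ACK frame into a small result object.
function interpretAck(rawMessage) {
  const message = JSON.parse(rawMessage);
  if (message.t !== "ack") {
    return { isAck: false };
  }
  if (message.s === 0) {
    return { isAck: true, ok: true };
  }
  // On failure the error text is carried in p.m.
  const error = message.p && message.p.m ? message.p.m : "unknown error";
  return { isAck: true, ok: false, error };
}

const successAck = interpretAck('{"t":"ack","s":0}');
// "broker unavailable" is made-up error text for the example.
const failureAck = interpretAck('{"t":"ack","s":1,"p":{"m":"broker unavailable"}}');

console.log(successAck.ok, failureAck.ok, failureAck.error);
```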

Topics

Create

Request message:

{
    "t": "topic",
    "a": "create",
    "p": { "t": ["topic1", "topic2", "topicN"] }
}

Response message:

{
    "t": "topic",
    "a": "create",
    "p": { "t": ["topic1", "topic2", "topicN"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "create",
    "p": { "m": <error message> },
    "s": 1
}
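
The create exchange above can be sketched as a request builder plus a response check. Both function names are hypothetical, and the fallback error text is an assumption for responses that carry no `p.m`.

```javascript
// Hypothetical helper: builds the "create topics" request shown above.
function createTopicsRequest(topics) {
  return { t: "topic", a: "create", p: { t: topics } };
}

// Hypothetical helper: returns the topic list on success (s === 0)
// and throws with the server-provided message otherwise.
function unwrapTopicResponse(response) {
  if (response.s !== 0) {
    const reason = response.p && response.p.m ? response.p.m : "topic request failed";
    throw new Error(reason);
  }
  return response.p.t;
}

const createRequest = createTopicsRequest(["topic1", "topic2"]);
const createdTopics = unwrapTopicResponse({
  t: "topic",
  a: "create",
  p: { t: ["topic1", "topic2"] },
  s: 0,
});

console.log(JSON.stringify(createRequest), createdTopics.length);
```

Because the list response has the same shape, `unwrapTopicResponse` can be reused for it as well.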

List

Request message:

{
    "t": "topic",
    "a": "list"
}

Response message:

{
    "t": "topic",
    "a": "list",
    "p": { "t": ["topic1", "topic2", "topicN"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "list",
    "p": { "m": <error message> },
    "s": 1
}

Subscribe

Request message:

{
    "t": "topic",
    "a": "subscribe",
    "p": {
        "sg": "subscriptionGroup",
        "t": ["topic1", "topic2"]
    }
}

Response message:

{
    "t": "topic",
    "a": "subscribe",
    "p": { "sg": "subscriptionGroup", "t": ["topic1", "topic2"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "subscribe",
    "p": { "m": <error message> },
    "s": 1
}

Here subscriptionGroup is a group of consumers among which messages are distributed using round-robin balancing.

Unsubscribe

Request message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "t": ["topic1", "topic2"] }
}

Response message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "t": ["topic1", "topic2"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "m": <error message> },
    "s": 1
}
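
One way a client might keep track of its current subscriptions is to update a local set from the subscribe and unsubscribe responses. The sketch below is client-side bookkeeping only and not part of the proxy; `applyTopicResponse` is a hypothetical name.

```javascript
// Hypothetical client-side helper: returns an updated set of subscribed
// topics based on a successful subscribe/unsubscribe response.
function applyTopicResponse(subscribed, response) {
  // Ignore anything that is not a successful topic response.
  if (response.t !== "topic" || response.s !== 0) return subscribed;
  const next = new Set(subscribed);
  for (const topic of response.p.t) {
    if (response.a === "subscribe") next.add(topic);
    else if (response.a === "unsubscribe") next.delete(topic);
  }
  return next;
}

let subscribed = new Set();
subscribed = applyTopicResponse(subscribed, {
  t: "topic",
  a: "subscribe",
  p: { sg: "subscriptionGroup", t: ["topic1", "topic2"] },
  s: 0,
});
subscribed = applyTopicResponse(subscribed, {
  t: "topic",
  a: "unsubscribe",
  p: { t: ["topic1"] },
  s: 0,
});

console.log([...subscribed]); // prints: [ 'topic2' ]
```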

Plugin

Authenticate

Request message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": { "token": <plugin access token> }
}

Response message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": {
        "tpc": <plugin topic name>,
        "e": <plugin access token expiration date>,
        "t": 1
    },
    "s": 0
}

Where:

tpc - plugin topic name
e - plugin access token expiration date
t - plugin token type (0 - refresh token, 1 - access token)

Error message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": { "m": <error message> },
    "s": 1
}
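
The authentication exchange could be handled roughly as follows. The helper names, the returned object shape, and the sample topic name and date are hypothetical. The expiration value is passed through untouched because its format is not specified here.

```javascript
// Token type codes as documented above.
const TOKEN_TYPES = { 0: "refresh", 1: "access" };

// Hypothetical helper: builds the authenticate request.
function authenticateRequest(token) {
  return { t: "plugin", a: "authenticate", p: { token } };
}

// Hypothetical helper: maps the short response fields to readable names.
function parseAuthResponse(response) {
  if (response.s !== 0) {
    return { ok: false, error: response.p && response.p.m };
  }
  return {
    ok: true,
    topic: response.p.tpc,
    expiresAt: response.p.e, // kept as received; the date format is not specified here
    tokenType: TOKEN_TYPES[response.p.t],
  };
}

const authRequest = authenticateRequest("example-token");
// Sample values below are invented for the example.
const authResult = parseAuthResponse({
  t: "plugin",
  a: "authenticate",
  p: { tpc: "plugin_topic_example", e: "2030-01-01T00:00:00.000Z", t: 1 },
  s: 0,
});

console.log(authRequest.p.token, authResult.topic, authResult.tokenType);
```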

Notification

Send

Request message:

{
    "t": "notif",
    "a": "create",
    "p": {
        "t": "topic1",
        "m": <notification message string>,
        "part": 1
    }
}

Response message:

{
    "t": "notif",
    "a": "create",
    "s": 0
}

Error message:

{
    "t": "notif",
    "a": "create",
    "p": { "m": <error message> },
    "s": 1
}
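
Since "m" is documented as a string, a client that wants to send structured data has to serialize it first. The sketch below shows one way to do that; `notificationRequest` is a hypothetical helper, and the `part` value is passed through exactly as in the request above.

```javascript
// Hypothetical helper: builds a "send notification" request.
function notificationRequest(topic, data, part) {
  return {
    t: "notif",
    a: "create",
    p: {
      t: topic,
      // "m" must be a string, so non-string data is serialized to JSON first.
      m: typeof data === "string" ? data : JSON.stringify(data),
      part,
    },
  };
}

const textNotification = notificationRequest("topic1", "hello", 1);
const jsonNotification = notificationRequest("topic1", { temperature: 21 }, 1);

console.log(textNotification.p.m, jsonNotification.p.m);
```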

Receive

Notifications are received automatically after subscription. Notification message:

{
    "t": "notif",
    "p": { "m": <notification message string> }
}
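
On the receiving side, a client needs to tell pushed notifications apart from replies to its own requests. The sketch below relies on an assumption drawn from the examples in this document: a pushed notification has "t": "notif" and no "a" field, while the reply to a send request carries "a": "create". `classifyIncoming` is a hypothetical name.

```javascript
// Hypothetical helper: classifies a raw incoming frame.
function classifyIncoming(rawMessage) {
  const message = JSON.parse(rawMessage);
  // Assumption: pushed notifications have no "a" field (see lead-in).
  if (message.t === "notif" && message.a === undefined) {
    return { kind: "notification", body: message.p.m };
  }
  return { kind: "response", message };
}

const pushed = classifyIncoming('{"t":"notif","p":{"m":"hello"}}');
const sendReply = classifyIncoming('{"t":"notif","a":"create","s":0}');

console.log(pushed.kind, pushed.body, sendReply.kind);
```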

Healthcheck

Request message:

{
    "t": "health"
}

Response message:

{
    "t": "health",
    "s": 0,
    "p": {
        "prx": "Available|Not Available",
        "mb": "Available|Not Available",
        "mbfp": <0-100>,
        "comm": "Available|Not Available"
    }
}

Where:

prx - Proxy Status
mb - Message buffer status
mbfp - Message Buffer fill percentage
comm - Internal message broker status
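
A monitoring client might reduce the health response to a single verdict, as in the sketch below. `summarizeHealth`, its return shape, and the 90 percent buffer threshold are hypothetical choices and not values defined by the proxy.

```javascript
// Hypothetical helper: summarizes a health response.
// maxFillPercent is an arbitrary example threshold, not a proxy setting.
function summarizeHealth(response, maxFillPercent = 90) {
  const { prx, mb, mbfp, comm } = response.p;
  // Collect the names of components that do not report "Available".
  const unavailable = Object.entries({ prx, mb, comm })
    .filter(([, status]) => status !== "Available")
    .map(([name]) => name);
  return {
    healthy: response.s === 0 && unavailable.length === 0 && mbfp <= maxFillPercent,
    unavailable,
    bufferFillPercent: mbfp,
  };
}

const healthyReport = summarizeHealth({
  t: "health",
  s: 0,
  p: { prx: "Available", mb: "Available", mbfp: 12, comm: "Available" },
});
const degradedReport = summarizeHealth({
  t: "health",
  s: 0,
  p: { prx: "Available", mb: "Available", mbfp: 12, comm: "Not Available" },
});

console.log(healthyReport.healthy, degradedReport.healthy, degradedReport.unavailable);
```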