This repository has been archived by the owner on Dec 14, 2021. It is now read-only.
/
index.js
109 lines (96 loc) · 3.53 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
'use strict';
const util = require('util');
const ttn = require('ttn');
const iothub = require('azure-iothub');
const device = require('azure-iot-device');
const amqp = require('azure-iot-device-amqp');
const common = require('azure-iot-common');
const EventEmitter = require('events');
const SAK_CONNECTION_STRING = 'HostName=%s.azure-devices.net;SharedAccessKeyName=%s;SharedAccessKey=%s';
const DEVICE_CONNECTION_STRING = 'HostName=%s.azure-devices.net;DeviceId=%%s;SharedAccessKey=%%s';
const Bridge = class Bridge extends EventEmitter {
constructor(region, appId, accessKey, hubName, keyName, key, options) {
super();
options = options || {};
this._createMessage = options.createMessage || function(deviceId, message) {
const metadata = {
deviceId: deviceId,
time: message.metadata.time,
raw: message.payload_raw.toString('base64')
};
return Object.assign({}, message.payload_fields, metadata);
}
if (!appId || !accessKey || !region || !hubName || !keyName || !key) {
throw new Error('Invalid arguments')
}
this.registry = iothub.Registry.fromConnectionString(util.format(SAK_CONNECTION_STRING, hubName, keyName, key));
this.deviceConnectionString = util.format(DEVICE_CONNECTION_STRING, hubName);
this.devices = {};
this.ttnClient = new ttn.Client(region, appId, accessKey, options);
this.ttnClient.on('connect', super.emit.bind(this, 'ttn-connect'));
this.ttnClient.on('error', super.emit.bind(this, 'error'));
this.ttnClient.on('message', this._handleMessage.bind(this));
}
_getDevice(deviceId) {
if (this.devices[deviceId]) {
return Promise.resolve(this.devices[deviceId]);
}
return new Promise((resolve, reject) => {
const device = new iothub.Device(null);
device.deviceId = deviceId;
this.registry.create(device, (err, deviceInfo) => {
if (!err) {
resolve(deviceInfo);
} else {
// The device probably exists
this.registry.get(device.deviceId, (err, deviceInfo) => {
if (err) {
reject(err);
} else {
resolve(deviceInfo);
}
});
}
});
}).then(deviceInfo => {
const key = deviceInfo.authentication.symmetricKey.primaryKey;
const connectionString = util.format(this.deviceConnectionString, deviceId, key);
const client = amqp.clientFromConnectionString(connectionString);
return new Promise((resolve, reject) => {
client.open(err => {
if (err) {
reject(err);
} else {
this.devices[deviceId] = client;
resolve(client);
}
});
});
});
}
_handleMessage(deviceId, data) {
console.log('%s: Handling message', deviceId);
this._getDevice(deviceId).then(deviceInfo => {
const message = JSON.stringify(this._createMessage(deviceId, data));
deviceInfo.sendEvent(new device.Message(message), (err, res) => {
if (err) {
console.warn('%s: Could not send event: %s. Closing connection', deviceId, err);
deviceInfo.close(err => {
// Delete reference even if close failed
delete this.devices[deviceId];
});
this.emit('error', err);
} else {
this.emit('message', { deviceId: deviceId, message: message });
}
});
})
.catch(err => {
console.log('%s: Could not get device: %s', deviceId, err);
super.emit('error', err);
});
}
}
module.exports = {
Bridge: Bridge
};