// mhreceiver.js
"use strict";
// Bootstrap: dotenv.config() runs before any process.env lookup further down,
// then the Cloud Foundry application environment (appEnv) is resolved via cfenv.
const dotenv = require('dotenv');
var cfenv = require("cfenv");
dotenv.config();
// Load local VCAP configuration and service credentials.
// The file is optional: when it cannot be loaded, cfenv is called with empty options.
var vcapLocal;
try {
  vcapLocal = require('./vcap-local.json');
  console.log("Loaded local VCAP");
} catch (e) {
  // A missing file is the expected case and stays silent. Any other failure
  // (for example a malformed JSON file) is reported instead of being swallowed,
  // but still does not abort start-up.
  if (!e || e.code !== 'MODULE_NOT_FOUND') {
    console.error("Could not load ./vcap-local.json: " + (e && e.message ? e.message : e));
  }
}
const appEnvOpts = vcapLocal ? { vcap: vcapLocal } : {};
const appEnv = cfenv.getAppEnv(appEnvOpts);
// Configure Message Hub service using VCAP or ENV properties.
// Resolution order:
//   1. a bound service listed under the 'messagehub' label, or one whose name
//      matches /messagehub/ (looked up through appEnv.getService);
//   2. the MHBROKERLIST / MHUSERNAME / MHPASSWORD environment variables;
//   3. otherwise a message is logged and the options are left unset.
var mh_credentials = {};
var mh_configuration = {};
var opts = {};
// Resolve the service instance once, so the object that is tested is the same
// object that is read. The previous code tested "label OR name match" but then
// always dereferenced the label entry, which throws when only the name matched.
var mhServicesByLabel = appEnv.services['messagehub'];
var mhService = (mhServicesByLabel && mhServicesByLabel[0]) || appEnv.getService(/messagehub/);
if (mhService) {
  mh_credentials = mhService.credentials;
  opts.brokers = mh_credentials.kafka_brokers_sasl;
  opts.username = mh_credentials.user;
  opts.password = mh_credentials.password;
  console.log("Retrieved Message Hub service credentials from VCAP file");
} else if (process.env.MHBROKERLIST) {
  // Read the same variable that was just tested (was misspelled MHBROKERLIS,
  // which always produced an undefined broker list).
  opts.brokers = process.env.MHBROKERLIST;
  opts.username = process.env.MHUSERNAME;
  opts.password = process.env.MHPASSWORD;
  console.log("Retrieved Message Hub service credentials from environment variables");
} else {
  console.log("Could not find Message Hub service credentials");
}
// Options handed to the Kafka consumer. Each key is listed exactly once
// ('security.protocol' used to appear twice with the same value).
var consumer_opts = {
  'metadata.broker.list': opts.brokers,
  'security.protocol': 'sasl_ssl',
  'ssl.ca.location': '/etc/ssl/certs',
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': opts.username,
  'sasl.password': opts.password,
  'api.version.request': true,
  'client.id': 'kafka-nodejs-console-sample-consumer',
  'group.id': 'kafka-nodejs-console-sample-group'
};
// Configure DB service using VCAP or ENV variables.
// Resolution order:
//   1. a bound service listed under the 'dashDB' label, or one whose name
//      matches /dashDB/ (looked up through appEnv.getService);
//   2. the DATABASE / DBPORT / DBUID / DBPWD / DBHOSTNAME environment variables;
//   3. otherwise start-up is aborted with an explicit error.
var dbconfiguration;
// Resolve the service instance once, so the object that is tested is the same
// object that is read (the label entry was dereferenced even when only the
// name lookup had matched).
var dashServicesByLabel = appEnv.services['dashDB'];
var dashService = (dashServicesByLabel && dashServicesByLabel[0]) || appEnv.getService(/dashDB/);
if (dashService) {
  dbconfiguration = dashService.credentials;
  console.log("Retrieved dashDB service credentials from vcap file");
} else if (process.env.DATABASE) {
  dbconfiguration = {
    db: process.env.DATABASE,
    port: process.env.DBPORT,
    username: process.env.DBUID,
    password: process.env.DBPWD,
    hostname: process.env.DBHOSTNAME
  };
  console.log("Retrieved DB2 service credentials from environment variables");
} else {
  console.log("Could not find DB2 service credentials");
  // Without credentials the assignments below used to fail with
  // "Cannot set property 'table' of undefined"; fail with the real reason instead.
  throw new Error("Could not find DB2 service credentials");
}
// Target table comes from the environment; the driver name is fixed.
dbconfiguration.table = process.env.DBTABLE;
dbconfiguration.driver = '{DB2}';
// DB2 access: the ibm_db module, the string-template formatter and the
// project-local ./ibmdb2interface helper.
const ibmdb = require('ibm_db');
const format = require("string-template");
const db2 = require("./ibmdb2interface");
// Connection string template; the {placeholders} are filled from the
// properties of dbconfiguration.
const dbConnectionTemplate = "DRIVER={driver};DATABASE={db};UID={username};PWD={password};HOSTNAME={hostname};port={port}";
var dbconnection = format(dbConnectionTemplate, dbconfiguration);
// Kafka/Message Hub consumer creation
const Kafka = require('node-rdkafka');
var topicName = 'elevator-events';
var deviceType = "Elevator";
// Open a read stream on the topic. Note that every call to createReadStream
// creates a new stream, so this is done exactly once here.
const subscription = { topics: [topicName] };
var stream = Kafka.Consumer.createReadStream(consumer_opts, {}, subscription);
console.log('Created Kafka read stream');
// Read data from the stream.
//
// For every message the value is parsed as JSON and the device record is taken
// from its `d` property. When `d` carries `timestamp`, `deviceId` and
// `deviceType`, a SQL merge statement is built for dbconfiguration.table and
// executed through the db2 helper; otherwise the message is logged and skipped.
//
// A message that has no value or is not valid JSON is logged and skipped too:
// an exception thrown from this handler would otherwise escape the event
// emitter and stop the consumer on a single bad message.
//
// NOTE(review): the statement is built from message content by
// db2.createSQLMergeStatement — confirm that helper escapes/parameterizes values.
stream.on('data', function (message) {
  console.log('Got message ' + JSON.stringify(message));
  if (message.value === null || message.value === undefined) {
    console.log('Skipping message without a value');
    return;
  }
  var rawValue = message.value.toString();
  console.log('Received value ' + rawValue);
  var payload;
  try {
    payload = JSON.parse(rawValue);
  } catch (err) {
    console.error('Skipping message with invalid JSON payload: ' + err.message);
    return;
  }
  // Fall back to an empty record when the `d` envelope is absent so that the
  // property check below reports it instead of a TypeError being thrown.
  var realPayload = (payload && payload.d) || {};
  var deviceId = realPayload.deviceId;
  // Named differently from the module-level deviceType to avoid shadowing it.
  var payloadDeviceType = realPayload.deviceType;
  if (realPayload.timestamp && deviceId && payloadDeviceType) {
    var sql_stmt = db2.createSQLMergeStatement(dbconfiguration.table, deviceId, payloadDeviceType, realPayload);
    db2.executeSQLStatement(dbconnection, deviceId, sql_stmt);
  } else {
    console.log('Missing properties timestamp, deviceId or deviceType in payload');
  }
});
// Handle errors receiving from Kafka.
// The error is logged and the process is terminated. The exit status is
// non-zero so that whatever supervises the process sees a failure;
// process.exit() without an argument would report a successful exit.
stream.on('error', function (err) {
  console.error('Error in kafka stream');
  console.error(err);
  process.exit(1);
});
// Minimal HTTP endpoint, present only so that Containers or CF can run
// frequent health checks against the process.
// It listens on the cloud-provided PORT, or on 8000 when none is set.
const http = require('http');

// Answers every request with a plain-text 200 response.
function respondToHealthCheck(request, response) {
  response.writeHead(200, {"Content-Type": "text/plain"});
  response.end("Hello World\n");
}

const server = http.createServer(respondToHealthCheck);
const port = process.env.PORT || 8000;
server.listen(port);