Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #15 from CatalystCode/split-out-eventhub-client
Browse files Browse the repository at this point in the history
Split out client to send EventHub messages
  • Loading branch information
c-w committed Jun 29, 2017
2 parents f522345 + dbdb0e8 commit 84797a0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
36 changes: 36 additions & 0 deletions src/eventHubClient/EventHubSender.js
@@ -0,0 +1,36 @@
const EventHubClient = require('azure-event-hubs').Client;

const eventHubConnectionString = process.env.PUBLISH_EVENTS_EVENTHUB_CONNECTION_STRING;
const eventHubPath = process.env.PUBLISH_EVENTS_EVENTHUB_PATH;
const eventHubPartition = process.env.PUBLISH_EVENTS_EVENTHUB_PARTITION;
const eventHubClient = EventHubClient.fromConnectionString(eventHubConnectionString, eventHubPath);

function sendMessages(messages) {
return new Promise((resolve, reject) => {
if (!messages || !messages.length) {
reject('No messages to be sent');
return;
}

let payloads;
try {
payloads = messages.map(message => ({contents: JSON.stringify(message)}));
} catch (err) {
reject(`Unable to create payloads for EventHub: ${err}`);
return;
}

eventHubClient.open()
.then(() => eventHubClient.createSender())
.then(eventHubSender => {
eventHubSender.on('errorReceived', err => reject(`Error talking to EventHub: ${err}`));
Promise.all(payloads.map(payload => eventHubSender.send(payload, eventHubPartition)))
.then(() => resolve([]))
.catch((err) => reject(`Error sending EventHub message: ${err}`));
});
});
}

module.exports = {
sendMessages: sendMessages
};
34 changes: 4 additions & 30 deletions src/resolvers-cassandra/Messages.js
@@ -1,42 +1,16 @@
'use strict';

const EventHubClient = require('azure-event-hubs').Client;
const translatorService = require('../translatorClient/MsftTranslator');

const eventHubConnectionString = process.env.PUBLISH_EVENTS_EVENTHUB_CONNECTION_STRING;
const eventHubPath = process.env.PUBLISH_EVENTS_EVENTHUB_PATH;
const eventHubPartition = process.env.PUBLISH_EVENTS_EVENTHUB_PARTITION;
const eventHubClient = EventHubClient.fromConnectionString(eventHubConnectionString, eventHubPath);
const eventHubSender = require('../eventhubClient/EventHubSender');

module.exports = {
// ---------------------------------------------------------------------------------- mutations
// ---------------------------------------------------------------------------------- mutations

publishEvents(args, res){ // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => {
const events = args && args.input && args.input.messages;
if (!events) {
reject(`No messages to be sent in request: ${JSON.stringify(args)}`);
}

let messages;
try {
messages = events.map(event => ({contents: JSON.stringify(event)}));
} catch (err) {
reject(`Unable to create payloads for EventHub: ${err}`);
}

eventHubClient.open()
.then(() => eventHubClient.createSender())
.then(eventHubSender => {
eventHubSender.on('errorReceived', err => reject(`Error talking to EventHub: ${err}`));
Promise.all(messages.map(message => eventHubSender.send(message, eventHubPartition)))
.then(() => resolve([]))
.catch((err) => reject(`Error sending EventHub message: ${err}`));
});
});
return eventHubSender.sendMessages(args && args.input && args.input.messages);
},

// ------------------------------------------------------------------------------------ queries
// ------------------------------------------------------------------------------------ queries

byBbox(args, res){ // eslint-disable-line no-unused-vars
},
Expand Down

0 comments on commit 84797a0

Please sign in to comment.