From 0f4ea22cc0fdc8b85b38a04cb4c6341f5fb0c62e Mon Sep 17 00:00:00 2001 From: Mark Mahoney Date: Fri, 28 Oct 2011 16:16:26 -0700 Subject: [PATCH] updated data source with new handshake and transforms code, updated documentation to hopefully be more clear --- README.md | 111 +++++++++++++++----------- bndry/data-source.js | 168 +++++++++++++++++++++++++++++----------- example-volume-graph.js | 139 +++++++++++++++++---------------- 3 files changed, 263 insertions(+), 155 deletions(-) diff --git a/README.md b/README.md index 66cf2c8..07495a9 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,11 @@ --- -Note: This documentation represents the current status of the Boundary JavaScript API as of October 12, 2012. While we will make a best effort to maintain compatibility and provide advance notice of API changes, all APIs and output formats described below are subject to change. +Note: This documentation represents the current status of the Boundary JavaScript API as of October 28, 2012. While we will make a best effort to maintain compatibility and provide advance notice of API changes, all APIs and output formats described below are subject to change. ## Overview -The Boundary JavaScript API encapsulates the functionality required to authenticate and manage CometD subscriptions to the Boundary Streaming API service. The JavaScript API allows you to retrieve streaming JSON data from various Boundary endpoints, or "data sources," and associate that incoming data with any JavaScript object that implements an "update" method. One common use of this functionality would be to create browser-based visualizations of the data being reported for your organization. +The Boundary JavaScript API encapsulates the functionality required to authenticate and manage CometD subscriptions to the Boundary Streaming API service. The JavaScript API allows you to retrieve streaming JSON data from various Boundary endpoints, or "data sources," and associate that incoming data with any JavaScript object that implements an "update" method. One common use of this functionality would be to create browser-based visualizations of the traffic data being reported for your organization. It is the basis of Boundary's own visualization front end. ## External Requirements @@ -49,79 +49,100 @@ In order for the Boundary JavaScript API to function, the bndry.auth object must In the repo, this is defined in **/bndry/auth.js**. You will need to replace the information in that file with your own account credentials, which may be retrieved from [https://boundary.com/account](https://boundary.com/account). -## Data Sources +## dataSources -Once the auth object is defined, you must include the **/bndry/data-source.js** file, which will create the **bndry.dataSource** method. Calling this method will return a new dataSource instance, which other objects may subscribe to: +A dataSource object manages the connection to a single query in the Boundary Streaming API, and allows subscriber objects to recieve updates from the dataSource as they arrive from the API. It is defined in the **/bndry/data-source.js** file, which will create the **bndry.dataSource** object. Calling this method will return a new dataSource instance, which other objects may subscribe to: - bndry.dataSource(query, [update_interval]) + bndry.dataSource.create(query, [options]) -> dataSource -An optional update interval value may be passed to the dataSource method to force the dataSource to return updates at regular intervals, instead of waiting for updates from the server. This is most useful at the 1 second time resolution, to smooth out updates. +The second, optional argument passed to the dataSource **create** method is an object specifying additional configuration parameters: -An example: +* **updateInterval** - may be passed to the dataSource method to force the dataSource to return updates at regular intervals, instead of waiting for updates from the server. This is most useful at the 1 second time resolution, to smooth out updates +* **forceConnect** - normally, a dataSource will not poll the streaming API for updates unless it has at least one subscriber. This option forces dataSource to poll the API regardless. +* **subscription** - currently only required to subscribe to annotations, in which case it should be set to 'opaque'. - var source = bndry.dataSource('volume_1s', 1000); +To force subscription updates every second, regardless of new data from the API: -## Subscribing to a dataSource + var source = bndry.dataSource.create('volume_1s', { updateInterval: 1000 }); -Once a dataSource instance has been created, any number of arbitrary objects that define an "update" method may subscribe to it. A dataSource defines the following methods. +To subscribe to annotation updates: -### addSubscriber - - datasource.addSubscriber(object) -> subscription_id + var source = bndry.dataSource.create('annotations', { subscription: 'opaque' }); -Registers an object that defines an update method to start recieving data from the Streaming service. It returns a subscription object, which may be used to unsubscribe the object from that dataSource. Speaking of which… - -### removeSubscriber - - datasource.removeSubscriber(subscription_id) +For a list of all available queries in the Streaming API, see the Data Sources section of [the Streaming API Documentation](https://boundary.com/docs#streaming_api). -Deregisters an object from recieving updates from a dataSource. +## dataSource subscribers -#### Example +A dataSource subscriber is any object that has defined an **update** method. Once an object has subscribed to recieve updates from a dataSource, it will begin recieving data objects from the Streaming API via this method. -Subscribing to a total volume per second stream, with an optional forced update interval of one second: +An example of a subscriber that simply log's Streaming API data: - var source = bndry.dataSource('volume_1s', 1000); var logger = { update: function (data) { console.log(data); } }; - var subscription = source.addSubscriber(logger); -## dataSource subscribers +## dataSource updates -Once an object has subscribed to updates from a dataSource, it will begin recieving data objects via it's own update method. Note that a subscriber object **must** implement an update method, and may also implement an optional dataProcessor method. +The required **update** method recieves updates from the dataSource. If a transformation function has not been paired with this subscriber (discussed in the [addSubscriber](#add-subscriber) section in [dataSource Methods](#data-source-methods)), then the update object will have the following fields: -### dataProcessor(data, callback) +* **state** - list of all currently tracked samples for the dataSource's query, keyed by a unique field named **\_\_key\_\_**, which is a composite of the sample's other fields concatenated with the ':' character +* **added** - list of samples added to the state since the last update +* **removed** - list of samples removed from the state since the last update -If defined, the dataSource will call this method to process the raw subscription data into a different format. It must be returned via the supplied callback before it can be handed off to the object's update method. + +## dataSource Methods -An example: - - var source = bndry.dataSource('volume_1s', 1000); +Once a dataSource instance has been created, any number of subscription objects may subscribe to it for updates, unsubscribe from updates, request the most recent data recieved from the Streaming API, or update their optional transformation functions. + + +### addSubscriber + +Registers a subscriber object to start recieving data from the Streaming service. Calling this method returns a subscription identifier, which may be used to update or remove the subscription later. + + datasource.addSubscriber(subscriber_object, [transformation]) -> subscription_id + +An optional **transformation** function may be provided to process the data before it is handed off to the subscriber's **update** method. The transformation function is any function that takes a data object and a callback as input, and passes it's processed data to the supplied callback method when it is finished. This allows for potentially asynchronous data processing, such as using web workers. + + var transformation = function (data, callback) { + var transformed_data = {}; + + … map incoming data to the transformed_data set … + + callback(transformed_data); + } + +The **addSubscriber** method returns a subscription identifier, which may be used to unsubscribe the object from it's dataSource at any time. + +Example: subscribing to a total volume per second stream, with an optional forced update interval of one second: + + var source = bndry.dataSource.create('volume_1s', { updateInterval: 1000 }); var logger = { - dataProcessor: function (data, callback) { - var ingressOctets = []; - for (s in data.state) { - ingressOctets.push(data.state[s].ingressOctetTotalCount); - } - }, - update: function (octetList) { - console.log(octetList); + update: function (data) { + console.log(data); } }; var subscription = source.addSubscriber(logger); + +### updateTransformation + +Updates the transformation function associated with a particular subscriber. + + datasource.updateTransformation(subscription_id, transform_function) + +Calling this method will immediately update the subscriber associated with this subscription_id, using the new transformation function. + +### removeSubscriber +Decouples a subscription object from a dataSource. -### update(data) + datasource.removeSubscriber(subscription_id) -This required method recieves updates from the dataSource. If a dataProcessor method was not defined, then the format of this object is described below. Otherwise, the format of will be defined by the dataProcessor method. +Onced called, the subscription object associated with that subscription_id will no long recieve updates from the dataSource instance. -## The data object +### lastUpdate -Updates from the Streaming API are processed by the dataSource and delivered to subscribers in the following format: +Returns the last data object sent to all subscribers. -* **state** - list of all currently tracked samples, categorized by a unique field named \_\_key\_\_ -* **added** - list of samples added to the state set since the last update -* **removed** - list of samples removed from the state set since the last update + datasource.lastUpdate() -> unprocessed data object \ No newline at end of file diff --git a/bndry/data-source.js b/bndry/data-source.js index 1ff28ab..7642702 100644 --- a/bndry/data-source.js +++ b/bndry/data-source.js @@ -4,12 +4,17 @@ if (!console) { var console = { log: function () {} }; } (function (bndry, $, undefined) { var uid = 0; + // set to true for handshake and subscription logging + bndry.debug = true; + if (!bndry.auth) { - throw new Error('Auth credentials not defined'); + throw new Error('Boundary auth credentials not defined'); return; } - var struct = { + // struct utility + bndry.utils = bndry.utils || {}; + bndry.utils.struct = bndry.utils.struct || { pack: function (objects) { var schema = [], // holds the schema to include in the compressed output data = [], // holds a list of all compressed data elements encoded using schema @@ -94,26 +99,41 @@ if (!console) { var console = { log: function () {} }; } return objects; } }; - + + // handles communication with streaming API var streakerClient = (function () { var auth = bndry.auth, cometdEndpoint = auth.cometd, org_id = auth.org_id, user = auth.user, apikey = auth.apikey, - started = false; - $.cometd.configure(cometdEndpoint); + var subscriptions = {}, + uid = 0, + id; - $.cometd.addListener("/meta/subscribe", function (message) { - console.log("subscribed"); - }); + $.cometd.configure(cometdEndpoint); $.cometd.addListener("/meta/handshake", function (message) { if (message.successful) { - console.log("handshake - " + message.clientId); + if (bndry.debug) { + console.log("handshake - " + message.clientId); + } + started = true; + + // check for any pre-existing subscriptions (maybe we got disconnected and need to resubscribe) + for (id in subscriptions) { + $.cometd.unsubscribe(subscriptions[id].subscription); + subscriptions[id].subscription = $.cometd.subscribe(subscriptions[id].query, subscriptions[id].handler); + } + } + }); + + $.cometd.addListener("/meta/subscribe", function (message) { + if (bndry.debug) { + console.log("subscribed - " + message.subscription); } }); @@ -134,34 +154,51 @@ if (!console) { var console = { log: function () {} }; } servertime: function () { return $.cometd.timesync.getServerTime(); }, + subscribe: function (query, handler, options) { - query = options.subscriber == 'opaque' ? "/opaque/" + org_id + "/" + query : "/query/" + org_id + "/" + query; - return $.cometd.subscribe(query, handler); + var subscription; + + query = options.subscriber == 'opaque' ? + "/opaque/" + org_id + "/" + query : + "/query/" + org_id + "/" + query; + subscription = $.cometd.subscribe(query, handler); + + subscriptions[++uid] = { + subscription: subscription, + handler: handler, + query: query + }; + + return uid; }, - unsubscribe: function (subscription) { - $.cometd.unsubscribe(subscription); + + unsubscribe: function (id) { + $.cometd.unsubscribe(subscriptions[id].subscription); + delete subscriptions[id]; }, + isStarted: function () { return started; } }; })(); - var dataSource = function (query, updateInterval, options) { + var dataSource = function (query, options) { var subscribers = {}, subscriberCount = 0, streakerID = null, - lastData = null; + lastData = {}; // constants var WAITING = -1; + // stores the overall state of all updates based on inserts and removes var state = (function () { var current = {}; return { get: function () { return current; }, - + update: function (added, removed) { var i, len; @@ -179,13 +216,13 @@ if (!console) { var console = { log: function () {} }; } })(); var expandData = (function () { - var schema, // holds a mapping of array offsets to field names; use this to expand compressed objects before emitting to consumers + var schema, // holds a mapping of array offsets to field names keys; // list of field names that make up unique key function expand(source) { var out = []; - out = struct.unpack({ + out = bndry.utils.struct.unpack({ schema: schema, keys: keys, data: source @@ -224,26 +261,27 @@ if (!console) { var console = { log: function () {} }; } }; })(); - function update(data) { - var subscriber, s; - - function postUpdate(s) { - return function (data) { - s.update(data); - }; + // notify a subscriber of new data + function updateSubscriber(subscriber, data) { + if (subscriber.transform) { + subscriber.transform(data, subscriber.update); + } else { + subscriber.update(data); } + } + + // loop through subscribers and notify them of new data + function update(data) { + var s; - if (updateInterval || data.added || data.removed) { + if (options.updateInterval || data.added || data.removed) { data.state = state.update(data.added, data.removed); for (s in subscribers) { - subscriber = subscribers[s]; - - // see if this subscriber requested the data be processed in a specific way - if (subscriber.dataProcessor && typeof subscriber.dataProcessor === 'function') { - subscriber.dataProcessor(data, postUpdate(subscriber)); - } else { - postUpdate(subscriber)(data); + try { + updateSubscriber(subscribers[s], data); + } catch (e) { + throw(e); } } @@ -251,20 +289,21 @@ if (!console) { var console = { log: function () {} }; } } } + // update because we got new data from streaker function streakerUpdate(msg) { - var data = expandData(msg); - - update(data); + update(expandData(msg)); } + // update every options.updateInterval milliseconds, regardless of updates from streaker var intervalUpdate = (function () { var timer, updates = {}; - + return function (msg) { var data = expandData(msg), d; + // until the timer triggers, cache any updates locally for (d in data) { if (!updates[d]) { updates[d] = data[d]; @@ -277,15 +316,17 @@ if (!console) { var console = { log: function () {} }; } timer = window.setInterval(function () { update(updates); updates = {}; - }, updateInterval); + }, options.updateInterval); } }; })(); function connect() { - if (!streakerID || streakerID === WAITING) { + if (streakerID === null || streakerID === WAITING) { if (streakerClient.isStarted()) { - streakerID = streakerClient.subscribe(query, updateInterval ? intervalUpdate : streakerUpdate, options || {}); + streakerID = streakerClient.subscribe(query, + options.updateInterval ? + intervalUpdate : streakerUpdate, options || {}); } else { streakerID = WAITING; window.setTimeout(connect, 100); @@ -294,25 +335,48 @@ if (!console) { var console = { log: function () {} }; } } function disconnect() { - if (streakerID) { + if (streakerID !== null) { if (streakerID === WAITING) { window.setTimeout(disconnect, 100); } else { streakerClient.unsubscribe(streakerID); + streakerID = null; + lastData = {}; } } } + // initialize instance + options = options || {}; + if (options.forceConnect === true) { + connect(); + } + + // instance interface return { - addSubscriber: function (subscriber) { + addSubscriber: function (subscriber, transformation) { + // make sure "this" is bound to the subscriber's instance, in case that matters to the subscriber + function bind(o, f) { + return function (data) { + f.call(o, data); + }; + } + + // connect if we haven't already if (!streakerID) { connect(); } - subscribers[++uid] = subscriber; + // update the subscriber list with the new info + subscribers[++uid] = { + update: bind(subscriber, subscriber.update), + transform: (transformation && typeof transformation === 'function' ? transformation : null) + }; + subscriberCount++; + // return the id of this subscription return uid; }, @@ -320,13 +384,25 @@ if (!console) { var console = { log: function () {} }; } delete subscribers[id]; --subscriberCount; - if (subscriberCount === 0) { + // see if we want to disconnect from streaker + if (subscriberCount === 0 && !options.forceConnect) { disconnect(); } }, + // update a data transformation function for a subscription (pass null to remove transformation) + updateTransformation: function (id, transformation) { + if (subscribers[id]) { + subscribers[id].transform = (transformation && + typeof transformation === 'function' ? transformation : null); + + // run an immediate update for this subscriber using the last update from streaker + udpateSubscriber(subscribers[id], lastData); + } + }, + lastUpdate: function () { - return lastData || {}; + return lastData; } }; }; @@ -336,3 +412,5 @@ if (!console) { var console = { log: function () {} }; } bndry.dataSource.create = dataSource; })(bndry, jQuery); + + diff --git a/example-volume-graph.js b/example-volume-graph.js index bfdf616..1b2a496 100644 --- a/example-volume-graph.js +++ b/example-volume-graph.js @@ -1,103 +1,112 @@ window.onload = function () { - var subscribeBtn = document.getElementById('subscribe'), - unsubscribeBtn = document.getElementById('unsubscribe'), - canvas = document.getElementById('graph'); - - var volumeData = bndry.dataSource('volume_1s', 1000), + + // create the data source, passing an optional value to force updates every second, regardless of new data + var volumeData = bndry.dataSource.create('volume_1s', { updateInterval: 1000 }), subscription = null; function subscribe() { - subscription = volumeData.addSubscriber(graph); - subscribeBtn.disabled = true; - unsubscribeBtn.disabled = false; + // subscribe to the dataSource, passing in an optional data transformation function + // to normalize the data for graphing + subscription = volumeData.addSubscriber(graph, normalizeOctets); + + document.getElementById('subscribe').disabled = true; + document.getElementById('unsubscribe').disabled = false; } function unsubscribe() { + // unsubscribe from the data source using the subscription id we stored earlier volumeData.removeSubscriber(subscription); + subscription = null; - subscribeBtn.disabled = false; - unsubscribeBtn.disabled = true; + document.getElementById('subscribe').disabled = false; + document.getElementById('unsubscribe').disabled = true; } - subscribeBtn.onclick = subscribe; - unsubscribeBtn.onclick = unsubscribe; + document.getElementById('subscribe').onclick = subscribe; + document.getElementById('unsubscribe').onclick = unsubscribe; /* - The graph object will subscribe to a dataSource. A dataSource subscriber has one required field, update. - An optional method, dataProcessor, may be defined to transform the data to be recieved by update before - update is called. - */ - var graph = { - /* - Processes the state field into a more useful form for drawing a bar graph over a time interval. - For the volume_1s query, the state key is the timestamp of the sample. + Processes the state field into a more useful form for drawing a bar graph over a time interval. + For the volume_1s query, the state key is the timestamp of the sample. - The callback will return the data to the dataSource for handoff to the update method. - */ - dataProcessor: function (data, callback) { - var maxIngressValue = 0, - sortedIngressValues = [], - sampleRange = 600, - currentTime = Math.floor((new Date()).getTime() / 1000) * 1000, - timestamp; + The callback will return the data to the dataSource for handoff to the subscriber. + */ + function normalizeOctets (data, callback) { + var maxIngressValue = 0, + sortedIngressValues = [], + sampleRange = 600, + currentTime = Math.floor((new Date()).getTime() / 1000) * 1000, + timestamp, + i; - // step over each second backwards, and see if there is an ingress octet value to store - for (timestamp = currentTime - (1000 * sampleRange); timestamp < currentTime; timestamp += 1000) { - if (data.state[timestamp]) { - sortedIngressValues.push(data.state[timestamp].ingressOctetTotalCount); - maxIngressValue = Math.max(data.state[timestamp].ingressOctetTotalCount, maxIngressValue); - } else { - sortedIngressValues.push(0); - } + // step over each second backwards, and see if there is an ingress octet value to store + for (timestamp = currentTime - (1000 * sampleRange); timestamp <= currentTime; timestamp += 1000) { + if (data.state[timestamp]) { + sortedIngressValues.push(data.state[timestamp].ingressOctetTotalCount); + maxIngressValue = Math.max(data.state[timestamp].ingressOctetTotalCount, maxIngressValue); + } else { + sortedIngressValues.push(0); } + } - // return the processed data to dataSource for handoff to update - data = { - maxIngressValue: maxIngressValue, - times: sortedIngressValues - }; - - callback(data); - }, - - /* - Called by the dataSource to refresh the graph; note that if we did not define a dataProcessor method to - customize our data, data would consist of three fields: + // loop over the data again and scale it by the maximal octet value, + // so each ingress value is mapped to a range of [0, 1] + for (i = 0; i <= sampleRange; ++i) { + sortedIngressValues[i] /= maxIngressValue; + } - data.added: items that have been added to the data set since the last update, stored by key; undefined if none - data.removed: items that have been removed from the data set since the last update, stored by key; undefined if none - data.state: all items currently being reported, stored by key + // return the processed, time-ordered data for handoff to dataSource subscribers + callback(sortedIngressValues.reverse()); + } + + /* + The graph object will subscribe to a dataSource. A dataSource subscriber has one + required field, update. Without a transformation provided with the object's dataSource + subscription, the data argument passed to update would look like the following: - Since we have defined a dataProcessor method to smooth out and order our data per second, we have the - following fields available: + data = { + added: items that have been added to the data set since the last update, + stored by key; undefined if none + removed: items that have been removed from the data set since the last update, + stored by key; undefined if none + state: all items currently being reported from the Streaming API for this query, + stored by key + } - data.times: an array of octet values arranged in ascending order, starting with the current second - data.maxIngressValue: the maximal ingress octect value in data.items - */ + However, since we've defined a transformation function to smooth out, normalize, and order + the data being recieved by this subscriber, our update method will recieve an array of + numbers scaled from 0 to 1, representing the relative height of each sample in the graph. + */ + var graph = { + canvas: document.getElementById('graph'), + update: function (data) { - var width = canvas.width, - height = canvas.height, - context = canvas.getContext('2d'), - max = data.maxIngressValue, + var width = this.canvas.width, + height = this.canvas.height, + context = this.canvas.getContext('2d'), i = 0, - samples = data.times.length, + samples = data.length, lineWidth = width / samples, - val; + sample, + sampleHeight; context.clearRect(0, 0, width, height); context.fillStyle = '#555'; // loop over the samples and render them to scale in the graph for (; i < samples; ++i) { - val = data.times[i]; + sample = data[i]; + sampleHeight = sample * height; + context.fillRect(width - (i + 1) * lineWidth, - height - val * height / max, + height - sampleHeight, lineWidth, - val * height / max); + sampleHeight); } } }; // kick everything off by subscribing the graph to the datasource subscribe(); + }; \ No newline at end of file