Skip to content
Browse files

got dataSource caught up with production-side revisions, updated docu…

…mentation
  • Loading branch information...
1 parent 3e40ca7 commit cd72dc7bf28b5f098dcb9acb8e23a421cf86eab7 @markmahoney markmahoney committed Mar 22, 2012
Showing with 450 additions and 91 deletions.
  1. +124 −23 README.md
  2. +326 −68 bndry/data-source.js
View
147 README.md
@@ -4,19 +4,19 @@
---
-Note: This documentation represents the current status of the Boundary JavaScript API as of December 28, 2011. While we will make a best effort to maintain compatibility and provide advance notice of API changes, all APIs and output formats described below are subject to change.
+Note: This documentation represents the current status of the Boundary JavaScript API as of March 22, 2012. While we will make a best effort to maintain compatibility and provide advance notice of API changes, all APIs and output formats described below are subject to change.
## Overview
-The Boundary JavaScript API manages authentication and streaming data subscriptions for the Boundary Streaming API service. The API allows you to retrieve streaming updates relating to various aspects of your Boundary-monitored network in JSON format using a set of [predefined queries](https://app.boundary.com/docs#streaming_api). One common use of this functionality would be to create browser-based visualizations of the traffic data being reported for your organization. It is the basis of Boundary's own visualization front end.
+The Boundary JavaScript API manages authentication and streaming data subscriptions for the Boundary Streaming API service. The API allows you to retrieve streaming updates relating to various aspects of your Boundary-monitored network in JSON format using a set of [predefined queries](https://app.boundary.com/docs/streaming_api#datasources). One common use of this functionality would be to create browser-based visualizations of the traffic data being reported for your organization. It is the basis of Boundary's own visualization front end.
## External Requirements
The following files, supplied in the repository, are required to run the JavaScript API:
* **/lib/jquery-1.6.4.min.js** - jQuery (any recent version will do)
* **/lib/org/cometd.js** - CometD
-* **/lib/org/cometd/TimeSyncExtension.j**s - TimeSyncExtension for CometD
+* **/lib/org/cometd/TimeSyncExtension.js** - TimeSyncExtension for CometD
* **/lib/jquery.cometd.js** and **lib/jquery.cometd-timesync.js** - jQuery CometD bindings
## API Files
@@ -48,40 +48,99 @@ In order for the Boundary JavaScript API to function, the bndry.auth object must
In the repo, this is defined in **/bndry/auth.js**. You will need to replace the information in that file with your own account credentials, which may be retrieved from [https://app.boundary.com/account](https://app.boundary.com/account).
+<a id="data-source-creation"></a>
## dataSources
-A dataSource object manages the connection to a single query in the Boundary Streaming API, and allows subscribers to receive updates from the dataSource as they arrive from the API. It is defined in the **/bndry/data-source.js** file, which will create the **bndry.dataSource** object. Calling the **create** method of this object will return a new dataSource instance, which you may subscribe to:
+A dataSource object manages the connection to a single query in the [Boundary Streaming API](https://app.boundary.com/docs/streaming_api), and allows subscribers to receive updates from the dataSource as they arrive from the API. It is defined in the **/bndry/data-source.js** file, which instantiates the **bndry.dataSource** object. Calling the **create** method of this object will return a new dataSource instance, which you may then [subscribe](#add-subscriber) to:
bndry.dataSource.create(query, [options]) -> dataSource
The second, optional argument passed to the dataSource **create** method defines additional configuration parameters:
-* **forceConnect** - normally, a dataSource will not poll the streaming API for updates unless it has at least one subscriber. This option forces dataSource to poll the API regardless.
-* **subscription** - currently only required to subscribe to annotations, in which case it should be set to 'opaque'.
+* **aggregate** - limit the incoming query data to a set of meters (see [Meter Aggregation](#meter-aggregation) below)
+* **filter** - filter the incoming query data along several possible axes (see [Filtering](#query-filtering) below)
+* **forceConnect** - normally, a dataSource will not poll the streaming API for updates unless it has at least one subscriber; when set to **true**, this option forces dataSource to poll the API regardless
+* **subscription** - currently only required to subscribe to annotations, in which case it should be set to 'opaque'
-An annotation query example:
+An example dataSource for traffic volume data tracked every second by all meters in your organization, filtered to traffic to or from the US and aggregated by meters with particular observation domain IDs:
+
+ var source = bndry.dataSource.create(
+ 'volume_1s_meter',
+ {
+ aggregate: [4, 7, 34],
+ filter: {
+ country: ["US"]
+ }
+ }
+ );
+
+A simple annotation query example:
var source = bndry.dataSource.create('annotations', { subscription: 'opaque' });
-For a list of all available queries in the Streaming API, see the Data Sources section of [the Streaming API Documentation](https://app.boundary.com/docs#streaming_api).
+For a list of all available queries in the Streaming API, see the **Data Sources** section of [the Streaming API Documentation](https://app.boundary.com/docs/streaming_api#datasources).
+
+<a id="meter-aggregation"></a>
+### Meter Aggregation
+Normally, a dataSource query will return data for all meters in your organization. If you would like to see data for only a subset of your meters, set the **aggregate** field in the options parameter to an array of observation domain IDs for those meters.
+
+Each meter in your organization has a unique observation domain ID, represented by an integer value. One way to get a list of observation domain IDs and the meters they belong to is to use the Search API:
+
+ curl -u <your api key> \
+ https://api.boundary.com/<your organization id>/search?types=meter
+
+This will return a JSON blob with an "entities" field containing an entry for every meter in your organization.
+
+Another way to obtain meter observation domain IDs for your organization to subscribe to the "meter_status" query via dataSource. The first update will return a list of all meters in your organization, along with their observation domain IDs and many other fields.
+
+Once you have a list of observation domain IDs, you can limit a query's data to a subset of your organization's meters:
+
+ {
+ aggregate: [4, 7, 34]
+ }
+
+<a id="query-filtering"></a>
+### Filtering
+dataSource queries are filterable along several axes:
+
+* **country** - limit the data to packets coming from or going to a set of countries ([valid country codes](http://www.maxmind.com/app/iso3166))
+* **transport** - limit data to packets coming from or going to a set of port:protocol pairs, listed as decimal values ([valid decimal protocol values](http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xml))
+* **ip addresses** - limit data to packets coming from or going to a set of ip addresses
+
+A sample filter, which would only show traffic to or from the US or Russia, to ports 80:TCP or 4047:TCP, with a source or destination IP address of 4.2.2.1:
+
+ {
+ filter: {
+ country: ["US", "RU"],
+ transport: [{ port: 80, protocol: 6 }, { port: 4047, protocol: 6 }],
+ ips: ["4.2.2.1"]
+ }
+ }
+
+**Important:** filtered queries are short-lived; filtering a query results in a new, temporary query being generated behind-the-scenes inside the Streaming API. As such, all filtered queries will initially appear empty, and will begin accumulating historic data from that moment on. When all subscribers to that query have disconnected, the filtered query and its associated history will be destroyed. Additionally, a filtered query may initially trigger reciept of a **possiblyNoMeterData** update field (see [dataSource updates](data-source-updates) for a description of that field).
<a id="data-source-methods"></a>
## dataSource Methods
-Once a dataSource instance has been created, any number of subscriber callback functions may be added to receive streaming updates. You may also unsubscribe callback functions from receiving updates, request the most recent update received from the Streaming API, or update the optional transformation function for a particular subscriber.
+* [addSubscriber](#add-subscriber)
+* [removeSubscriber](#remove-subscriber)
+* [updateQuery](#update-query)
+* [updateTransformation](#update-transformation)
+* [lastUpdate](#last-update)
+* [reconnect](#reconnect)
<a id="add-subscriber"></a>
### addSubscriber
Registers a callback function to receive data from the Streaming API service. Calling this method returns a subscription identifier, which may be used to update or remove the subscribed callback later.
- datasource.addSubscriber(subscriber_callback, [transformation]) -> subscription_id
+ source.addSubscriber(subscriber_callback, [transformation]) -> subscription_id
**subscriber_callback** is a function that receives a single argument, an object containing updates to the current recordset for your query, as well as the overall state of the query itself. See [dataSource Updates](#data-source-updates) for more information.
- var datasource = bndry.dataSource.create('volume_1s');
+ var source = bndry.dataSource.create('volume_1s');
- datasource.addSubscriber(function (data) {
+ source.addSubscriber(function (data) {
console.log(data);
});
@@ -106,41 +165,83 @@ Example: subscribing to updates from a total volume per second stream:
};
var subscription = source.addSubscriber(logger);
+
+<a id="remove-subscriber"></a>
+### removeSubscriber
+
+Decouples a subscriber from a dataSource.
+
+ source.removeSubscriber(subscription_id)
+
+Once called, the subscriber associated with that subscription_id will no long receive updates from the dataSource instance.
+
+When a dataSource has no subscribers, it will automatically disconnect from the Streaming API, unless the **forceConnect** option has been set.
+
+<a id="update-query"></a>
+### updateQuery
+
+Reconfigures an existing dataSource's query and options, using the same set of paramters as dataSource instantiation:
+
+ source.updateQuery(query, [options])
+
+Calling this method will notify all subscribers of a pending reconnection, and then will reconnect to the Streaming API with the updated parameters.
+
+See the [dataSource section](#data-source-creation) for parameter details.
+<a id="update-transformation"></a>
### updateTransformation
Updates the optional transformation function associated with a particular subscription.
- datasource.updateTransformation(subscription_id, transform_function)
+ source.updateTransformation(subscription_id, transform_function)
Calling this method will immediately update the subscriber associated with this subscription_id, using the new transformation function.
-### removeSubscriber
-
-Decouples a subscriber from a dataSource.
+<a id="last-update"></a>
+### lastUpdate
- datasource.removeSubscriber(subscription_id)
+Returns the last, untransformed data object sent to all subscribers:
-Once called, the subscriber associated with that subscription_id will no long receive updates from the dataSource instance.
+ source.lastUpdate() -> update object
+
+<a id="reconnect"></a>
+### reconnect
-### lastUpdate
+Performs a disconnection and subsequent reconnection to the Streaming API.
+
+ source.reconnect(true);
-Returns the last data object sent to all subscribers.
+Pass **true** to this method to notify all subscribers that a reconnection is about to occur. Each subscriber's update function will be called and passed an object with the field **reconnecting** set to **true**. It is important for subscribers to be aware of reconnections, as a reconnection will result in a new state dump from the Streaming API, which can invalidate any previous query data subscribers may retain.
- datasource.lastUpdate() -> unprocessed data object
+This method is primarily used internally when dataSource needs to reset the connection to the Streaming API, usually due to a polling time-out.
<a id="data-source-updates"></a>
## dataSource updates
-If a transformation function has not been paired with your subscription (discussed in the [addSubscriber](#add-subscriber) section in [dataSource Methods](#data-source-methods)), then the update objects received by the subscriber_callback function will contain the following fields:
+If a transformation function has not been paired with your subscription (discussed in the [addSubscriber](#add-subscriber) section in [dataSource Methods](#data-source-methods)), then the update object received by the subscriber_callback function will usually contain one or more of the following fields:
-* **state** - list of all currently tracked samples for the dataSource's query, keyed by a unique field named **\_\_key\_\_**, which is a composite of the sample's other fields concatenated with the ':' character
+* **state** - list of all currently tracked records for the dataSource's query, keyed by the field **\_\_key\_\_**, which is composed of the record's other uniquely identifying fields joined with the ':' character (for a time-based port:protocol query, the **\_\_key\_\_** would be **[record's epochal time]:[port value]:[protocol value]**)
* **added** - list of records added to the state since the last update
* **removed** - list of records removed from the state since the last update
+**Important:** When processing updates in a subscriber, always apply the **removed** list, then the **added** list. This ensures that all events which have left the present query are properly removed from your local state before new records are added. If events that you’ve already seen are being updated, you wouldn’t want to remove the data you’ve just added! This is the same process used to maintain the **state** field every update.
+
+There are also two special-case fields that may be included in the parameter passed to subscribers:
+
+* **possiblyNoMeterData** - dataSource uses a set of heuristics to determine when the Streaming API may not have any data to show for your current query. When it detects this state, this field will be set to **true**. This may happen because the meters in the organization are not properly reporting data to Boundary's collectors, or because a combination of aggregation and filtering has constrained the query to an empty set for a particular time resolution. This field is only a guess; you may eventually start recieving proper updates from the Streaming API even after receiving this flag, as will often happen with newly-filtered queries.
+* **reconnecting** - when this field is set to **true**, dataSource is about to reconnect to the Streaming API, either due to updated dataSource options or because connectivity with the Streaming API was lost. Any internal state data stored by the subscriber should be discarded in anticipation of new data in the next update.
+
---
### Recent updates to the Boundary Javascript API:
+March 22, 2012:
+
+* Added aggregate and filter options to dataSource creation
+* Added updateQuery and reconnect methods
+* Two new possible fields in parameter passed to dataSource subscribers: possiblyNoMeterData and reconnecting
+* Several bug fixes
+* Fixed some typos, probably added some new ones
+
December 28, 2011:
* Removed optional update interval smoothing from dataSource creation
View
394 bndry/data-source.js
@@ -3,17 +3,13 @@ if (!console) { var console = { log: function () {} }; }
(function (bndry, $, undefined) {
var uid = 0,
- activeDataSources = [];
+ activeDataSources = [],
+ transientFilterIDs = {};
// set to true for handshake and subscription logging
bndry.debug = false;
- if (!bndry.auth) {
- throw new Error('Boundary auth credentials not defined');
- return;
- }
-
- // struct utility
+ // struct utility (defined in utils.js but included here as well for easier maintenance of the client-facing repo)
bndry.utils = bndry.utils || {};
bndry.utils.struct = bndry.utils.struct || {
pack: function (objects) {
@@ -123,9 +119,12 @@ if (!console) { var console = { log: function () {} }; }
}
if (started) {
+ // reset any cached transient fp ids, in case those have timed out between handshakes
+ requestFilteredQueryChannel.clearCache();
+
// check for any pre-existing subscriptions (maybe we got disconnected and need to resubscribe)
for (i = 0, len = activeDataSources.length; i < len; ++i) {
- activeDataSources[i].reconnect();
+ activeDataSources[i].reconnect(true);
}
} else {
started = true;
@@ -218,35 +217,106 @@ if (!console) { var console = { log: function () {} }; }
};
};
- // get a filtered/aggregated query channel to use with a datasource instance
- var requestFilteredQueryChannel = (function () {
+
+ // generate a unique, consistent MD5 value from a transient flow profile filter, regardless of field orderings;
+ // used to cache requests to generate transient flow profile IDs
+ var filterObjectToMD5 = (function () {
+ var sorter = {
+ meters: undefined, // default sort is fine
+ country: undefined, // default sort is fine
+ transport: function (a, b) {
+ var order = 0;
+
+ // order first by port, then by protocol
+ if (a.port < b.port) {
+ order = -1;
+ } else if (a.port == b.port) {
+ if (a.protocol < b.protocol) {
+ order = -1;
+ } else if (a.protocol > b.protocol) {
+ order = 1;
+ }
+ } else {
+ order = 1;
+ }
+
+ return order;
+ }
+ };
+
+ var process = {
+ meters: function (o) { return o.sort(sorter['meters']); },
+ country: function (o) { return o.sort(sorter['country']); },
+ transport: function (o) {
+ var values = o.sort(sorter['transport']),
+ ordered = [],
+ i;
+
+ for (i = 0; i < values.length; ++i) {
+ ordered.push('port');
+ ordered.push(o[i].port);
+ ordered.push('protocol');
+ ordered.push(o[i].protocol);
+ }
+
+ return ordered;
+ },
+ _default: function (o) {
+ if (o.sort && typeof o.sort === 'function') {
+ o.sort();
+ }
+
+ return o;
+ }
+ };
+
+ // { meters: [4, 7], transport: [{ port: 4740, protocol: 6 }], country: ['US', 'CA'] } ==>
+ // ['meters', [4, 7], 'transport', ['port', 4720, 'protocol', 6], 'country', ['US', 'CA']]
+ return function (obj) {
+ var keys = [], key,
+ fields = [],
+ i;
+
+ // get all filter keys in sorted order
+ for (key in obj) {
+ keys.push(key);
+ }
+ keys.sort();
+
+ // now create an array of [key, processedValue, key, processedValue ...] ordered pairs
+ for (i = 0; i < keys.length; ++i) {
+ key = keys[i];
+
+ fields.push(key);
+ fields.push((process[key] || process._default)(obj[key]));
+ }
+
+ return bndry.utils.md5(JSON.stringify(fields));
+ };
+ })();
+
+ // get a meter-aggregated query channel to use with a datasource instance
+ var requestAggregatedQueryChannel = (function () {
var id = 0,
- filterQueryRequests = {},
+ aggregateQueryRequests = {},
initialized = false;
function responder(response) {
var data = response.data || {};
if (data.result === 'success') {
- if (filterQueryRequests[data.id]) {
+ if (aggregateQueryRequests[data.id]) {
// return the channel name to the callback assigned this id
- filterQueryRequests[data.id](data.location);
- delete filterQueryRequests[data.id];
+ aggregateQueryRequests[data.id](data.location);
+ delete aggregateQueryRequests[data.id];
}
} else {
throw(new Error(data.message));
}
}
- /*
- filter = {
- name: filter name to apply to the query (currently only 'filter_by_meters'),
- values: list of key:value pairs to pass to the filter service
- }
- */
- return function (query, filter, callback) {
- var msg = { ext: {} },
- o;
+ return function (query, meters, callback) {
+ var msg = { ext: {} };
// we have to make sure the handshake has happened before we do anything
if (!initialized) {
@@ -261,36 +331,144 @@ if (!console) { var console = { log: function () {} }; }
if (initialized) {
msg.ext.query = query;
msg.ext.id = (id++).toString();
- for (o in filter.values) {
- if (o !== 'id' && o !== 'query') {
- msg.ext[o] = filter.values[o];
+ msg.ext.observation_domain_ids = meters;
+
+ aggregateQueryRequests[msg.ext.id] = callback;
+
+ $.cometd.publish('/service/queries/filter_by_meters', msg);
+ } else {
+ // streakerClient wasn't ready, so retry in a moment
+ window.setTimeout(function () {
+ requestAggregateQueryName(query, meters, callback);
+ }, 100);
+ }
+ };
+ })();
+
+ // get a set of filtered channels for a given set of meters
+ var requestFilteredQueryChannel = (function () {
+ var filterQueryRequests = {},
+ filterCache = {},
+ initialized = false;
+
+ function responder(response) {
+ var data = response.data || {},
+ request;
+
+ if (data.result === true) {
+ if (filterQueryRequests[data.request_id]) {
+ // cache this transient ID for future requests using the same filter object
+ // (request_id === the hash of the filter object)
+ filterCache[data.request_id] = {
+ query: 'fp-' + data.id,
+ subscribers: 0
+ };
+
+ // loop over all the callback requests waiting for this transient flow profile id
+ while (filterQueryRequests[data.request_id].length) {
+ request = filterQueryRequests[data.request_id].pop();
+ request.callback('fp-' + data.id + '_' + request.query);
+ filterCache[data.request_id].subscribers++;
}
+
+ delete filterQueryRequests[data.request_id];
}
+ } else {
+ throw(new Error(data.message));
+ }
+ }
+
+ var api = function (query, filter, callback) {
+ var filterHash = filterObjectToMD5(filter),
+ msg = { ext: { flow_profile: {} } },
+ o;
- filterQueryRequests[msg.ext.id] = callback;
+ // see if we already requested this; if so, just append the id and return
+ // (lol this next line rhymes)
+ if (filterCache[filterHash]) {
+ callback(filterCache[filterHash].query + '_' + query);
+ filterCache[filterHash].subscribers++;
+ return;
+ }
+
+ // we have to make sure the handshake has happened before we do anything
+ if (!initialized) {
+ if (!streakerClient.initialized) {
+ streakerClient = streakerClient();
+ } else if (streakerClient.isStarted()) {
+ $.cometd.addListener('/service/flow_profiles', responder);
+ initialized = true;
+ }
+ }
+
+ if (initialized) {
+ // see if we already have a request in the pipeline for this transient filter
+ if (filterQueryRequests[filterHash]) {
+ filterQueryRequests[filterHash].push({
+ query: query,
+ callback: callback
+ });
+
+ // this is the first request for this particular filter, so publish a request
+ } else {
+ msg.ext.request_id = filterHash;
+ msg.ext.flow_profile.name = '' + filterHash;
+ msg.ext.flow_profile.filter = {};
+ for (o in filter) {
+ msg.ext.flow_profile.filter[o] = filter[o];
+ }
+
+ // store the query and callback for use when we get a response from cometd
+ filterQueryRequests[filterHash] = [];
+ filterQueryRequests[filterHash].push({
+ query: query,
+ callback: callback
+ });
- $.cometd.publish('/service/queries/' + filter.name, msg);
+ // generate the request
+ $.cometd.publish('/service/flow_profiles/create_transient_flow_profile', msg);
+ }
} else {
// streakerClient wasn't ready, so retry in a moment
window.setTimeout(function () {
- requestFilteredQueryName(query, filter, callback);
+ requestFilteredQueryChannel(query, filter, callback);
}, 100);
}
};
+
+ api.clearCache = function () {
+ filterCache = {};
+ };
+
+ // need to track how many subscribers are using this transient FP; when it reaches zero it will be torn down, so we need
+ // to invalidate that cache entry
+ api.decrementSubscribers = function (filter) {
+ var filterHash = filterObjectToMD5(filter);
+
+ if (filterCache[filterHash]) {
+ filterCache[filterHash].subscribers--;
+
+ if (filterCache[filterHash].subscribers <= 0) {
+ delete filterCache[filterHash];
+ }
+ }
+ };
+
+ return api;
})();
/*
options = {
forceConnect: force a datasource to recieve updates, even if it has no subscribers,
subscriber: only used for annotations, pass 'opaque'
- }
-
- filter = {
- name: name of filter the channel on /service/queries (ie, 'filter_by_meters'),
- values: list of key:value pairs to pass to the filter service
+ aggregate: array of meters to aggregate on
+ filter: {
+ country: array of two letter country codes to filter on [optional]
+ transport: array of { port: #, protocol: # } values to filter on [optional]
+ }
}
*/
- var dataSource = function (query, options, filter) {
+ var dataSource = function (query, options) {
if (!streakerClient.initialized) {
streakerClient = streakerClient();
}
@@ -351,6 +529,8 @@ if (!console) { var console = { log: function () {} }; }
var schema, // holds a mapping of array offsets to field names; use this to expand compressed objects before emitting to consumers
keys; // list of field names that make up unique key
+ var possiblyNoMeterData = false;
+
function expand(source) {
var out = [];
@@ -368,17 +548,21 @@ if (!console) { var console = { log: function () {} }; }
added = null,
removed = null;
+ // these fields come in the state dump
if (data.schema) {
schema = data.schema;
+ keys = data.keys;
}
- if (data.keys) {
- keys = data.keys;
+ // if we didn't get inserts in the state dump, we aren't getting meter data for this query
+ if (data.keys && data.schema && data.insert.length === 0) {
+ possiblyNoMeterData = true;
}
if (schema) {
if (data.insert && data.insert.length) {
added = expand(data.insert);
+ possiblyNoMeterData = false;
}
if (data.remove && data.remove.length) {
@@ -388,7 +572,8 @@ if (!console) { var console = { log: function () {} }; }
return {
added: added,
- removed: removed
+ removed: removed,
+ possiblyNoMeterData: possiblyNoMeterData
};
};
})();
@@ -403,21 +588,29 @@ if (!console) { var console = { log: function () {} }; }
}
// loop through subscribers and notify them of new data
- function update(data) {
+ function update(data, force) {
var s;
- if (!restarting) {
+ // only update subscribers if we aren't restarting
+ if (!restarting || force) {
data.state = state.update(data.added, data.removed);
- // only update subscribers if we aren't restarting AND have state changes
- if (data.state) {
+ // only update subscribers if we have state changes or need to report no meter data
+ if (data.state || data.possiblyNoMeterData || force) {
window.setTimeout(function () {
for (s in subscribers) {
try {
updateSubscriber(subscribers[s], data);
} catch (e) {
- throw(e);
+ // need to handle this generically so we still update the other subscribers
+ if (e.stack) {
+ console.log(e.stack);
+ } else if (e.stackTrace) {
+ console.log(e.stackTrace);
+ } else {
+ console.log('Error: ' + e.type + ' - ' + e.message);
+ }
}
}
}, 0);
@@ -434,6 +627,8 @@ if (!console) { var console = { log: function () {} }; }
// connects to streaker, with optional completion callback
function connect(complete) {
+ var filter = {}, f;
+
function finish(query) {
state.clear();
lastData = {};
@@ -447,13 +642,27 @@ if (!console) { var console = { log: function () {} }; }
if (!streakerID || streakerID === WAITING) {
if (streakerClient.isStarted()) {
- if (filter) {
+ if (options.filter) {
streakerID = WAITING;
+
+ for (f in options.filter) {
+ filter[f] = options.filter[f];
+ }
+
+ // grab meters from the aggregate value if it wasn't defined in filters
+ if (!filter.meters && options.aggregate) {
+ filter.meters = options.aggregate;
+ }
+
requestFilteredQueryChannel(query, filter, finish);
+ } else if (options.aggregate) {
+ streakerID = WAITING;
+ requestAggregatedQueryChannel(query, options.aggregate, finish);
} else {
finish(query);
}
} else {
+ // we need to wait for the handshake
streakerID = WAITING;
window.setTimeout(function () {
connect(complete);
@@ -464,17 +673,37 @@ if (!console) { var console = { log: function () {} }; }
// disconnect from streaker, with optional completion callback
function disconnect(complete) {
+ var filter = {}, f;
+
if (streakerID) {
if (streakerID === WAITING) {
window.setTimeout(function () {
disconnect(complete);
}, 100);
} else {
- streakerClient.unsubscribe(streakerID, complete);
- streakerID = null;
+ // if this is a transient FP, we need to see if this disconnect will tear down the query set
+ if (options.filter) {
+ for (f in options.filter) {
+ filter[f] = options.filter[f];
+ }
+
+ // grab meters from the aggregate value if it wasn't defined in filters
+ if (!filter.meters && options.aggregate) {
+ filter.meters = options.aggregate;
+ }
- state.clear();
- lastData = {};
+ requestFilteredQueryChannel.decrementSubscribers(filter);
+ }
+
+ streakerClient.unsubscribe(streakerID, function () {
+ streakerID = null;
+ state.clear();
+ lastData = {};
+
+ if (complete) {
+ complete();
+ }
+ });
}
}
}
@@ -488,6 +717,14 @@ if (!console) { var console = { log: function () {} }; }
// instance interface
ds = {
addSubscriber: function (subscriber, transformation) {
+ // make sure "this" is properly bound to the subscriber's instance, in case that matters to the subscriber
+ function bind(o, f) {
+ return function () {
+ var args = Array.prototype.slice.call(arguments);
+ f.apply(o, args);
+ };
+ }
+
// connect if we haven't already
if (!streakerID) {
connect();
@@ -496,8 +733,9 @@ if (!console) { var console = { log: function () {} }; }
// update the subscriber list with the new info
// TODO: remove support for passing in objects with an update method
subscribers[++uid] = {
- update: subscriber,
- transform: (transformation && typeof transformation === 'function') ? transformation : null
+ update: subscriber.update ? bind(subscriber, subscriber.update) : subscriber,
+ transform: (transformation && typeof transformation === 'function' ?
+ bind(subscriber, transformation) : null)
};
subscriberCount++;
@@ -522,40 +760,63 @@ if (!console) { var console = { log: function () {} }; }
}
},
- // update a data transformation function for a subscription (pass null to remove transformation)
- updateTransformation: function (id, transformation) {
- if (subscribers[id]) {
- subscribers[id].transform = (transformation && typeof transformation === 'function' ? transformation : null);
+ reconnect: function (warnSubscribers) {
+ if (streakerID) {
+ restarting = true;
- // run an immediate update for this subscriber using the last update from streaker
- if ('state' in lastData) {
- udpateSubscriber(subscribers[id], lastData);
- }
+ disconnect(function () {
+ if (warnSubscribers) {
+ update({ reconnecting: true }, true);
+ }
+
+ connect(function () {
+ restarting = false;
+ });
+ });
}
},
- reconnect: function () {
+ updateQuery: function (updatedQuery, updatedOptions) {
+ updatedOptions = updatedOptions || {};
+
if (streakerID) {
restarting = true;
disconnect(function () {
+ if (updatedOptions !== undefined) {
+ options = updatedOptions;
+ }
+
+ query = updatedQuery;
connect(function () {
restarting = false;
});
});
+ } else {
+ // not currently connected to streaker, so just update the options
+ if (updatedOptions !== undefined) {
+ options = updatedOptions;
+ }
+
+ query = updatedQuery;
}
},
- updateFilter: function (updatedFilter) {
- filter = updatedFilter;
- this.reconnect();
+ // update a data transformation function for a subscription (pass null to remove transformation)
+ updateTransformation: function (id, transformation) {
+ if (subscribers[id]) {
+ subscribers[id].transform = (transformation && typeof transformation === 'function' ? transformation : null);
+
+ // run an immediate update for this subscriber using the last update from streaker
+ if ('state' in lastData) {
+ udpateSubscriber(subscribers[id], lastData);
+ }
+ }
},
lastUpdate: function () {
return lastData;
- },
-
- sc: function () { return subscriberCount; }
+ }
};
// store this interface in case streaker needs to reconnect us at some point
@@ -568,7 +829,4 @@ if (!console) { var console = { log: function () {} }; }
bndry.dataSource = dataSource;
bndry.dataSource.create = dataSource;
- // not sure why anyone would need this, but here you go
- bndry.dataSource.requestFilteredQueryChannel = requestFilteredQueryChannel;
-
})(bndry, jQuery);

0 comments on commit cd72dc7

Please sign in to comment.
Something went wrong with that request. Please try again.