Developer Docs
RTM provides services for the aggregation of datapoints (internally called "measurements") over an ordered axis, which is in general the timestamp attribute of the datapoints. The implementation speeds up the computation by partitioning each computation job into small tasks which can then be executed in parallel, thus leveraging the available hardware resources. As of v3.0, the computation can also be distributed across multiple tiers as well as horizontally, in order to achieve extreme scalability.
Once measurements are inserted into the MongoDB collection, they can be queried for aggregation. This documentation page relies on the following dataset for a visual representation of the transformations occurring in the aggregation service:
The design of the aggregation pipeline prioritizes two use cases which tend to drive two extreme types of behavior in RTM:
- the "stat summary" scenario, which provides statistic on a single, very inclusive aggregation interval containing a very large amount of measurements (aka datapoints), i.e potentially the entire collection's content.
- the "finest granularity" scenario in which a user wants to display as many aggregated intervals as possible in order to see the finest details affecting the system under measurement over time. Under the current implementation, the smallest possible interval size is 1 millisecond.
In scenario one, which is depicted below:
The computation over a single, very large ("max") interval will yield only one aggregate result carrying statistics for all of the datapoints:
In scenario two, which is depicted below:
The computation performed with a very small interval size will yield a large number of aggregate results, each carrying statistics for the few datapoints contained in that tiny interval:
It is worth noting that some, potentially many, of these aggregate results will be empty, in which case the service will not return a corresponding value for those intervals.
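For a rough sense of the two extremes, the standalone snippet below (illustrative only, not RTM code; all names are hypothetical) splits a query time range into fixed-size intervals: a 60-second range with the maximum interval size yields a single interval, while the same range at the 1 ms minimum yields 60,000 candidate intervals, many of which may be empty.
```java
import java.util.ArrayList;
import java.util.List;

public class IntervalSplitExample {

    /** Splits [begin, end) into consecutive intervals of intervalMs milliseconds. */
    static List<long[]> split(long begin, long end, long intervalMs) {
        List<long[]> intervals = new ArrayList<>();
        for (long lower = begin; lower < end; lower += intervalMs) {
            intervals.add(new long[] { lower, Math.min(lower + intervalMs, end) });
        }
        return intervals;
    }

    public static void main(String[] args) {
        long begin = 0, end = 60_000; // a 60-second time range, in milliseconds

        // "Stat summary" scenario: one interval covering the whole range -> 1 aggregate result
        System.out.println(split(begin, end, end - begin).size()); // prints 1

        // "Finest granularity" scenario: 1 ms intervals -> up to 60000 results, many possibly empty
        System.out.println(split(begin, end, 1).size()); // prints 60000
    }
}
```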
In order to reflect and efficiently compute both scenarios, the implementation of the aggregation pipeline potentially applies two (nested) levels of parallelization. The first level allows parallel computation of multiple intervals, while the second level arbitrarily splits (partitions) each interval into smaller ones and later merges them back into a single aggregate result:
The second level of parallelization (the one which partitions) turned out to be the most important and effective one, as the average user query profile leans more toward scenario 1 (the "stat summary" use case) than scenario 2 (high granularity). Therefore, the thread pool for interval parallelization (level 1) has now been forcibly set to 1, while users can increase the threading of level 2 if they want to speed up query computation.
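The sketch below (hypothetical names, not the actual RTM classes) illustrates the intent behind the two levels: an outer pool iterates over intervals and is now effectively sequential (pool size 1), while an inner, user-tunable pool splits each interval into sub-ranges whose partial results are merged back into a single aggregate.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class TwoLevelParallelismSketch {

    /** Partial aggregate for a (sub-)range; merge() folds two partials together. */
    record Partial(long count, double sum) {
        Partial merge(Partial other) { return new Partial(count + other.count, sum + other.sum); }
    }

    // Hypothetical stand-in for scanning the datapoints of one sub-range.
    static Partial scan(long lower, long upper) { return new Partial(upper - lower, 0.0); }

    static Partial aggregateInterval(long lower, long upper, int partitions, ExecutorService innerPool)
            throws Exception {
        long step = Math.max(1, (upper - lower) / partitions);
        List<Future<Partial>> futures = new ArrayList<>();
        for (long p = lower; p < upper; p += step) {
            long subLower = p, subUpper = Math.min(p + step, upper);
            futures.add(innerPool.submit(() -> scan(subLower, subUpper))); // level 2: sub-partitions
        }
        Partial result = new Partial(0, 0.0);
        for (Future<Partial> f : futures) result = result.merge(f.get()); // merge partials back
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService outerPool = Executors.newFixedThreadPool(1); // level 1: now fixed to 1
        ExecutorService innerPool = Executors.newFixedThreadPool(4); // level 2: user-tunable
        try {
            Future<Partial> interval = outerPool.submit(() -> aggregateInterval(0, 60_000, 4, innerPool));
            System.out.println(interval.get());
        } finally {
            outerPool.shutdown();
            innerPool.shutdown();
        }
    }
}
```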
Note: In the future, the parallelization level could be chosen and adapted dynamically by the system itself. The grid's token reservation should make implementing this a lot easier (using adaptive token usage heuristics).
These two levels of parallelization in the pipeline are represented by the following chunks of code in versions of RTM < 3.0:
- Level 1 -- RequestHandler.java:
```java
PullTaskBuilder tb = new PartitionedPullQueryBuilder(sel, prop, subPartitioning, subPoolSize, timeoutSecs);
PullPipelineBuilder ppb = new SimplePipelineBuilder(
        effective.getBegin(), effective.getEnd(),
        optimalSize, rh, tb);
// It's only useful to // at this level if we're looking to produce highly granular results,
// which should almost never be the case
PullPipeline pp = new PullPipeline(ppb, /*poolSize*/ 1, timeoutSecs, BlockingMode.BLOCKING);
PipelineExecutionHelper.executeAndsetListeners(pp, stream);
```
- Level 2 -- PartitionedPullQueryTask.java:
```java
PullTaskBuilder tb = new PullQueryBuilder(super.sel, super.accumulator, prop);
PullPipelineBuilder ppb = new SimplePipelineBuilder(
        bucket.getLowerBound(),
        bucket.getUpperBound(),
        super.subsize, super.resultHandler, tb);
PullPipeline pp = new PullPipeline(ppb, poolSize, super.timeoutSecs, BlockingMode.BLOCKING);
pp.execute();
```
As of RTM 3.0, the first level of parallelization has been discarded and only level 2 is implemented, within the class PartitionedPullQueryTask.java:
```java
RangeTaskBuilder tb = new RemoteQueryTaskBuilder(super.sel, super.prop);
RunableBuilder ppb = new PullRunableBuilder(
        bucket.getLowerBound(),
        bucket.getUpperBound(),
        super.subsize, super.resultHandler, tb);
PullPipelineExecutor pp = new PullPipelineExecutor(ppb, poolSize, super.timeoutSecs, BlockingMode.BLOCKING);
pp.execute();
```
Somewhat similar to the concept of map-reduce, RTM computes metrics in two stages. In the first stage, which corresponds to the moment when datapoints are deserialized, scanned and exposed for computation, Accumulators are automatically invoked and are thus allowed to perform an arbitrary task on a per-datapoint basis.
In the second stage, Metrics are post-processed based on the results gathered by Accumulators in the first stage.
The following image provides an example for the accumulation and metric subscription in a single interval where the goal is to compute the mean (or "average") of the datapoints' values:
Accumulators are responsible for harvesting low-footprint data on the fly while the measurements are scanned. Custom WorkObjects are initialized each time the analysis of a new interval or subpartition begins, and accumulators are thus also responsible for describing the merge mechanics that satisfy their own constraints. See the Accumulator interface for more details.
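The actual contract is defined by the Accumulator interface in the code base; the sketch below is only a simplified, hypothetical illustration of the pattern described above, with a per-partition work object that is updated for each scanned datapoint and later merged with its siblings.
```java
// Hypothetical, simplified illustration of the accumulation pattern
// (the real contract is the Accumulator interface in the RTM code base).
interface SimpleAccumulator<W> {
    W newWorkObject();                    // called when the analysis of a new interval/sub-partition begins
    void accumulate(W work, long value);  // hot path: invoked once per scanned datapoint
    W merge(W left, W right);             // folds sub-partition results back together
}

// Example: count and sum, enough to later derive a mean metric.
class CountSumAccumulator implements SimpleAccumulator<long[]> {
    @Override public long[] newWorkObject() { return new long[] { 0L, 0L }; } // {count, sum}
    @Override public void accumulate(long[] work, long value) { work[0]++; work[1] += value; }
    @Override public long[] merge(long[] a, long[] b) { return new long[] { a[0] + b[0], a[1] + b[1] }; }
}
```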
Note: accumulator code needs to be extremely efficient in terms of complexity and resource consumption, as it is the hottest code path in the entire aggregation process.
Subscribed metrics are responsible for turning the raw accumulated data into statistics which are deemed interesting to the end-user. Each metric can subscribe to the data provided by any number of accumulators, combine this data and then compute and publish a value.
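Continuing the hypothetical sketch above, a mean metric could subscribe to the count and sum gathered by such an accumulator and derive its value once per interval during post-processing.
```java
// Hypothetical sketch: a metric post-processes accumulated data once per interval.
class MeanMetric {
    /** Derives the mean from the {count, sum} work object produced by CountSumAccumulator. */
    double compute(long[] countAndSum) {
        long count = countAndSum[0];
        long sum = countAndSum[1];
        return count == 0 ? Double.NaN : (double) sum / count; // empty intervals yield no value
    }
}
```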
Note: Currently both accumulators and metrics, although extensible and configurable, are implemented in the form of static classes. A logical next step would be to provide an interpreter (such as a JavaScript ScriptEngine), which would allow end-users to perform arbitrary accumulation as well as metrics computation dynamically, by providing short-lived functions, or referring to existing functions, as part of their query.
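As a rough sketch of what such an interpreter-based extension might look like (using the standard javax.script API; none of these class or function names exist in RTM today, and a JavaScript engine such as Nashorn or GraalJS must be available on the classpath):
```java
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class DynamicMetricSketch {
    public static void main(String[] args) throws Exception {
        // Requires a JavaScript ScriptEngine on the classpath (e.g. Nashorn or GraalJS).
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");

        // A short-lived, user-provided metric function passed along with the query.
        engine.eval("function mean(count, sum) { return count === 0 ? null : sum / count; }");

        Object value = ((Invocable) engine).invokeFunction("mean", 4, 100);
        System.out.println(value); // 25 (the exact boxed type depends on the engine)
    }
}
```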
Since computing exact percentile values or visualizing the exact statistical distribution would imply ordering the entire dataset's values (which could neither fit in memory nor be done quickly), RTM instead provides an approximation of this distribution and exposes a couple of parameters to adjust the precision versus memory tradeoff.
For a given interval, values are assigned to arbitrary ranges (called CountSumBuckets) which may describe the full value range of the interval. If gaps or "holes" are present in the datapoint cloud, no dedicated bucket is allocated for them. Buckets are allocated on a first-come, first-served basis. The merging of subinterval partitions relies on a "closest neighbor" search in order to combine CountSumBuckets in a way that preserves as much precision as possible.
That being said, precision loss will occur, particularly as the amplitude of the value range increases. The only way to mitigate this is to increase the size of the array of CountSumBuckets (called "nbPairs") and/or to reduce the approximation size (i.e. the width of each individual bucket, called "approxMs"). Increasing nbPairs results in higher memory consumption.
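The snippet below is a deliberately simplified sketch of this idea; the parameter names nbPairs and approxMs come from the text above, everything else is hypothetical. Values are binned into fixed-width buckets that are only allocated where values actually occur, and once the bucket budget is exhausted, new contributions are folded into the closest existing neighbor, which is where precision is lost.
```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical sketch of an approximate value distribution built from count/sum buckets. */
class ApproxDistributionSketch {

    private final int nbPairs;    // maximum number of buckets kept in memory
    private final long approxMs;  // width of each bucket, i.e. the approximation size
    // bucket lower bound -> {count, sum}
    private final TreeMap<Long, long[]> buckets = new TreeMap<>();

    ApproxDistributionSketch(int nbPairs, long approxMs) {
        this.nbPairs = nbPairs;
        this.approxMs = approxMs;
    }

    /** Assigns a value to its bucket; buckets are only allocated where values actually occur. */
    void add(long value) {
        addToBucket((value / approxMs) * approxMs, 1, value);
    }

    /** Merges another (sub-partition) distribution into this one. */
    void merge(ApproxDistributionSketch other) {
        for (Map.Entry<Long, long[]> e : other.buckets.entrySet()) {
            addToBucket(e.getKey(), e.getValue()[0], e.getValue()[1]);
        }
    }

    private void addToBucket(long lowerBound, long count, long sum) {
        Long key = lowerBound;
        if (!buckets.containsKey(key) && buckets.size() >= nbPairs) {
            // Bucket budget exhausted: fold into the closest existing neighbor (precision is lost here).
            Long below = buckets.floorKey(key), above = buckets.ceilingKey(key);
            key = (above == null || (below != null && key - below <= above - key)) ? below : above;
        }
        long[] bucket = buckets.computeIfAbsent(key, k -> new long[] { 0L, 0L });
        bucket[0] += count;
        bucket[1] += sum;
    }
}
```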
Note: the current algorithm relies on the synchronized keyword as well as Longs for counts and sums. This was designed when Shared Accumulators were implemented, but it should not be necessary anymore since each worker thread works in isolation on separate objects. The return of shared memory could still prove to be a meaningful optimization. Regardless, getting rid of synchronization (in favor of atomic references and/or compare-and-swap) as well as adding support for BigIntegers both seem like natural improvements to make in the near future.
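For illustration only, a lock-free variant of such a count/sum pair could rely on java.util.concurrent.atomic.LongAdder instead of synchronized blocks (a sketch, not a planned implementation):
```java
import java.util.concurrent.atomic.LongAdder;

/** Sketch only: a lock-free count/sum pair, replacing synchronized blocks with LongAdder. */
class LockFreeCountSum {

    private final LongAdder count = new LongAdder();
    private final LongAdder total = new LongAdder();

    /** Hot path: no locking; contention is spread over LongAdder's internal cells. */
    void accumulate(long value) {
        count.increment();
        total.add(value);
    }

    long count() { return count.sum(); }
    long total() { return total.sum(); }
}
```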
The 3.0 architecture is designed to scale each aspect of the computation separately, based on its own needs in terms of hardware resources. The partitioner is the layer in which streams are stored in memory and where merges happen, meaning that it is likely to be memory-bound in most environments and use cases. The worker essentially takes care of deserializing the short-lived measurement objects and executing accumulation code, and is thus CPU-bound. The frontend should not do any heavy lifting; it can be used to serve a webapp but primarily takes care of tracking grid state (it works as a central token reservation server). It also proxies and distributes requests among partitioners, providing a single entry point and URL for end users or third-party applications to access the aggregation cluster.
The roles of each layer and an example of a 1-4-8 setup are shown in the following diagram: