v0.8 - September, 2016 Paul Asmuth <>
Table of Contents
1. Introduction
2. Definitions
3. Design Overview
4. Implementation Details
4.1 Table Eligibility
4.2 Configuration
4.3 Aggregation Functions
4.4 Pre-Aggregation Procedure
4.5 Leader Election
4.6 Load Balancing
4.7 Changes to the ALTER statement
4.8 Changes to the INSERT statement
4.9 Changes to the SELECT statement
5. Reliability Considerations
6. Alternatives Considered
6.1 Implement as standalone service
6.2 Implement in the compaction routine
7. Code Locations
1. Introduction
One of the primary use cases of EventQL is multidimensional analysis of
timeseries data. These types of analyses usually involve a table that is
indexed by a DATETIME primary key and contains a number of other columns that
can be classified into "dimensions" and "measurements".
Dimensions are columns that may be referred to in the GROUP BY clause of
queries on the data. Measurements are (usually numeric) columns that will only
be used in aggregations (i.e. as arguments to aggregate functions in the
SELECT list).
For this use case it's usually highly beneficial to pre-aggregate the
measurements for each input dimension over a recurring time interval before
writing them to disk.
To see why this may result in huge performance increases, consider this
somewhat realistic use case: We're running an ad network and are displaying
100,000 ad impressions per second. We want to measure the rate at which these
ads are clicked with 1-minute granularity. To make things a bit more
interesting, we also want to be able to get an individual click rate for each
of the websites on which our ads are displayed.
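If our ads are displayed on 4,000 individual websites at any given time, the
example works out so that the size of the data after pre-aggregation would be
roughly 0.07% of the size of the input data: 6,000,000 input rows per minute
are reduced to 4,000 output rows, a 1500x reduction in the number of rows
that have to be stored and scanned.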
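The arithmetic behind these figures can be checked with a few lines of Python.
This is only a sketch: the variable names are made up for illustration, and it
assumes that the website is the only dimension, so that each website produces
exactly one output row per interval.

```python
# Figures taken from the ad network example above.
impressions_per_second = 100_000
granularity_seconds = 60   # 1-minute granularity
websites = 4_000           # assumption: one output row per website per interval

# Rows per interval before and after pre-aggregation.
input_rows = impressions_per_second * granularity_seconds
output_rows = websites

reduction_factor = input_rows / output_rows
size_ratio_percent = 100 * output_rows / input_rows

print(input_rows)                    # 6000000
print(reduction_factor)              # 1500.0
print(round(size_ratio_percent, 2))  # 0.07
```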
2. Definitions
We define an abstract pre-aggregation routine that accepts a rowset N of
rows for a given input time interval and produces another rowset M, so that
any query that computes aggregations on the measurements (and optionally
groups by time or one of the dimensions) will return the same result for both
input rowsets N and M. The number of rows in M will always be less than or
equal to the number of rows in N.
The length of the recurring aggregation time interval represents a lower bound
on the time granularity over which the data can be grouped in queries. Hence
we'll refer to the length of an individual aggregation time interval as the
"granularity" of the aggregation from now on.
We also define the "cardinality of an aggregation" for a given time interval
as the number of rows in the output rowset M. Given an arrival rate of events
R and a granularity G, the pre-aggregation function transforms R * G input
rows into C output rows where C is the cardinality of the aggregation.
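The definitions above can be illustrated with a small sketch. It is not the
EventQL implementation: the column names (time, site, clicks), the 60-second
granularity and the use of a sum as the only aggregation are assumptions made
for the example. It builds a rowset M from a rowset N and checks that an
aggregate query grouped by a dimension returns the same result on both.

```python
from collections import defaultdict

GRANULARITY = 60  # G in seconds (hypothetical value)

# Rowset N: (time, site, clicks); time is a Unix timestamp in seconds.
N = [
    (0, "a.example", 1),
    (10, "a.example", 0),
    (30, "b.example", 1),
    (61, "a.example", 1),
    (119, "a.example", 1),
]

def pre_aggregate(rows, granularity):
    """Collapse rows into one row per (time interval, site) with summed clicks."""
    totals = defaultdict(int)
    for time, site, clicks in rows:
        interval_start = (time // granularity) * granularity
        totals[(interval_start, site)] += clicks
    return [(t, s, c) for (t, s), c in sorted(totals.items())]

def clicks_by_site(rows):
    """Rough equivalent of: SELECT site, sum(clicks) ... GROUP BY site."""
    result = defaultdict(int)
    for _, site, clicks in rows:
        result[site] += clicks
    return dict(result)

M = pre_aggregate(N, GRANULARITY)

print(len(N), len(M))                          # 5 3
print(clicks_by_site(N) == clicks_by_site(M))  # True
```

Here the cardinality C of the aggregation is 2 for the first interval and 1
for the second.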
3. Design Overview
The proposed approach is to implement pre-aggregation as a separate internal
subsystem, touching as few of the core subsystems as possible.
Pre-aggregation is optionally enabled for a table by setting a pre-aggregation
config for the table (further specified in section 4).
Once enabled, we elect N hosts as "aggregation nodes" for the table. Each
insert into the pre-aggregation-enabled table will initially bypass the normal
insert code and will instead be sent to one of the N aggregation hosts. Each
aggregation node then performs the pre-aggregation routine locally on all
received input rows and, at the end of the aggregation interval, forwards the
output rows to the regular table insert procedure.
Of course, this scheme will result in up to N * C instead of C output rows.
While this would not impact the correctness of any aggregate queries over the
data, it would still be visible to the user when doing a simple select on the
table. For UX reasons, and because we consider it an implementation detail
that should not have to be explained in user-facing documentation, we
automatically insert a default aggregation clause into every SELECT
statement that refers to pre-aggregation-enabled tables. This default
aggregation clause is statically derived from the pre-aggregation config.
4. Implementation Details
4.1 Table Eligibility
Pre-aggregation may only be enabled for tables that meet the following
requirements:
- Must have a non-unique primary key of time DATETIME
4.2 Configuration
We add the following fields ("the pre-aggregation config") to the table
configuration (which in turn is stored in the coordination service).
- granularity -- The aggregation granularity
- num_servers -- The number of servers that should run the aggregation
- dimensions -- A list of dimensions, for each dimension:
  - dimensions.column -- The column name for the dimension
- measures -- A list of measures, for each measure:
  - measure.column -- The column name for the measure
  - measure.aggr_fn -- The aggregation function to be used
The pre-aggregation config may be changed by the user at any time. On any
change to either the pre-aggregation config or the table schema the server
must check and update the pre-aggregation config in such a fashion that it
never refers to non-existing columns.
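As a rough sketch of what this could look like, the following models the
pre-aggregation config as plain Python data classes and shows one way to drop
references to columns that no longer exist. The class names, the function
prune_config, the unit of granularity (seconds) and the aggregation function
names "sum" and "max" are all assumptions made for illustration; only the
field names come from the list above.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Dimension:
    column: str

@dataclass(frozen=True)
class Measure:
    column: str
    aggr_fn: str

@dataclass(frozen=True)
class PreAggregationConfig:
    granularity: int        # assumed to be in seconds
    num_servers: int
    dimensions: tuple = ()
    measures: tuple = ()

def prune_config(config, schema_columns):
    """Return a copy of config without dimensions/measures on missing columns."""
    columns = set(schema_columns)
    return replace(
        config,
        dimensions=tuple(d for d in config.dimensions if d.column in columns),
        measures=tuple(m for m in config.measures if m.column in columns),
    )

config = PreAggregationConfig(
    granularity=60,
    num_servers=3,
    dimensions=(Dimension("site"),),
    measures=(Measure("clicks", "sum"), Measure("latency", "max")),
)

# The "latency" column was removed from the table schema.
pruned = prune_config(config, ["time", "site", "clicks"])
print([m.column for m in pruned.measures])  # ['clicks']
```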
4.3 Aggregation Functions
An aggregation function is responsible for converting a number of scalar
input values for a given measure, dimension and time interval into a single
scalar aggregated value that will be stored into one column of the output row.
Aggregation functions may optionally take a number of configuration
parameters. The parameters are encoded as <key>=<value> pairs where key and
value are both strings.
Additionally, not every aggregation function is required to support every
(output) column type.
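One possible shape for such a function is sketched below. The names
parse_params, SumAggregation, initial_state, update and merge are hypothetical
and not taken from the EventQL code; the sketch also assumes that the
parameters arrive as a list of "<key>=<value>" strings, which this document
does not specify.

```python
def parse_params(encoded):
    """Parse '<key>=<value>' strings into a dict; keys and values stay strings."""
    params = {}
    for pair in encoded:
        key, _, value = pair.partition("=")
        params[key] = value
    return params

class SumAggregation:
    """Hypothetical aggregation function that sums its scalar inputs."""

    def __init__(self, params=None):
        self.params = params or {}

    def initial_state(self):
        return 0

    def update(self, state, value):
        # NULL (None) inputs leave the running state unchanged.
        return state if value is None else state + value

    def merge(self, left, right):
        # Combines two already aggregated values, e.g. from two servers.
        return left + right

fn = SumAggregation(parse_params(["precision=12"]))
state = fn.initial_state()
for value in [1, None, 4]:
    state = fn.update(state, value)

print(fn.params)            # {'precision': '12'}
print(state)                # 5
print(fn.merge(state, 10))  # 15
```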
The following is not meant as a definitive list of supported aggregation
functions but merely as the list of initially considered and supported
4.4 Pre-Aggregation Procedure
The pre-aggregation procedure is run for a specific table and on a specific
host. All the work it performs is done on the local machine and all data is
kept in memory.
Conceptually, the procedure stores a map of running aggregations. Each entry
in the map contains a vector of aggregation states for each measure.
The key for the map is another vector of strings. The key vector has one
element for each dimension in the aggregation config plus one element for the
time. The elements for the dimensions are set to the values of the respective
columns in the input row (or NULL if the input row is missing the column). The
time element is set to the quotient of the input row's primary key column value
and the granularity (using truncating division). Input rows without a primary
key column value are rejected.
For each incoming row, the matching entry is created or retrieved from the
map of running aggregations. Then, the aggregation function for each measure
is called with the corresponding column's value from the input row and the
last aggregation state from the map entry. Finally, the result of the
invocation is written back to the aggregation state in the map entry.
When a new entry is added to the aggregation map, a timer is started that
fires after G (granularity) seconds have passed on the wall clock. Once the
timer fires, the entry is removed from the map and inserted into the table
using the regular table insert mechanism (using the nominal time retrieved
from the map's key as the row's primary key value).
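The procedure described in this section can be sketched as follows. This is a
simplified model under several assumptions, not the actual implementation: the
primary key column is called "time", key elements are kept as raw values
instead of strings, the timer is replaced by an explicit flush_due call with a
caller-supplied clock value, the nominal time is taken to be the start of the
interval, and PreAggregator and sum_fn are hypothetical names.

```python
def sum_fn(state, value):
    """Hypothetical aggregation function: running sum that ignores NULLs."""
    if value is None:
        return state
    return value if state is None else state + value

class PreAggregator:
    def __init__(self, granularity, dimensions, measures, insert_row):
        self.granularity = granularity  # G in seconds
        self.dimensions = dimensions    # list of dimension column names
        self.measures = measures        # list of (column, aggregation function)
        self.insert_row = insert_row    # stand-in for the regular table insert
        self.running = {}               # key -> [deadline, aggregation states]

    def add(self, row, now):
        if row.get("time") is None:
            raise ValueError("input row has no primary key value")
        # Key: one element per dimension plus the truncated time quotient.
        interval = row["time"] // self.granularity
        key = tuple(row.get(d) for d in self.dimensions) + (interval,)
        entry = self.running.get(key)
        if entry is None:
            # New entry: it becomes due G seconds from now (wall clock).
            entry = [now + self.granularity, [None] * len(self.measures)]
            self.running[key] = entry
        states = entry[1]
        for i, (column, fn) in enumerate(self.measures):
            states[i] = fn(states[i], row.get(column))

    def flush_due(self, now):
        """Remove every entry whose timer has fired and insert it as a row."""
        due = [k for k, entry in self.running.items() if entry[0] <= now]
        for key in due:
            _, states = self.running.pop(key)
            out = dict(zip(self.dimensions, key[:-1]))
            out["time"] = key[-1] * self.granularity  # nominal time
            for (column, _), state in zip(self.measures, states):
                out[column] = state
            self.insert_row(out)

stored = []
agg = PreAggregator(60, ["site"], [("clicks", sum_fn)], stored.append)
agg.add({"time": 125, "site": "a.example", "clicks": 1}, now=1000)
agg.add({"time": 130, "site": "a.example", "clicks": 1}, now=1010)
agg.flush_due(now=1059)  # nothing is due yet
agg.flush_due(now=1060)  # G seconds after the entry was created
print(stored)
```

Passing the clock value in explicitly keeps the sketch deterministic; a real
implementation would use timers as described above.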
4.5 Leader Election
We use the coordination service to elect the N aggregating servers from the
list of available servers for every table. The aggregating servers may, in
principle, be randomly chosen from all live servers in the cluster.
The implementation of the election routine is pluggable (as is the
coordination service) and not in the scope of this document.
4.6 Load Balancing
Each input row should be sent to one of the aggregating servers for the table
in such a fashion that the distribution of rows over servers will be
approximately uniform. This could be implemented as a random pick for every
row or a simple round-robin scheme.
However, we're explicitly _not_ using a consistent assignment scheme based
on the row's dimensions, as that would open the system up to hotspotting
issues if the distribution of the input data is not uniform (which is likely).
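Both options mentioned above fit in a few lines. The sketch below is
illustrative only; the function names and server names are made up. Note that
neither picker looks at the row's contents, so a single very popular dimension
value is still spread over all aggregating servers.

```python
import itertools
import random

def round_robin_picker(servers):
    """Return a function that cycles through the servers in order."""
    cycle = itertools.cycle(servers)
    return lambda row: next(cycle)

def random_picker(servers, rng=None):
    """Return a function that picks a server uniformly at random."""
    if rng is None:
        rng = random.Random()
    return lambda row: rng.choice(servers)

servers = ["agg-1", "agg-2", "agg-3"]  # hypothetical aggregating servers
pick = round_robin_picker(servers)

# Six rows for the same "hot" website are still spread evenly.
targets = [pick({"site": "hot.example"}) for _ in range(6)]
print(targets)
# ['agg-1', 'agg-2', 'agg-3', 'agg-1', 'agg-2', 'agg-3']
```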
4.7 Changes to the ALTER statement
The ALTER TABLE statement must be changed to atomically remove columns from
the pre-aggregation config when columns are removed from the table schema.
Furthermore, the new ALTER TABLE SET PREAGGREGATION statement must be
4.8 Changes to the INSERT statement
The INSERT statement must be modified to check if the table is
pre-aggregation-enabled and if so, divert the insert into the pre-aggregation
subsystem. A new flag INSERT_NOPREAGGR is added that optionally disables this
behavior.
4.9 Changes to the SELECT statement
The SELECT statement execution must be altered to insert an aggregation node
for every query on a pre-aggregation-enabled table that does not already
contain an eligible aggregation clause.
The synthetic aggregation node is a group over the quotient of primary key
column value and granularity (using truncating division) where all columns
referred to from measurement definitions are aggregated using the respective
aggregation function's output merge function.
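The effect of the synthetic aggregation node can be sketched as follows. The
sketch makes assumptions that go beyond the text above: the primary key column
is called "time", the group key includes the dimension columns in addition to
the time quotient (so that the up to N * C stored rows collapse back to C
rows), and default_aggregation and merge_sum are hypothetical names.

```python
def merge_sum(left, right):
    """Output merge function of a hypothetical sum aggregation."""
    return left + right

def default_aggregation(rows, granularity, dimensions, measures):
    """Group stored rows by (time quotient, dimensions) and merge the measures."""
    merged = {}
    for row in rows:
        interval = row["time"] // granularity  # truncating division
        key = (interval,) + tuple(row.get(d) for d in dimensions)
        if key not in merged:
            first = dict(row)
            first["time"] = interval * granularity
            merged[key] = first
            continue
        target = merged[key]
        for column, merge in measures.items():
            target[column] = merge(target[column], row[column])
    return [merged[key] for key in sorted(merged)]

# Rows as they might be stored when two aggregation nodes handled one interval.
stored = [
    {"time": 120, "site": "a.example", "clicks": 2},  # from node 1
    {"time": 120, "site": "a.example", "clicks": 5},  # from node 2
    {"time": 120, "site": "b.example", "clicks": 1},
]

result = default_aggregation(stored, 60, ["site"], {"clicks": merge_sum})
print(result)
```

The two partial rows for a.example are merged into a single row with 7
clicks, which is what a user would expect to see from a simple select.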
5. Reliability Considerations
The proposed implementation has no provisions to handle pending data on server
shutdown or server crashes, so in these events some input rows will be lost.
As part of the separate write-ahead-logging effort, we will add (optional)
durable write-ahead storage for accepted input rows.
A separate issue that is not addressed is that the API and implementation do
not allow for exactly-once-inserts in the case of failures: If the client
receives an error code after sending an insert to one of the aggregating
servers, it can't be sure if the value was stored or not so it can't correctly
decide if it should retry or not.
However, this is really more of a philosophical concern (solving the problem
would defeat a large part of the benefits of pre-aggregation). Users that need
strict exactly-once semantics cannot use this optimization and should insert
the rows individually, without pre-aggregation (which _is_ an idempotent
operation and safe to retry).
6. Alternatives Considered
6.1 Implement as standalone service
On first look, pre-aggregation might seem like a feature that does not belong
in the core database, but could be provided as a separate service that
performs the pre-aggregation and then simply stores the aggregated rows into
the database.
However, with this scheme it would be painful to support more complex
aggregation modes, like estimating the number of unique strings or estimating
Both of these problems would be easily solvable using probabilistic data
structures, but if the pre-aggregation was performed in a standalone service
we would have to precompute and serialize these data structures before sending
them to the database. And we can't treat the data as binary blobs in the
database if we want to run useful (and fast) queries on it, so this
would leave us with one of two choices:
Either duplicate all data structure code from the database into the standalone
service or a fat client and then make sure that both copies always match 100%
so the serialization works (which would be a PITA). Or alternatively, keep
the code only in the database and send all the raw values to the database as
part of the aggregated row (which would defeat the purpose of streaming
6.2 Implement in the compaction routine
One downside of the proposed approach is that with constant arrival rate R and
granularity G, the number of stored rows for a given input stream still scales
linearly in the number of hosts we assign to the stream (and therefore,
strictly speaking, also in the arrival rate R).
One alternative solution, implementing the pre-aggregation in the partition
compaction routine, avoids this and ends up with a constant number of stored
rows regardless of the number of servers assigned to the aggregation.
However, in the interest of keeping the design simple and minimally invasive
to the other subsystems, modifying the compaction routine is left open for
when/if the issue turns out to be a practical limitation.
7. Code Locations