
Reference Architecture for Accelerating existing SQL Datasets


For situations where Hadoop + Spark is the Enterprise Data Warehouse, the Sparkline Accelerator solution sits on top of an existing Data Lake and comprises an Index Management Service and a Query Serving Service.

Index Management
represents the physical resources, tools and software services that maintain the OLAP Index.
Query Serving
represents the physical resources, tools and software services that provide the Fast B.I. experience. This is the runtime of the Sparkline B.I. Accelerator.

The Existing Data Lake

We assume the DataSet to be accelerated already resides in the Data Lake. The DataSet could be a single flattened Table, or a Star or Snowflake schema of many Dimension tables and a single Fact Table. We assume that the DataSet is in one of the following forms:

Time Partitioned
the flattened table or the Fact Table in a Star Schema is partitioned by time at some granularity (hour, day or month). The table can have multiple levels of time partitioning. The table can also have other levels of partitioning, for example by advertiser-Id (for the AdTech domain). But we assume that new or restated data always arrives for some time period that maps to a partition level in the DataSet. Here are some examples to better explain this (see also the DDL sketch after this list):
  1. Assume the Fact Table is partitioned by Hour. New data arrives hourly. Older data may be restated (because of late arriving data or to restate facts), but we assume this is done on an hourly basis (there is some process to capture which hours have been restated).
  2. Assume the Fact Table is partitioned by Advertiser and Day. Every day new events arrive for each Advertiser. For an advertiser, data for a day may be restated.
  3. The partition levels above may be reversed, i.e. the Fact Table is partitioned by Day and then by Advertiser, but the restating process is the same.
Not Partitioned
The most common scenario for this is that the DataSet is not changing; the requirement is to provide fast slice-and-dice on this static DataSet. Even if the DataSet is changing, the change is very slow and an entire new version of the DataSet is processed, say every few months. If the data is changing more often, then the new data consists of new facts rather than restatements of existing facts, and there is some way to identify the new facts.
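
To make the Time-Partitioned form concrete, here is a minimal DDL sketch of a Fact Table partitioned by Day and then by Advertiser (as in example 3 above). The table name, column names and storage format are hypothetical and purely illustrative; an actual DataSet would have its own schema.

```sql
-- Hypothetical AdTech fact table, partitioned by day and then by advertiser.
-- Column names, types and storage format are illustrative only.
CREATE EXTERNAL TABLE ad_events (
  event_ts    TIMESTAMP,   -- event time within the partition's day
  campaign_id BIGINT,      -- foreign key into a campaign dimension table
  site_id     BIGINT,      -- foreign key into a site dimension table
  impressions BIGINT,
  clicks      BIGINT,
  spend       DOUBLE
)
PARTITIONED BY (dt STRING, advertiser_id STRING)  -- Day, then Advertiser
STORED AS PARQUET
LOCATION '/data/warehouse/ad_events';
```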

The Time-Partitioned use case is much more common than the Non-Partitioned use case. Also, for the Non-Partitioned use case we have only encountered re-processing of a new version of the entire DataSet, not new facts arriving periodically. The Non-Partitioned use case is a sub-case of the Time-Partitioned use case where the granularity of processing is a single Interval (all of the relevant time for the DataSet). So we focus only on the Time-Partitioned use case in this document.

So the starting point for the B.I. Accelerator solution looks something like this:

  • there is a Time-Partitioned DataSet, partitioned at some time granularity (month, day, hour), potentially having multiple time partitioning levels.
  • the DataSet is served by services like Hive/Spark/custom ETL jobs running on a YARN (or equivalent Resource Manager) managed cluster.
  • the DataSets' metadata is captured in a Hive MetaStore shared by all Data Processing services.
  • the DataSets are stored in HDFS (or an equivalent Distributed Storage Layer).
  • there are processes in place to add new time partitions to the Fact Table (a sketch of such a step follows this list).
  • there may be other Analytic/B.I. jobs running on this cluster, some of which need to be accelerated.
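
A minimal sketch of that partition-addition step, reusing the hypothetical ad_events table from the previous section; the partition values and HDFS path are made up:

```sql
-- Register a newly landed day/advertiser partition with the Hive MetaStore.
-- The partition values and location are illustrative.
ALTER TABLE ad_events ADD IF NOT EXISTS
  PARTITION (dt = '2016-08-16', advertiser_id = 'acme')
  LOCATION '/data/warehouse/ad_events/dt=2016-08-16/advertiser_id=acme';
```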

Sparkline B.I. Accelerator

In order to provide the accelerated query experience, the following components, processes and enhancements are added to the Data Lake:

  • a new ETL pipeline is set up whose job is to keep the OLAP Index current.
  • the Hive MetaStore has new Tables registered to record the state of Indexing and the Accelerated DataSet (a hypothetical sketch follows this list).
  • the Sparkline Query Serving ThriftServer is run as a service on the Yarn cluster; it provides the fast slice-and-dice query experience on the accelerated DataSet.
  • optionally, a Sparkline Indexing ThriftServer is run as a service on the Yarn cluster; it is used to run the ETL pipeline that builds the Flattened DataSet.
  • A Druid Cluster is added to house the Druid query, indexing and coordination daemons.
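
The exact schema of the indexing-state Tables is internal to the Accelerator; the following is purely a hypothetical sketch of the kind of bookkeeping such Tables would record (which source partitions have been indexed, into which Druid dataSource/Segment, and when):

```sql
-- Hypothetical bookkeeping table; the real Sparkline metadata tables may differ.
CREATE TABLE accelerator_index_state (
  source_table     STRING,   -- e.g. 'ad_events'
  partition_spec   STRING,   -- e.g. 'dt=2016-08-16/advertiser_id=acme'
  druid_datasource STRING,   -- Druid dataSource the partition was indexed into
  segment_version  STRING,   -- version of the Druid Segment for this partition
  indexed_at       TIMESTAMP -- when indexing completed
);
```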

Index Management

The new ETL pipeline has the same time periodicity as the existing ETL pipeline that handles new Fact partitions arriving in the Data Lake. This is made up of 2 steps:

  • Fact data is flattened (dimension values are dereferenced) and, if needed, some lightweight transformations are applied (for example, changing the timestamp column to ISO format for ingestion into Druid).
  • The Flattened DataSet is run through a Druid Indexing Job to create a Druid Segment for the new Fact DataSet partition.

The Flattened Data is maintained as an External Table in the Hive MetaStore. Flattened data only needs to be retained for the duration of the indexing jobs. There is a 1-1 mapping between partitions, at some level of partitioning of the original DataSet, and Druid Segments. The mapping and the state of the corresponding Druid Segment can be captured as partition-level properties in the Hive MetaStore.

The Flattening Job is most easily expressed as 1 or more SQL steps, the last of which populates a new partition in the Flattened External Table. This job must be given enough resources on the shared cluster: either via a separate queue or by running another service (a thriftserver for ETL) on the Yarn cluster.
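
A minimal sketch of such a Flattening Job, continuing the hypothetical ad_events example: the flattened table, the dimension tables (dim_campaign, dim_site) and the ISO-timestamp transformation below are illustrative only; a real pipeline would dereference many more Dimensions.

```sql
-- Flattened external table: one partition per source Fact partition,
-- kept only for the duration of the Druid indexing job.
CREATE EXTERNAL TABLE IF NOT EXISTS ad_events_flat (
  event_time  STRING,   -- timestamp in ISO format for Druid ingestion
  campaign    STRING,   -- dereferenced dimension value
  site        STRING,   -- dereferenced dimension value
  impressions BIGINT,
  clicks      BIGINT,
  spend       DOUBLE
)
PARTITIONED BY (dt STRING, advertiser_id STRING)
STORED AS PARQUET
LOCATION '/data/warehouse/ad_events_flat';

-- Populate the new partition: dereference dimension values and convert
-- the timestamp to ISO format.
INSERT OVERWRITE TABLE ad_events_flat
  PARTITION (dt = '2016-08-16', advertiser_id = 'acme')
SELECT
  date_format(f.event_ts, "yyyy-MM-dd'T'HH:mm:ss'Z'") AS event_time,
  c.campaign_name AS campaign,
  s.site_name     AS site,
  f.impressions,
  f.clicks,
  f.spend
FROM ad_events f
JOIN dim_campaign c ON f.campaign_id = c.campaign_id
JOIN dim_site     s ON f.site_id     = s.site_id
WHERE f.dt = '2016-08-16' AND f.advertiser_id = 'acme';
```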

For restated partitions the pipeline is almost the same as that for a new partition: running the pipeline on an already indexed partition creates a new version of the Druid Segment for this partition. Once the new version becomes available in the Query Serving Layer the older version can be removed from Druid Storage.

For late arriving data, the new data is indexed and then a new version of the Segment is created by merging the current version with the Segment built from the new data.

Other Index Management jobs include merging Segment files of older time periods and reducing the Segment time granularity of older Segments, both to save space.

Both the Flattening ETL job and the Druid Indexing job run as batch jobs on the YARN cluster and can utilize the common resources on this cluster. The resources required for the ETL job depend on the size of the partition that is indexed and on the processing needed to flatten the original DataSet (which depends on the number of Dimensions dereferenced). The resources for the Indexing Job depend on the size of the Flattened DataSet. Both of these jobs are highly parallelizable, and cost-time tradeoffs can be made between how much cluster resource is allocated and how quickly the new data becomes available for acceleration. A hard limit on this tradeoff is that indexing for a partition must finish within the time window before the next partition becomes available for indexing.
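
Stated as a simple constraint, assuming the simplest scheduling policy where each partition is flattened and then indexed before the next one is processed:

```latex
% P: the partition period (e.g. 1 hour for hourly partitions)
% T_flatten(p), T_index(p): wall-clock times of the flattening ETL job
% and the Druid indexing job for partition p
T_{\mathrm{flatten}}(p) + T_{\mathrm{index}}(p) \;\le\; P
```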

Druid Cluster

The Druid Cluster represents the set of nodes that house the Druid daemons. The Druid cluster size is dominated by the number of historical daemons needed to meet the SLAs for the Query workload. This depends on the size of the Druid Index (which is typically at least an order of magnitude smaller than the raw data size) and on the concurrency and latency requirements for query serving.

Query Serving Layer

The B.I. Accelerator functionality is most commonly exposed through an instance of the Sparkline ThriftServer. One of the key functions this provides is the optimization of Query Plans, which then utilize Spark and Druid resources to answer queries in the most efficient and timely manner. The ThriftServer itself is a single JVM and takes up 1 slot of the Yarn cluster. But based on the Query mix, the Spark cluster needs to be run with enough Executors so that the parts of Query Plans that are executed in Spark (such as dedup of Distinct Values, Joins, SQL Analytical operators, etc.) have enough resources to finish within the required SLAs.
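
For example, a B.I. tool or a JDBC client such as beeline connects to the Sparkline ThriftServer and issues ordinary slice-and-dice SQL against the accelerated DataSet. A hypothetical query over the earlier ad_events example follows; the accelerated table name ad_events_acc and its columns are made up for illustration, and it is the accelerator, not the user, that decides which parts of the plan run on Druid vs Spark.

```sql
-- Hypothetical slice-and-dice query submitted through the Sparkline ThriftServer.
-- ad_events_acc stands in for the accelerated DataSet registered with the accelerator.
SELECT
  campaign,
  site,
  SUM(impressions)               AS impressions,
  SUM(clicks)                    AS clicks,
  SUM(clicks) / SUM(impressions) AS ctr
FROM ad_events_acc
WHERE dt BETWEEN '2016-08-01' AND '2016-08-16'
  AND advertiser_id = 'acme'
GROUP BY campaign, site
ORDER BY impressions DESC
LIMIT 100;
```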

Other Deployment possibilities

So far we have talked about a Data Lake deployment model, where the new components and services of the B.I. Accelerator utilize resources from the existing Data Lake. Another option is a Data Mart deployment model, where the physical resources needed for the B.I. Accelerator are on a separate cluster; the common link is either a shared storage layer, or the services on the 2 clusters having access to both storage layers.

These 2 deployment scenarios are the 2 extremes; there are variations that have pieces of both: for example, Index Management could run on the Data Lake whereas the Query Serving layer runs on a separate cluster.
