This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Bridging Druid Results into the Spark Data pipeline

sdesikan6 edited this page Jan 31, 2016 · 5 revisions

Query Building

The DruidQueryBuilder maintains a map from each column name in the Druid query result to a triple (Expression, Spark DataType, Druid DataType):

  • Expression: the Catalyst Expression from the original Plan that this column of the Druid result row represents.
  • Spark DataType: the DataType of the Expression in the original SQL Plan.
  • Druid DataType: the DataType of the value returned by Druid. The two DataTypes need not match; during the rewrite, a check is made that the conversion from the Druid DataType to the Spark Expression's DataType is valid. If it is not, the rewrite doesn't happen.

This map is populated as expressions from the Aggregate Operator are added to the DruidQueryBuilder.
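The map and its validity check can be pictured with a minimal Scala sketch. It does not use Spark or the project's own classes: `SparkType`, `DruidType`, `Expr`, `OutputColumn`, `canConvert` and `QueryBuilderSketch` are hypothetical stand-ins, the conversion rules in `canConvert` are assumptions chosen for illustration, and the column names (`alias_1`, `alias_2`) are made up.

```scala
// Hypothetical, simplified stand-ins for Spark DataTypes (not the Spark API).
sealed trait SparkType
case object SLong extends SparkType
case object SDouble extends SparkType
case object SString extends SparkType

// Hypothetical stand-ins for the types of values in a Druid result row.
sealed trait DruidType
case object DLong extends DruidType
case object DFloat extends DruidType
case object DString extends DruidType

// Stand-in for a Catalyst Expression; only its SQL text is kept.
final case class Expr(sql: String)

// One entry of the output map: (Expression, Spark DataType, Druid DataType).
final case class OutputColumn(expr: Expr, sparkType: SparkType, druidType: DruidType)

// Assumed conversion rules, for illustration only.
def canConvert(from: DruidType, to: SparkType): Boolean = (from, to) match {
  case (DLong, SLong | SDouble | SString) => true
  case (DFloat, SDouble | SString)        => true
  case (DString, SString)                 => true
  case _                                  => false
}

final class QueryBuilderSketch {
  private var columns = Map.empty[String, OutputColumn]

  def outputColumns: Map[String, OutputColumn] = columns

  // Record the Druid result column that stands for an Aggregate Operator expression.
  // Returns false, leaving the map unchanged, if the Druid value cannot be converted
  // to the expression's Spark type; the caller would then give up on the rewrite.
  def addColumn(name: String, expr: Expr, sparkType: SparkType, druidType: DruidType): Boolean =
    if (canConvert(druidType, sparkType)) {
      columns = columns.updated(name, OutputColumn(expr, sparkType, druidType))
      true
    } else false
}

val qb = new QueryBuilderSketch
val okGroup = qb.addColumn("l_returnflag", Expr("l_returnflag"), SString, DString)
val okSum   = qb.addColumn("alias_1", Expr("sum(l_quantity)"), SDouble, DFloat)
val okMax   = qb.addColumn("alias_2", Expr("max(l_tax)"), SLong, DFloat) // rejected: Float -> Long
```

Here a `false` from `addColumn` marks the point where the rewrite would be abandoned; how the real DruidQueryBuilder signals this is not described on this page.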

Setting up the Output Schema of the PhysicalRDD Operator that wraps the Druid RDD

The rewritten Physical Plan contains, at its base, a PhysicalRDD Operator that wraps the DruidRDD built by the DruidRelation. More on the overall translation process can be found here ??? The schema for the PhysicalRDD Operator is formed by creating a StructType from the columns in the output Map maintained by the DruidQueryBuilder. For Grouping Expressions that were AttributeReferences in the original Plan, we reuse their ExprIds; for Expressions that were not AttributeReferences, new ExprIds are generated. This way, any resolved AttributeReferences above the replaced Plan subtree remain valid and point to the correct child Attribute in the rewritten Plan.
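The ExprId rule can be illustrated with a minimal Scala sketch that uses hypothetical stand-in types rather than Catalyst's `AttributeReference`, `ExprId` and `StructType`. As a labeled assumption, each schema field here takes the DataType returned by Druid, on the reasoning that the Projection above the PhysicalRDD casts to the Spark type when needed; `ExprIdGen`, `buildOutput` and the sample columns are all made up for illustration.

```scala
// Hypothetical, simplified model; none of these types are the Spark API.
final case class ExprId(id: Long)

sealed trait Expr
final case class AttrRef(name: String, exprId: ExprId) extends Expr // was an AttributeReference
final case class OtherExpr(sql: String) extends Expr                // any other expression

// Output-map entry: the original expression and (assumed) the DataType Druid returns.
final case class DruidColumn(expr: Expr, druidType: String)

// One field of the RDD's schema (a stand-in for a StructField).
final case class Field(name: String, dataType: String)

// One output attribute of the PhysicalRDD (a stand-in for an AttributeReference).
final case class OutAttr(name: String, dataType: String, exprId: ExprId)

// Simple generator for fresh ExprIds.
final class ExprIdGen(start: Long) {
  private var next = start
  def fresh(): ExprId = { val id = ExprId(next); next += 1; id }
}

def buildOutput(cols: Seq[(String, DruidColumn)], gen: ExprIdGen): (Seq[Field], Seq[OutAttr]) = {
  val attrs = cols.map {
    // Reuse the ExprId so resolved references above the rewritten subtree stay valid.
    case (name, DruidColumn(AttrRef(_, id), dt)) => OutAttr(name, dt, id)
    // Anything else gets a brand-new ExprId.
    case (name, DruidColumn(OtherExpr(_), dt))   => OutAttr(name, dt, gen.fresh())
  }
  (attrs.map(a => Field(a.name, a.dataType)), attrs)
}

val gen = new ExprIdGen(100L)
val cols = Seq(
  "l_returnflag" -> DruidColumn(AttrRef("l_returnflag", ExprId(7L)), "string"),
  "alias_1"      -> DruidColumn(OtherExpr("sum(l_quantity)"), "float")
)
val (schema, output) = buildOutput(cols, gen)
```

A `Seq` is used instead of a `Map` only so that the field order in the sketch is deterministic.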

Projection on top of the PhysicalRDD Operator

A Projection Operator is added above the PhysicalRDD Operator to:

  • provide the same schema as the original Aggregate Operator (or as the Ordering/Filter Operator above it, in the case of having/order/limit rewrites);
  • ensure that Attribute names, ExprIds and DataTypes match those of the original Operator.

The ProjectionList is formed from the aggregation expressions of the original Aggregate Operator. Any expressions that were mapped to Druid result columns are replaced by AttributeReferences to the Attributes of the child PhysicalRDD. The following rules apply:

  • If needed, the AttributeReference is wrapped in a Cast that converts it to the DataType in the original Spark Plan.
  • AttributeReferences from the original Plan keep their original ExprId, so that references above this Operator remain valid. Their original names are preserved too, by wrapping the new AttributeReference in an Alias.
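The two rules can be sketched in Scala with hypothetical stand-ins for Catalyst's `AttributeReference`, `Cast` and `Alias`. The sketch assumes every output of the original Operator maps directly to one Druid result column and wraps every entry in an Alias for uniformity; both are simplifications, not a description of the project's code. `OriginalOutput`, `projectionList` and the sample names are made up for illustration.

```scala
// Hypothetical, simplified stand-ins for Catalyst classes (not the Spark API).
final case class ExprId(id: Long)

sealed trait Expr { def dataType: String }
final case class AttrRef(name: String, dataType: String, exprId: ExprId) extends Expr
final case class Cast(child: Expr, dataType: String) extends Expr
final case class Alias(child: Expr, name: String, exprId: ExprId) extends Expr {
  def dataType: String = child.dataType
}

// One output of the original Operator: its name, ExprId and Spark DataType,
// plus the (hypothetical) name of the Druid result column it was mapped to.
final case class OriginalOutput(name: String, exprId: ExprId, sparkType: String, druidColumn: String)

// Build the ProjectionList over the PhysicalRDD's output attributes.
def projectionList(originals: Seq[OriginalOutput], child: Map[String, AttrRef]): Seq[Alias] =
  originals.map { o =>
    val ref = child(o.druidColumn)          // reference to the child PhysicalRDD attribute
    val typed: Expr =
      if (ref.dataType == o.sparkType) ref  // types already agree: no Cast
      else Cast(ref, o.sparkType)           // convert to the original Spark DataType
    Alias(typed, o.name, o.exprId)          // keep the original name and ExprId
  }

val child = Map(
  "l_returnflag" -> AttrRef("l_returnflag", "string", ExprId(7L)),
  "alias_1"      -> AttrRef("alias_1", "float", ExprId(100L))
)
val projection = projectionList(
  Seq(
    OriginalOutput("l_returnflag", ExprId(7L), "string", "l_returnflag"),
    OriginalOutput("sum_qty", ExprId(8L), "double", "alias_1")
  ),
  child
)
```

The first entry needs no Cast because its types already agree; the second is cast from the Druid type to the original Spark type, and both come out under their original names and ExprIds.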