diff --git a/src/main/scala/org/apache/spark/sql/sources/druid/DruidPlanner.scala b/src/main/scala/org/apache/spark/sql/sources/druid/DruidPlanner.scala index 2c9fe83..47c6ef4 100644 --- a/src/main/scala/org/apache/spark/sql/sources/druid/DruidPlanner.scala +++ b/src/main/scala/org/apache/spark/sql/sources/druid/DruidPlanner.scala @@ -182,6 +182,12 @@ object DruidPlanner { "executed." ) + val DRUID_USE_V2_GBY_ENGINE = booleanConf( + "spark.sparklinedata.druid.option.use.v2.groupByEngine", + defaultValue = Some(false), + doc = "for druid queries use the smile binary protocol" + ) + def getDruidQuerySpecs(plan : SparkPlan) : Seq[DruidQuery] = { plan.collect { case PhysicalRDD(_, r : DruidRDD, _, _, _) => r.dQuery diff --git a/src/main/scala/org/apache/spark/sql/sources/druid/DruidStrategy.scala b/src/main/scala/org/apache/spark/sql/sources/druid/DruidStrategy.scala index 6aad04a..525383c 100644 --- a/src/main/scala/org/apache/spark/sql/sources/druid/DruidStrategy.scala +++ b/src/main/scala/org/apache/spark/sql/sources/druid/DruidStrategy.scala @@ -269,6 +269,12 @@ private[druid] class DruidStrategy(val planner: DruidPlanner) extends Strategy */ qs = QuerySpecTransforms.transform(planner.sqlContext, dqb.drInfo, qs) + qs.context.foreach{ ctx => + if( planner.sqlContext.getConf(DruidPlanner.DRUID_USE_V2_GBY_ENGINE) ) { + ctx.groupByStrategy = Some("v2") + } + } + val (queryHistorical : Boolean, numSegsPerQuery : Int) = if ( !pAgg.canBeExecutedInHistorical ) { (false, -1) diff --git a/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala b/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala index 5fca574..e6a70e3 100644 --- a/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala +++ b/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala @@ -508,7 +508,8 @@ case class QuerySpecContext( minTopNThreshold : Option[Int] = None, maxResults : Option[Int] = None, maxIntermediateRows : Option[Int] = None, - groupByIsSingleThreaded : Option[Boolean] = None + groupByIsSingleThreaded : Option[Boolean] = None, + var groupByStrategy : Option[String] = None ) sealed trait QuerySpec {