This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

update CostModel: set cardinality, numRows; special cost models for each query type
Harish Butani committed Sep 15, 2016
1 parent 9e77eab commit 5b9708b
Showing 10 changed files with 518 additions and 249 deletions.

Large diffs are not rendered by default.

@@ -95,11 +95,11 @@ private[druid] class DruidStrategy(val planner: DruidPlanner) extends Strategy
        * and not an epoch.
        */
       val replaceTimeReferencedDruidColumns = dqb1.referencedDruidColumns.mapValues {
-        case dtc@DruidTimeDimension(_, _, sz) => DruidDimension(
+        case dtc@DruidTimeDimension(_, _, sz, card) => DruidDimension(
           DruidDataSource.EVENT_TIMESTAMP_KEY_NAME,
           DruidDataType.String,
           sz,
-          dtc.cardinality)
+          card)
         case dc => dc
       }
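Note: the net effect of this hunk is that the time column's newly computed costing cardinality survives the rewrite into a String dimension keyed on the event timestamp. A minimal illustration with hypothetical values (the "__time" name and a DruidDataType.Long member are assumptions, not taken from this commit):

// A time dimension whose costing cardinality was set to 8760 by the
// DruidColumn changes further down, rewritten for string-typed timestamp access.
val dtc = DruidTimeDimension("__time", DruidDataType.Long, 1024L, 8760L)
val rewritten = dtc match {
  case DruidTimeDimension(_, _, sz, card) =>
    DruidDimension(DruidDataSource.EVENT_TIMESTAMP_KEY_NAME, DruidDataType.String, sz, card)
}
// rewritten.cardinality == 8760L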
@@ -63,11 +63,13 @@ case class ExplainDruidRewrite(sql: String) extends RunnableCommand {
       val druidDSIntervals = dR.drDSIntervals
       val druidDSFullName= dR.drFullName
       val druidDSOptions = dR.drOptions
-      val ndvEstimate = dR.ndvEstimate
+      val inputEstimate = dR.inputEstimate
+      val outputEstimate = dR.outputEstimate

       s"""DruidQuery(${System.identityHashCode(dR.dQuery)}) details ::
          |${DruidQueryCostModel.computeMethod(
-             sqlContext, druidDSIntervals, druidDSFullName, druidDSOptions, ndvEstimate, dR.dQuery.q)
+             sqlContext, druidDSIntervals, druidDSFullName, druidDSOptions,
+             inputEstimate, outputEstimate, dR.dQuery.q)
           }
        """.stripMargin.split("\n").map(Row(_))
     }
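Note: a sketch of exercising this command directly; the TPC-H style table name is hypothetical, and run(sqlContext): Seq[Row] is the Spark 1.x RunnableCommand contract:

// Each Row is one line of the cost-model explanation, which now reports
// both estimates: input (rows scanned in Druid) and output (rows returned).
val rows = ExplainDruidRewrite(
  "select l_returnflag, count(*) from orderLineItemPartSupplier group by l_returnflag"
).run(sqlContext)
rows.foreach(r => println(r.getString(0)))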
@@ -20,7 +20,8 @@ package org.sparklinedata.druid
 import org.joda.time.Period
 import org.joda.time.chrono.ISOChronology
 import org.joda.time.{DateTime, DateTimeZone, Interval}
-import org.json4s.MappingException
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.{CustomSerializer, MappingException}

 import scala.util.Try
 import scala.language.postfixOps
@@ -105,4 +106,23 @@ case class DurationGranularity(
   }
 }

+class DruidQueryGranularitySerializer extends CustomSerializer[DruidQueryGranularity](format => {
+  implicit val f = format
+  (
+    {
+      case jO@JObject(JField("type", JString("duration")) :: rest) =>
+        jO.extract[DurationGranularity]
+      case jO@JObject(JField("type", JString("period")) :: rest) =>
+        jO.extract[PeriodGranularity]
+      case jO@JObject(JField("type", JString("all")) :: rest) => AllGranularity
+      case jO@JObject(JField("type", JString("none")) :: rest) => NoneGranularity
+    },
+    {
+      case x: DruidQueryGranularity =>
+        throw new RuntimeException("DruidQueryGranularity serialization not supported.")
+    }
+  )
+})
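Note: a quick sketch of what the new deserializer enables once registered in the formats chain (see Utils.scala below). That the chain is exposed as Utils.jsonFormat, and that DurationGranularity carries a millisecond duration field, are assumptions:

import org.json4s.jackson.JsonMethods._

implicit val formats = Utils.jsonFormat   // assumed name for the formats chain

val dur = parse("""{"type" : "duration", "duration" : 3600000}""")
  .extract[DruidQueryGranularity]         // => DurationGranularity(...)
val all = parse("""{"type" : "all"}""")
  .extract[DruidQueryGranularity]         // => AllGranularity
// Serialization deliberately throws: granularities are only ever read back
// from Druid responses, never written out by this serializer.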


3 changes: 2 additions & 1 deletion src/main/scala/org/sparklinedata/druid/DruidRDD.scala
@@ -105,7 +105,8 @@ class DruidRDD(sqlContext: SQLContext,
   val drOptions = drInfo.options
   val drFullName = drInfo.fullName
   val drDSIntervals = drInfo.druidDS.intervals
-  val ndvEstimate = DruidQueryCostModel.estimateNDV(dQuery.q, drInfo)
+  val inputEstimate = DruidQueryCostModel.estimateInput(dQuery.q, drInfo)
+  val outputEstimate = DruidQueryCostModel.estimateOutput(dQuery.q, drInfo)
   val (httpMaxPerRoute, httpMaxTotal) = (
     DruidPlanner.getConfValue(sqlContext,
       DruidPlanner.DRUID_CONN_POOL_MAX_CONNECTIONS_PER_ROUTE),
1 change: 1 addition & 0 deletions src/main/scala/org/sparklinedata/druid/Utils.scala
@@ -89,6 +89,7 @@ object Utils extends Logging {
     ) +
       new EnumNameSerializer(FunctionalDependencyType) +
       new EnumNameSerializer(DruidDataType) +
+      new DruidQueryGranularitySerializer +
       new QueryResultRowSerializer +
       new SelectResultRowSerializer +
       new TopNResultRowSerializer ++
@@ -349,7 +349,7 @@ abstract class DruidClient(val host : String,
     val jR = render(
       ("queryType" -> "segmentMetadata") ~ ("dataSource" -> dataSource) ~
         ("intervals" -> ins) ~
-        ("analysisTypes" -> List[String]("cardinality")) ~
+        ("analysisTypes" -> List[String]("cardinality", "interval", "minmax", "queryGranularity")) ~
         ("merge" -> "true")
     )
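Note: with the three extra analysis types the rendered request now asks the broker for segment intervals, per-column min/max values, and the index's queryGranularity, which the new MetadataResponse fields below consume. A sketch of the request body with hypothetical dataSource and interval values:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

val jR = render(
  ("queryType" -> "segmentMetadata") ~ ("dataSource" -> "tpch") ~
    ("intervals" -> List("1993-01-01T00:00:00.000Z/1997-01-01T00:00:00.000Z")) ~
    ("analysisTypes" -> List[String]("cardinality", "interval", "minmax", "queryGranularity")) ~
    ("merge" -> "true")
)
// compact(jR) ==
//   {"queryType":"segmentMetadata","dataSource":"tpch",
//    "intervals":["1993-01-01T00:00:00.000Z/1997-01-01T00:00:00.000Z"],
//    "analysisTypes":["cardinality","interval","minmax","queryGranularity"],
//    "merge":"true"}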
28 changes: 26 additions & 2 deletions src/main/scala/org/sparklinedata/druid/client/DruidMessages.scala
@@ -20,13 +20,37 @@ package org.sparklinedata.druid.client
 import org.joda.time.{DateTime, Interval}
 import org.json4s.CustomSerializer
 import org.json4s.JsonAST._
+import org.sparklinedata.druid.{DruidQueryGranularity, NoneGranularity}

 case class ColumnDetails(typ : String, size : Long,
-                         cardinality : Option[Long], errorMessage : Option[String])
+                         cardinality : Option[Long],
+                         minValue : Option[String],
+                         maxValue : Option[String],
+                         errorMessage : Option[String]) {
+
+  def isDimension = cardinality.isDefined
+}

 case class MetadataResponse(id : String,
                             intervals : List[String],
                             columns : Map[String, ColumnDetails],
-                            size : Long)
+                            size : Long,
+                            numRows : Option[Long],
+                            queryGranularity : Option[DruidQueryGranularity]
+                           ) {
+
+  def getIntervals : List[Interval] = intervals.map(Interval.parse(_))
+
+  def timeTicks(ins : List[Interval]) : Long =
+    queryGranularity.getOrElse(NoneGranularity).ndv(ins)
+
+  def getNumRows : Long = numRows.getOrElse {
+    val p =
+      columns.values.filter(c => c.isDimension).map(_.cardinality.get).map(_.toDouble).product
+    if (p > Long.MaxValue) Long.MaxValue else p.toLong
+  }
+
+}

 case class SegmentInfo(id : String,
                        intervals : Interval,
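Note the numRows fallback: when the broker does not report a row count, it is estimated as the product of the dimension cardinalities, computed in Double so that a huge product saturates at Long.MaxValue instead of overflowing. A worked sketch with made-up column stats:

// Three dimensions (cardinalities 1000, 50, 200) and one metric; the metric
// reports no cardinality, so isDimension excludes it from the product.
val cols = Map(
  "l_returnflag"    -> ColumnDetails("STRING", 100L, Some(1000L), None, None, None),
  "l_linestatus"    -> ColumnDetails("STRING", 100L, Some(50L),   None, None, None),
  "l_shipmode"      -> ColumnDetails("STRING", 100L, Some(200L),  None, None, None),
  "l_extendedprice" -> ColumnDetails("FLOAT",  100L, None,        None, None, None)
)
val mr = MetadataResponse("segment-id", List("1993-01-01/1994-01-01"), cols, 400L,
  numRows = None, queryGranularity = None)
mr.getNumRows                   // 1000.0 * 50.0 * 200.0 == 10000000 rows
mr.timeTicks(mr.getIntervals)   // no reported granularity: delegates to NoneGranularity.ndv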
@@ -46,40 +46,45 @@ sealed trait DruidColumn {
    * assume the worst for time dim and metrics,
    * this is only used during query costing
    */
-  def cardinality : Long = Int.MaxValue.toLong
+  val cardinality : Long

   def isDimension(excludeTime : Boolean = false) : Boolean
 }

 object DruidColumn {

-  def apply(nm : String, c : ColumnDetails) : DruidColumn = {
+  def apply(nm : String,
+            c : ColumnDetails,
+            idxNumRows : Long,
+            idxTicks : Long) : DruidColumn = {
     if (nm == DruidDataSource.TIME_COLUMN_NAME) {
-      DruidTimeDimension(nm, DruidDataType.withName(c.typ), c.size)
+      DruidTimeDimension(nm, DruidDataType.withName(c.typ), c.size, Math.min(idxTicks, idxNumRows))
     } else if ( c.cardinality.isDefined) {
       DruidDimension(nm, DruidDataType.withName(c.typ), c.size, c.cardinality.get)
     } else {
-      DruidMetric(nm, DruidDataType.withName(c.typ), c.size)
+      DruidMetric(nm, DruidDataType.withName(c.typ), c.size, idxNumRows)
     }
   }
 }

 case class DruidDimension(name : String,
                           dataType : DruidDataType.Value,
                           size : Long,
-                          override val cardinality : Long) extends DruidColumn {
+                          cardinality : Long) extends DruidColumn {
   def isDimension(excludeTime : Boolean = false) = true
 }

 case class DruidMetric(name : String,
                        dataType : DruidDataType.Value,
-                       size : Long) extends DruidColumn {
+                       size : Long,
+                       cardinality : Long) extends DruidColumn {
   def isDimension(excludeTime : Boolean = false) = false
 }

 case class DruidTimeDimension(name : String,
                               dataType : DruidDataType.Value,
-                              size : Long) extends DruidColumn {
+                              size : Long,
+                              cardinality : Long) extends DruidColumn {
   def isDimension(excludeTime : Boolean = false) = !excludeTime

 }
@@ -93,6 +98,8 @@ case class DruidDataSource(name : String,
                            intervals : List[Interval],
                            columns : Map[String, DruidColumn],
                            size : Long,
+                           numRows : Long,
+                           timeTicks : Long,
                            druidVersion : String = null) extends DruidDataSourceCapability {

   import DruidDataSource._
@@ -132,9 +139,13 @@ object DruidDataSource {

   def apply(dataSource : String, mr : MetadataResponse,
             is : List[Interval]) : DruidDataSource = {
+
+    val idxNumRows = mr.getNumRows
+    val idxTicks = mr.timeTicks(is)
+
     val columns = mr.columns.map {
-      case (n, c) => (n -> DruidColumn(n,c))
+      case (n, c) => (n -> DruidColumn(n,c, idxNumRows, idxTicks))
     }
-    new DruidDataSource(dataSource, is, columns, mr.size)
+    new DruidDataSource(dataSource, is, columns, mr.size, idxNumRows, idxTicks)
   }
 }
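Note: every column now carries a costing cardinality. Time columns get min(time ticks, index rows), dimensions keep the broker-reported value, and metrics are pessimistically assumed distinct per row (replacing the old blanket Int.MaxValue). An illustration with hypothetical index stats; the type strings and column names are placeholders:

// 10M rows at hourly granularity over one year (~8760 ticks).
val idxNumRows = 10L * 1000 * 1000
val idxTicks   = 365L * 24

DruidColumn(DruidDataSource.TIME_COLUMN_NAME,
  ColumnDetails("LONG", 1024L, None, None, None, None), idxNumRows, idxTicks)
// => DruidTimeDimension(..., cardinality = Math.min(8760, 10000000) = 8760)

DruidColumn("l_shipmode",
  ColumnDetails("STRING", 2048L, Some(7L), None, None, None), idxNumRows, idxTicks)
// => DruidDimension(..., cardinality = 7), the broker-reported value

DruidColumn("l_extendedprice",
  ColumnDetails("FLOAT", 4096L, None, None, None, None), idxNumRows, idxTicks)
// => DruidMetric(..., cardinality = 10000000), one distinct value per row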
@@ -56,10 +56,13 @@ class DruidQueryCostModelTest extends fixture.FunSuite with
   val tpch_numProcessingThreadsPerHistorical = 7
   val tpch_numHistoricals = 8

+  val tpchInputSize : Long = 100L * 1000 * 1000
+
   test("tpch_one_year_query") { td =>

     // distinct values estimate = 100
     var costScenario = CostInput(
+      tpchInputSize,
       100,
       1.0,
       default_histMergeFactor,
@@ -78,22 +81,23 @@ class DruidQueryCostModelTest extends fixture.FunSuite with
       tpch_numSparkExecutors,
       tpch_numProcessingThreadsPerHistorical,
       tpch_numHistoricals,
-      classOf[GroupByQuerySpec]
+      Right(classOf[GroupByQuerySpec])
     )
     DruidQueryCostModel.compute(costScenario)

     // distinct values estimate = 1000
-    DruidQueryCostModel.compute(costScenario.copy(dimsNDVEstimate = 1000))
+    DruidQueryCostModel.compute(costScenario.copy(outputEstimate = 1000))

     // distinct values estimate = 10000
-    DruidQueryCostModel.compute(costScenario.copy(dimsNDVEstimate = 10000))
+    DruidQueryCostModel.compute(costScenario.copy(outputEstimate = 10000))
   }


   test("tpch_small_result_query") { td =>


     val costScenario = CostInput(
+      tpchInputSize,
       100,
       1.0,
       default_histMergeFactor,
@@ -112,7 +116,7 @@ class DruidQueryCostModelTest extends fixture.FunSuite with
       tpch_numSparkExecutors,
       tpch_numProcessingThreadsPerHistorical,
       tpch_numHistoricals,
-      classOf[GroupByQuerySpec]
+      Right(classOf[GroupByQuerySpec])
     )
     DruidQueryCostModel.compute(costScenario)
   }
@@ -122,6 +126,7 @@ class DruidQueryCostModelTest extends fixture.FunSuite with

     // distinct values estimate = 100
     val costScenario = CostInput(
+      tpchInputSize,
       10000,
       1.0,
       default_histMergeFactor,
@@ -140,22 +145,22 @@ class DruidQueryCostModelTest extends fixture.FunSuite with
       tpch_numSparkExecutors,
       tpch_numProcessingThreadsPerHistorical,
       tpch_numHistoricals,
-      classOf[GroupByQuerySpec]
+      Right(classOf[GroupByQuerySpec])
     )
     DruidQueryCostModel.compute(costScenario)

     // distinct values estimate = 1000
-    DruidQueryCostModel.compute(costScenario.copy(dimsNDVEstimate = 1000))
+    DruidQueryCostModel.compute(costScenario.copy(outputEstimate = 1000))

     // distinct values estimate = 10000
-    DruidQueryCostModel.compute(costScenario.copy(dimsNDVEstimate = 10000))
+    DruidQueryCostModel.compute(costScenario.copy(outputEstimate = 10000))

     // distinct values estimate = 4611686018427387904L
-    DruidQueryCostModel.compute(costScenario.copy(dimsNDVEstimate = 4611686018427387904L))
+    DruidQueryCostModel.compute(costScenario.copy(outputEstimate = 50L * 1000 * 1000))

     DruidQueryCostModel.compute(
-      costScenario.copy(dimsNDVEstimate = 4611686018427387904L).
-        copy(querySpecClass = classOf[SelectSpecWithIntervals]))
+      costScenario.copy(outputEstimate = 50L * 1000 * 1000).
+        copy(querySpec = Right(classOf[SelectSpecWithIntervals])))

   }
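Note: read together, these call sites suggest CostInput's leading fields are now an input-size estimate and an output (result-row) estimate, and that the query spec travels as an Either: a concrete spec on the Left or, as here, just its class on the Right. A hedged reconstruction of that shape, inferred purely from the tests (the real declaration sits in the cost-model file whose large diff is not rendered above):

// Assumed shape -- field names inferred from copy(outputEstimate = ...) and
// copy(querySpec = ...); every other parameter is elided, not reconstructed.
case class CostInput(
  inputEstimate : Long,                                 // index rows to scan (tpchInputSize above)
  outputEstimate : Long,                                // estimated result rows (was dimsNDVEstimate)
  // ... remaining cost-model parameters ...
  querySpec : Either[QuerySpec, Class[_ <: QuerySpec]]
)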
