
Commit

in metadata qualify tableNames, so that star schema tables, druid DS, and base tables can be in multiple DBs
Harish Butani committed Sep 6, 2016
1 parent a6df1c4 commit 17679a1
Showing 8 changed files with 258 additions and 59 deletions.
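The commit's core rule, shown here as a standalone sketch (hypothetical object and method names; the real implementation is the SparklineDataContext.qualifiedName / qualifyWithDefault methods added in the diff below): a bare table name gets a database prefix, while an already-qualified "db.table" name passes through unchanged.

// Standalone sketch of the qualification rules this commit introduces.
// Hypothetical names; only the behavior mirrors the diff below.
object QualificationSketch {

  // Mirrors qualifiedName: a bare name picks up the session's current database.
  def qualifiedName(tableName: String, currentDb: String): String =
    tableName.split('.') match {
      case Array(db, table) => s"$db.$table"        // already qualified: unchanged
      case Array(table)     => s"$currentDb.$table" // bare name: prepend current DB
      case _                => tableName
    }

  // Mirrors qualifyWithDefault: a bare name falls back to the "default" database.
  def qualifyWithDefault(tableName: String): String =
    qualifiedName(tableName, "default")

  def main(args: Array[String]): Unit = {
    println(qualifiedName("orders", "tpch"))       // tpch.orders
    println(qualifiedName("sales.orders", "tpch")) // sales.orders
    println(qualifyWithDefault("orders"))          // default.orders
  }
}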
8 changes: 4 additions & 4 deletions src/main/scala/org/apache/spark/sql/CachedTablePattern.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql
import java.util.concurrent.locks.ReentrantReadWriteLock

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, Filter, Project, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Subquery}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

import org.apache.spark.sql.catalyst.planning.PhysicalOperation.{ReturnType}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation.ReturnType
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext
import org.apache.spark.sql.sources.druid.DruidPlanner

import scala.collection.mutable.{Map => mMap}
@@ -77,7 +77,7 @@ class CachedTablePattern(val sqlContext : SQLContext) extends PredicateHelper {
l match {
case l if l.isEmpty => Array()
case l if l.size == 1 && l(0).trim == "" => Array()
case _ => l.toArray
case _ => l.map(SparklineDataContext.qualifyWithDefault(sqlContext, _)).toArray
}
}

@@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.OverrideCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{ParserDialect, TableIdentifier}
import org.apache.spark.sql.catalyst.{ParserDialect, SqlParser, TableIdentifier}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client.{ClientInterface, ClientWrapper}
@@ -93,6 +93,30 @@ class SparklineDataContext(
new SparklineMetastoreCatalog(metadataHive, this) with OverrideCatalog

override protected[sql] lazy val optimizer: Optimizer = new DruidLogicalOptimizer(conf)

def currentDB = catalog.currentDB
}

object SparklineDataContext {

def qualifiedName(sqlContext : SQLContext,
tableName : String) : String = {

var tId = SqlParser.parseTableIdentifier(tableName)

if (!tId.database.isDefined) {
tId = tId.copy(database = Some(sqlContext.asInstanceOf[SparklineDataContext].currentDB))
}
s"${tId.database.get}.${tId.table}"
}

def qualifyWithDefault(sqlContext : SQLContext,
tableName : String) : String = {

var tId = SqlParser.parseTableIdentifier(tableName)
s"${tId.database.getOrElse("default")}.${tId.table}"
}

}

class SparklineMetastoreCatalog(client: ClientInterface, hive: HiveContext) extends
@@ -122,4 +146,6 @@ class SparklineMetastoreCatalog(client: ClientInterface, hive: HiveContext) exte
// case LogicalRelation(DruidRelation(info, _), _) => info
// }.toSeq
}

def currentDB : String = client.currentDatabase
}
15 changes: 9 additions & 6 deletions src/main/scala/org/sparklinedata/druid/DefaultSource.scala
@@ -19,6 +19,7 @@ package org.sparklinedata.druid

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
import org.json4s._
import org.json4s.jackson.JsonMethods._
@@ -33,11 +34,13 @@ class DefaultSource extends RelationProvider with Logging {

import Utils.jsonFormat

val sourceDFName = parameters.getOrElse(SOURCE_DF_PARAM,
var sourceDFName = parameters.getOrElse(SOURCE_DF_PARAM,
throw new DruidDataSourceException(
s"'$SOURCE_DF_PARAM' must be specified for Druid DataSource")
)

sourceDFName = SparklineDataContext.qualifiedName(sqlContext, sourceDFName)

val sourceDF = sqlContext.table(sourceDFName)

val dsName: String = parameters.getOrElse(DRUID_DS_PARAM,
@@ -71,11 +74,11 @@ class DefaultSource extends RelationProvider with Logging {

val druidHost = parameters.get(DRUID_HOST_PARAM).getOrElse(DEFAULT_DRUID_HOST)

val starSchemaInfo =
parameters.get(STAR_SCHEMA_INFO_PARAM).map(parse(_).extract[StarSchemaInfo]).orElse(
throw new DruidDataSourceException(
s"'$STAR_SCHEMA_INFO_PARAM' must be specified for Druid DataSource")
).get
var starSchemaInfo =
parameters.get(STAR_SCHEMA_INFO_PARAM).map(parse(_).extract[StarSchemaInfo]).
getOrElse(StarSchemaInfo(sourceDFName))

starSchemaInfo = StarSchemaInfo.qualifyTableNames(sqlContext, starSchemaInfo)

val ss = StarSchema(sourceDFName, starSchemaInfo)(sqlContext)
if (ss.isLeft) {
@@ -19,6 +19,7 @@ package org.sparklinedata.druid.metadata

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
@@ -32,6 +33,17 @@ import scala.collection.mutable.ArrayBuffer
*/
case class StarSchemaInfo(factTable : String, relations : StarRelationInfo*)

object StarSchemaInfo {

def qualifyTableNames(sqlContext : SQLContext,
sSI : StarSchemaInfo) : StarSchemaInfo = {
StarSchemaInfo(
SparklineDataContext.qualifiedName(sqlContext, sSI.factTable),
sSI.relations.map(StarRelationInfo.qualifyTableNames(sqlContext, _)):_*
)
}
}

/**
* Represents how 2 tables in a StarSchema are related.
* @param leftTable
@@ -62,6 +74,15 @@ object StarRelationInfo {
new StarRelationInfo(leftTable, rightTable, FunctionalDependencyType.ManyToOne,
joinCondition.map(t => EqualityCondition(t._1, t._2)))


def qualifyTableNames(sqlContext : SQLContext,
sRI : StarRelationInfo) : StarRelationInfo = {
sRI.copy(
leftTable = SparklineDataContext.qualifiedName(sqlContext, sRI.leftTable),
rightTable = SparklineDataContext.qualifiedName(sqlContext, sRI.rightTable)
)
}

}

case class EqualityCondition(leftAttribute : String, rightAttribute : String)
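Taken together, the two qualifyTableNames helpers above rewrite every table name in a star schema. A self-contained sketch with stripped-down stand-ins for StarSchemaInfo / StarRelationInfo (the real classes also carry relation types and join conditions, which qualification leaves untouched):

// Simplified stand-ins for StarSchemaInfo / StarRelationInfo; illustrative only.
case class RelationSketch(leftTable: String, rightTable: String)
case class StarSchemaSketch(factTable: String, relations: RelationSketch*)

object StarSchemaQualifySketch {

  // Same rule as in the sketch near the top: prefix bare names with the current DB.
  def qualify(name: String, currentDb: String): String =
    if (name.contains(".")) name else s"$currentDb.$name"

  // Qualify the fact table and both sides of every relation.
  def qualifyTableNames(ss: StarSchemaSketch, currentDb: String): StarSchemaSketch =
    StarSchemaSketch(
      qualify(ss.factTable, currentDb),
      ss.relations.map(r =>
        r.copy(leftTable = qualify(r.leftTable, currentDb),
               rightTable = qualify(r.rightTable, currentDb))): _*
    )

  def main(args: Array[String]): Unit = {
    val ss = StarSchemaSketch("lineitem",
      RelationSketch("lineitem", "orders"),
      RelationSketch("orders", "customer"))
    // With currentDb = "tpch", every bare name becomes "tpch.<table>".
    println(qualifyTableNames(ss, "tpch"))
  }
}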
101 changes: 55 additions & 46 deletions src/test/scala/org/sparklinedata/druid/client/BaseTest.scala
@@ -82,21 +82,22 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with
| }
""".stripMargin.replace('\n', ' ')

val starSchema =
"""
def starSchema(factDB : String = "default",
dimDB : String = "default") =
s"""
|{
| "factTable" : "lineitem",
| "factTable" : "$factDB.lineitem",
| "relations" : [ {
| "leftTable" : "lineitem",
| "rightTable" : "orders",
| "leftTable" : "$factDB.lineitem",
| "rightTable" : "$dimDB.orders",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "l_orderkey",
| "rightAttribute" : "o_orderkey"
| } ]
| }, {
| "leftTable" : "lineitem",
| "rightTable" : "partsupp",
| "leftTable" : "$factDB.lineitem",
| "rightTable" : "$dimDB.partsupp",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "l_partkey",
@@ -106,56 +107,56 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with
| "rightAttribute" : "ps_suppkey"
| } ]
| }, {
| "leftTable" : "partsupp",
| "rightTable" : "part",
| "leftTable" : "$dimDB.partsupp",
| "rightTable" : "$dimDB.part",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "ps_partkey",
| "rightAttribute" : "p_partkey"
| } ]
| }, {
| "leftTable" : "partsupp",
| "rightTable" : "supplier",
| "leftTable" : "$dimDB.partsupp",
| "rightTable" : "$dimDB.supplier",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "ps_suppkey",
| "rightAttribute" : "s_suppkey"
| } ]
| }, {
| "leftTable" : "orders",
| "rightTable" : "customer",
| "leftTable" : "$dimDB.orders",
| "rightTable" : "$dimDB.customer",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "o_custkey",
| "rightAttribute" : "c_custkey"
| } ]
| }, {
| "leftTable" : "customer",
| "rightTable" : "custnation",
| "leftTable" : "$dimDB.customer",
| "rightTable" : "$dimDB.custnation",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "c_nationkey",
| "rightAttribute" : "cn_nationkey"
| } ]
| }, {
| "leftTable" : "custnation",
| "rightTable" : "custregion",
| "leftTable" : "$dimDB.custnation",
| "rightTable" : "$dimDB.custregion",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "cn_regionkey",
| "rightAttribute" : "cr_regionkey"
| } ]
| }, {
| "leftTable" : "supplier",
| "rightTable" : "suppnation",
| "leftTable" : "$dimDB.supplier",
| "rightTable" : "$dimDB.suppnation",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "s_nationkey",
| "rightAttribute" : "sn_nationkey"
| } ]
| }, {
| "leftTable" : "suppnation",
| "rightTable" : "suppregion",
| "leftTable" : "$dimDB.suppnation",
| "rightTable" : "$dimDB.suppregion",
| "relationType" : "n-1",
| "joinCondition" : [ {
| "leftAttribute" : "sn_regionkey",
@@ -165,19 +166,8 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with
|}
""".stripMargin.replace('\n', ' ')

override def beforeAll() = {

System.setProperty("user.timezone", "UTC")
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
DateTimeZone.setDefault(DateTimeZone.forID("UTC"))
TestHive.setConf(DruidPlanner.TZ_ID.key, "UTC")

TestHive.sparkContext.setLogLevel("INFO")

register(TestHive)
// DruidPlanner(TestHive)

val cT = s"""CREATE TABLE if not exists orderLineItemPartSupplierBase(o_orderkey integer,
val olFlatCreateTable =
s"""CREATE TABLE if not exists orderLineItemPartSupplierBase(o_orderkey integer,
o_custkey integer,
o_orderstatus string, o_totalprice double, o_orderdate string, o_orderpriority string,
o_clerk string,
@@ -200,19 +190,12 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with
OPTIONS (path "src/test/resources/tpch/datascale1/orderLineItemPartSupplierCustomer.small",
header "false", delimiter "|")""".stripMargin

println(cT)
sql(cT)

TestHive.table("orderLineItemPartSupplierBase").cache()

TestHive.setConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK.key,
"orderLineItemPartSupplierBase")

// sql("select * from orderLineItemPartSupplierBase limit 10").show(10)

val cTOlap = s"""CREATE TABLE if not exists orderLineItemPartSupplier
def olDruidDS(db : String = "default",
table : String = "orderLineItemPartSupplierBase",
dsName : String = "orderLineItemPartSupplier") =
s"""CREATE TABLE if not exists $dsName
USING org.sparklinedata.druid
OPTIONS (sourceDataframe "orderLineItemPartSupplierBase",
OPTIONS (sourceDataframe "$db.$table",
timeDimensionColumn "l_shipdate",
druidDatasource "tpch",
druidHost "localhost",
@@ -223,6 +206,32 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with
functionalDependencies '$functionalDependencies',
starSchema '$flatStarSchema')""".stripMargin

override def beforeAll() = {

System.setProperty("user.timezone", "UTC")
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
DateTimeZone.setDefault(DateTimeZone.forID("UTC"))
TestHive.setConf(DruidPlanner.TZ_ID.key, "UTC")

TestHive.sparkContext.setLogLevel("INFO")

register(TestHive)
// DruidPlanner(TestHive)

val cT = olFlatCreateTable

println(cT)
sql(cT)

TestHive.table("orderLineItemPartSupplierBase").cache()

TestHive.setConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK.key,
"orderLineItemPartSupplierBase")

// sql("select * from orderLineItemPartSupplierBase limit 10").show(10)

val cTOlap = olDruidDS()

println(cTOlap)
sql(cTOlap)
}
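A hypothetical usage of the parametrized helpers above, as a multi-database test might invoke them inside this BaseTest fixture (the database name "tpch" below is illustrative and not something this commit creates):

// Illustrative only: assumes the base table already exists in a database named "tpch".
val createDruidDS = olDruidDS(db = "tpch",
  table = "orderLineItemPartSupplierBase",
  dsName = "orderLineItemPartSupplier")
sql(createDruidDS)

// Star-schema JSON with fully qualified table names, for tests that need it:
val qualifiedStarSchema = starSchema(factDB = "tpch", dimDB = "tpch")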

