Merge pull request #63 from tuplejump/feature/hive-metastore-sync-issue-41

Semi-automatic Hive Metastore sync (#41)
velvia committed Mar 2, 2016
2 parents 57684e2 + 478a6fc commit ad52e08
Showing 10 changed files with 109 additions and 4 deletions.
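In practice the feature works like this: once a Hive database name is configured under FiloDB's hive config section (see the application.conf change below), datasets written through the filodb.spark data source are also registered as external tables in the Hive Metastore, so they can be queried from Spark SQL or the Thrift server without a manual CREATE TABLE. A rough sketch of the flow, e.g. from a spark-shell where sc is predefined; the write options other than "dataset" are assumptions about the FiloDB data source of this era, not part of this diff:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val sql = new HiveContext(sc)            // sc: an existing SparkContext
val df  = sql.read.json("gdelt.json")    // any DataFrame to ingest

// Write through the FiloDB data source. The "dataset" option is the one this commit
// relies on (see MetaStoreSync.syncFiloTables); the key options below are assumptions.
df.write.format("filodb.spark")
  .option("dataset", "gdelt")
  .option("row_keys", "GLOBALEVENTID")
  .option("segment_key", ":round GLOBALEVENTID 10000")
  .option("partition_keys", ":getOrElse MonthYear -1")
  .mode(SaveMode.Overwrite)
  .save()

// After the write completes, the new syncToHive(sqlContext) call registers the dataset
// in the configured Hive database, so it is queryable without a manual CREATE TABLE:
sql.sql("SELECT count(*) FROM filodb.gdelt").show()
```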
8 changes: 5 additions & 3 deletions build.sbt
@@ -49,6 +49,7 @@ lazy val jmh = (project in file("jmh"))

val phantomVersion = "1.12.2"
val akkaVersion = "2.3.7"
val sparkVersion = "1.5.2"

lazy val extraRepos = Seq(
"Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/",
@@ -103,13 +104,14 @@ lazy val cliDeps = Seq(

lazy val sparkDeps = Seq(
// We don't want LOG4J. We want Logback! The excludeZK is to help with a conflict re Coursier plugin.
"org.apache.spark" %% "spark-hive" % "1.5.2" % "provided" excludeAll(excludeSlf4jLog4j, excludeZK),
"org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided" excludeAll(excludeSlf4jLog4j, excludeZK)
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided" excludeAll(excludeSlf4jLog4j, excludeZK),
"org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion % "provided" excludeAll(excludeSlf4jLog4j, excludeZK),
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)

lazy val jmhDeps = Seq(
"com.nativelibs4java" %% "scalaxy-loops" % "0.3.3" % "provided",
"org.apache.spark" %% "spark-sql" % "1.5.2" excludeAll(excludeSlf4jLog4j, excludeZK)
"org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(excludeSlf4jLog4j, excludeZK)
)

//////////////////////////
CassandraMetaStore.scala
@@ -33,6 +33,9 @@ class CassandraMetaStore(config: Config)
def getDataset(name: String): Future[Dataset] =
datasetTable.getDataset(name)

def getAllDatasets(): Future[Seq[String]] =
datasetTable.getAllDatasets

def deleteDataset(name: String): Future[Response] =
for { dtResp <- datasetTable.deleteDataset(name)
ctResp <- columnTable.deleteDataset(name) }
DatasetTable.scala
@@ -81,6 +81,8 @@ with FiloCassandraConnector {
Some((partCols, options)) <- select(_.partitionColumns, _.options).where(_.name eqs dataset).one() }
yield { Dataset(dataset, Seq(proj), splitCString(partCols), DatasetOptions.fromString(options)) }

def getAllDatasets: Future[Seq[String]] = select(_.name).fetch.map(_.distinct)

// NOTE: CQL does not return any error if you DELETE FROM datasets WHERE name = ...
def deleteDataset(name: String): Future[Response] =
delete.where(_.name eqs name).future().toResponse()
6 changes: 5 additions & 1 deletion cli/src/main/scala/filodb.cli/CliMain.scala
@@ -141,7 +141,11 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with CoordinatorS
}
}

- def dumpAllDatasets() { println("TODO") }
+ def dumpAllDatasets() {
+   parse(metaStore.getAllDatasets) { datasets =>
+     datasets.foreach(println)
+   }
+ }

def createDatasetAndColumns(dataset: String,
columns: Seq[DataColumn],
7 changes: 7 additions & 0 deletions core/src/main/resources/application.conf
@@ -72,6 +72,13 @@
# Ingesting small-segment datasets may require this to be raised.
segment-batch-size = 32
}

hive {
# Uncomment the below to enable automatic syncing of FiloDB datasets into Hive Metastore external
# tables so that one does not need to register tables manually with the Hive store.
# FiloDB tables in the cassandra keyspace will be synced to the Hive database name below.
# database-name = "filodb"
}
}

akka {
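The new hive block appears to sit inside the top-level filodb section, so the setting the Spark-side code (added later in this commit) looks up is hive.database-name relative to FiloDB's config. A minimal sketch of that lookup, assuming the standard Typesafe Config loading FiloDB already uses:

```scala
import com.typesafe.config.ConfigFactory

// Equivalent of uncommenting database-name above, but supplied programmatically.
val filoConfig = ConfigFactory.parseString("""filodb.hive.database-name = "filodb" """)
  .withFallback(ConfigFactory.load())
  .getConfig("filodb")

// Mirrors the check added to filodb.spark.syncToHive later in this commit:
if (filoConfig.hasPath("hive.database-name")) {
  println(s"Syncing FiloDB datasets into Hive database '${filoConfig.getString("hive.database-name")}'")
}
```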
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/store/InMemoryMetaStore.scala
@@ -53,6 +53,8 @@ class InMemoryMetaStore(implicit val ec: ExecutionContext) extends MetaStore wit
datasets.get(name).map(Future.successful)
.getOrElse(Future.failed(NotFoundError(name)))

def getAllDatasets(): Future[Seq[String]] = Future.successful(datasets.keys.toSeq)

def deleteDataset(name: String): Future[Response] = Future {
datasets.remove(name)
columns.remove(name)
5 changes: 5 additions & 0 deletions core/src/main/scala/filodb.core/store/MetaStore.scala
@@ -48,6 +48,11 @@ trait MetaStore {
*/
def getDataset(name: String): Future[Dataset]

/**
* Retrieves the names of all datasets registered in the metastore
*/
def getAllDatasets(): Future[Seq[String]]

/**
* Deletes dataset metadata including all projections and columns. Does not delete column store data.
* @param name Name of the dataset to delete.
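A minimal usage sketch of the new getAllDatasets method, using the InMemoryMetaStore from this commit and the same Dataset factory the spec below uses; the blocking Await and the timeout are illustrative only:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import filodb.core.metadata.Dataset
import filodb.core.store.InMemoryMetaStore

val metaStore = new InMemoryMetaStore
val dataset   = Dataset("gdelt", Seq("key1"), "seg", Seq("part1"))

Await.result(metaStore.newDataset(dataset), 10.seconds)
Await.result(metaStore.getAllDatasets(), 10.seconds).foreach(println)   // prints: gdelt
```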
10 changes: 10 additions & 0 deletions core/src/test/scala/filodb.core/store/MetaStoreSpec.scala
@@ -41,6 +41,16 @@ with BeforeAndAfter with BeforeAndAfterAll with ScalaFutures {
it("should return NotFound if getDataset on nonexisting dataset") {
metaStore.getDataset("notThere").failed.futureValue shouldBe a [NotFoundError]
}

it("should return all datasets created") {
for { i <- 0 to 2 } {
val dataset = Dataset(i.toString, Seq("key1", ":getOrElse key2 --"), "seg",
Seq("part1", ":getOrElse part2 00"))
metaStore.newDataset(dataset).futureValue should equal (Success)
}

metaStore.getAllDatasets().futureValue.toSet should equal (Set("0", "1", "2"))
}
}

describe("column API") {
22 changes: 22 additions & 0 deletions spark/src/main/scala/filodb.spark/package.scala
@@ -12,6 +12,7 @@ import scala.language.postfixOps
import filodb.core._
import filodb.core.metadata.{Column, DataColumn, Dataset, RichProjection}
import filodb.coordinator.{NodeCoordinatorActor, RowSource, DatasetCoordinatorActor}
import org.apache.spark.sql.hive.filodb.MetaStoreSync

package spark {
case class DatasetNotFound(dataset: String) extends Exception(s"Dataset $dataset not found")
@@ -59,6 +60,25 @@ package object spark extends StrictLogging {
}
}

/**
* Syncs FiloDB datasets into Hive Metastore.
* Usually does not need to be called manually, unless you did not use the right HiveContext/Spark
* to create FiloDB tables.
*/
def syncToHive(sqlContext: SQLContext): Unit = {
val config = Option(FiloSetup.config).getOrElse {
FiloSetup.init(sqlContext.sparkContext)
FiloSetup.config
}
if (config.hasPath("hive.database-name")) {
MetaStoreSync.getHiveContext(sqlContext).foreach { hiveContext =>
MetaStoreSync.syncFiloTables(config.getString("hive.database-name"),
FiloSetup.metaStore,
hiveContext)
}
}
}

implicit class FiloContext(sqlContext: SQLContext) {

/**
@@ -282,6 +302,8 @@ package object spark extends StrictLogging {
writeTimeout, index)
Iterator.empty
}.count()

syncToHive(sqlContext)
}
}
}
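As the scaladoc above notes, the sync normally runs automatically at the end of an ingest, but it can also be invoked by hand, for instance from a spark-shell whose HiveContext did not create the FiloDB tables. A hedged sketch, assuming Spark 1.5.2, that sc is an existing SparkContext, and that filodb.hive.database-name is set to "filodb":

```scala
import org.apache.spark.sql.hive.HiveContext
import filodb.spark._

val hiveContext = new HiveContext(sc)
syncToHive(hiveContext)            // no-op unless filodb.hive.database-name is configured

// Any FiloDB datasets that were missing from the Hive Metastore should now be listed:
hiveContext.tables("filodb").show()
```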
MetaStoreSync.scala (new file)
@@ -0,0 +1,48 @@
package org.apache.spark.sql.hive.filodb

import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

import filodb.core.store.MetaStore
import filodb.spark.FiloRelation

object MetaStoreSync extends StrictLogging {
/**
* Tries to get a HiveContext either from a running ThriftServer or from the sqlcontext that's
* passed in.
*/
def getHiveContext(sqlContext: SQLContext): Option[HiveContext] = {
sqlContext match {
case hc: HiveContext => Some(hc)
case other: Any => None
}
}

/**
* Syncs Filo tables in the metastore to the Hive MetaStore, adding an entry in the MetaStore for
* Filo tables which are missing. By default, only works in the keyspace pointed to in the Filo
* configuration.
* @param databaseName the Hive MetaStore database name to sync with
* @param metastore the FiloDB MetaStore
* @param hiveContext the HiveContext containing the catalog to sync to
*/
def syncFiloTables(databaseName: String, metastore: MetaStore, hiveContext: HiveContext): Int = {
val catalog = hiveContext.catalog
val hiveTables = catalog.getTables(Some(databaseName)).map(_._1)
val filoTables = FiloRelation.parse(metastore.getAllDatasets()) { ds => ds }
val missingTables = filoTables.toSet -- hiveTables.toSet
logger.info(s"Syncing FiloDB tables to Hive MetaStore. Missing tables = $missingTables")

missingTables.toSeq.foreach { missingTable =>
logger.info(s"Creating external FiloDB table $missingTable in Hive database $databaseName")
val ident = TableIdentifier(missingTable, Some(databaseName))
catalog.createDataSourceTable(ident, None, Array[String](),
provider = "filodb.spark",
options = Map("dataset" -> missingTable),
isExternal = true)
}
missingTables.size
}
}
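For comparison, the createDataSourceTable call above (provider "filodb.spark", external, with a dataset option) corresponds roughly to the registration one would otherwise do by hand in Spark SQL. A hedged sketch; the exact DDL form Spark 1.5 accepts for persistent data source tables is an assumption, not shown in this commit:

```scala
import org.apache.spark.sql.hive.HiveContext

// Hypothetical manual registration that syncFiloTables automates; sc is an existing SparkContext.
val hiveContext = new HiveContext(sc)
hiveContext.sql("USE filodb")
hiveContext.sql(
  """CREATE TABLE gdelt
    |USING filodb.spark
    |OPTIONS (dataset "gdelt")""".stripMargin)
```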
