From 2f8e128a57bfcaa8d5cae29c5751deafe39f6990 Mon Sep 17 00:00:00 2001
From: Evan Chan
Date: Thu, 18 Feb 2016 11:53:38 -0800
Subject: [PATCH 1/2] Add MetaStore.getAllDatasets(); CLI --command list will now list all datasets

---
 .../metastore/CassandraMetaStore.scala                |  3 +++
 .../filodb.cassandra/metastore/DatasetTable.scala     |  2 ++
 cli/src/main/scala/filodb.cli/CliMain.scala           |  6 +++++-
 .../scala/filodb.core/store/InMemoryMetaStore.scala   |  2 ++
 core/src/main/scala/filodb.core/store/MetaStore.scala |  5 +++++
 .../test/scala/filodb.core/store/MetaStoreSpec.scala  | 10 ++++++++++
 6 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala b/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala
index 274f974d59..faa212c81a 100644
--- a/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala
@@ -33,6 +33,9 @@ class CassandraMetaStore(config: Config)
   def getDataset(name: String): Future[Dataset] =
     datasetTable.getDataset(name)
 
+  def getAllDatasets(): Future[Seq[String]] =
+    datasetTable.getAllDatasets
+
   def deleteDataset(name: String): Future[Response] =
     datasetTable.deleteDataset(name)
 
diff --git a/cassandra/src/main/scala/filodb.cassandra/metastore/DatasetTable.scala b/cassandra/src/main/scala/filodb.cassandra/metastore/DatasetTable.scala
index 657d53f796..028ff9e900 100644
--- a/cassandra/src/main/scala/filodb.cassandra/metastore/DatasetTable.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/metastore/DatasetTable.scala
@@ -81,6 +81,8 @@ with FiloCassandraConnector {
     Some((partCols, options)) <- select(_.partitionColumns, _.options).where(_.name eqs dataset).one()
   } yield { Dataset(dataset, Seq(proj), splitCString(partCols), DatasetOptions.fromString(options)) }
 
+  def getAllDatasets: Future[Seq[String]] = select(_.name).fetch.map(_.distinct)
+
   // NOTE: CQL does not return any error if you DELETE FROM datasets WHERE name = ...
   def deleteDataset(name: String): Future[Response] =
     delete.where(_.name eqs name).future().toResponse()
 
diff --git a/cli/src/main/scala/filodb.cli/CliMain.scala b/cli/src/main/scala/filodb.cli/CliMain.scala
index a19e1aef0e..5af3727945 100644
--- a/cli/src/main/scala/filodb.cli/CliMain.scala
+++ b/cli/src/main/scala/filodb.cli/CliMain.scala
@@ -134,7 +134,11 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with CoordinatorS
     }
   }
 
-  def dumpAllDatasets() { println("TODO") }
+  def dumpAllDatasets() {
+    parse(metaStore.getAllDatasets) { datasets =>
+      datasets.foreach(println)
+    }
+  }
 
   def createDatasetAndColumns(dataset: String,
                               columns: Seq[DataColumn],
diff --git a/core/src/main/scala/filodb.core/store/InMemoryMetaStore.scala b/core/src/main/scala/filodb.core/store/InMemoryMetaStore.scala
index 6f0a6a8b67..44fbd76b89 100644
--- a/core/src/main/scala/filodb.core/store/InMemoryMetaStore.scala
+++ b/core/src/main/scala/filodb.core/store/InMemoryMetaStore.scala
@@ -53,6 +53,8 @@ class InMemoryMetaStore(implicit val ec: ExecutionContext) extends MetaStore wit
     datasets.get(name).map(Future.successful)
             .getOrElse(Future.failed(NotFoundError(name)))
 
+  def getAllDatasets(): Future[Seq[String]] = Future.successful(datasets.keys.toSeq)
+
   def deleteDataset(name: String): Future[Response] = Future {
     datasets.remove(name)
     columns.remove(name)
diff --git a/core/src/main/scala/filodb.core/store/MetaStore.scala b/core/src/main/scala/filodb.core/store/MetaStore.scala
index bfccf24f31..04acad6003 100644
--- a/core/src/main/scala/filodb.core/store/MetaStore.scala
+++ b/core/src/main/scala/filodb.core/store/MetaStore.scala
@@ -48,6 +48,11 @@ trait MetaStore {
    */
   def getDataset(name: String): Future[Dataset]
 
+  /**
+   * Retrieves the names of all datasets registered in the metastore
+   */
+  def getAllDatasets(): Future[Seq[String]]
+
   /**
    * Deletes dataset metadata including all projections. Does not delete column store data.
    * @param name Name of the dataset to delete.
diff --git a/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala b/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala
index 1d961765c7..0e8758a550 100644
--- a/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala
+++ b/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala
@@ -41,6 +41,16 @@ with BeforeAndAfter with BeforeAndAfterAll with ScalaFutures {
     it("should return NotFound if getDataset on nonexisting dataset") {
       metaStore.getDataset("notThere").failed.futureValue shouldBe a [NotFoundError]
     }
+
+    it("should return all datasets created") {
+      for { i <- 0 to 2 } {
+        val dataset = Dataset(i.toString, Seq("key1", ":getOrElse key2 --"), "seg",
+                              Seq("part1", ":getOrElse part2 00"))
+        metaStore.newDataset(dataset).futureValue should equal (Success)
+      }
+
+      metaStore.getAllDatasets().futureValue.toSet should equal (Set("0", "1", "2"))
+    }
   }
 
   describe("column API") {
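
For reference, a minimal sketch of the new getAllDatasets() API in action — the same flow the CLI's --command list now uses via dumpAllDatasets(). This harness is illustrative only (the object name, dataset names, and timeouts are assumptions, not part of the patch); the Dataset constructor arguments mirror the MetaStoreSpec test above.

    import scala.concurrent.{Await, ExecutionContext}
    import scala.concurrent.duration._
    import filodb.core.metadata.Dataset
    import filodb.core.store.InMemoryMetaStore

    object ListDatasetsSketch extends App {
      implicit val ec: ExecutionContext = ExecutionContext.global
      val metaStore = new InMemoryMetaStore

      // Register a couple of datasets, the way MetaStoreSpec does above
      Seq("gdelt", "events").foreach { name =>
        val ds = Dataset(name, Seq("key1"), "seg", Seq("part1"))
        Await.result(metaStore.newDataset(ds), 10.seconds)
      }

      // getAllDatasets() completes with the names of every registered dataset
      val names = Await.result(metaStore.getAllDatasets(), 10.seconds)
      names.foreach(println)   // prints gdelt and events, in no guaranteed order
    }
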
+ # database-name = "filodb" + } } akka { diff --git a/spark/src/main/scala/filodb.spark/package.scala b/spark/src/main/scala/filodb.spark/package.scala index 3b3a5af675..7a030b803c 100644 --- a/spark/src/main/scala/filodb.spark/package.scala +++ b/spark/src/main/scala/filodb.spark/package.scala @@ -12,6 +12,7 @@ import scala.language.postfixOps import filodb.core._ import filodb.core.metadata.{Column, DataColumn, Dataset, RichProjection} import filodb.coordinator.{NodeCoordinatorActor, RowSource, DatasetCoordinatorActor} +import org.apache.spark.sql.hive.filodb.MetaStoreSync package spark { case class DatasetNotFound(dataset: String) extends Exception(s"Dataset $dataset not found") @@ -59,6 +60,25 @@ package object spark extends StrictLogging { } } + /** + * Syncs FiloDB datasets into Hive Metastore. + * Usually does not need to be called manually, unless you did not use the right HiveContext/Spark + * to create FiloDB tables. + */ + def syncToHive(sqlContext: SQLContext): Unit = { + val config = Option(FiloSetup.config).getOrElse { + FiloSetup.init(sqlContext.sparkContext) + FiloSetup.config + } + if (config.hasPath("hive.database-name")) { + MetaStoreSync.getHiveContext(sqlContext).foreach { hiveContext => + MetaStoreSync.syncFiloTables(config.getString("hive.database-name"), + FiloSetup.metaStore, + hiveContext) + } + } + } + implicit class FiloContext(sqlContext: SQLContext) { /** @@ -212,6 +232,8 @@ package object spark extends StrictLogging { logger.info(s"Saving ($dataset/$version) with row keys $rowKeys, segment key $segmentKey, " + s"partition keys $partKeys, $numPartitions partitions") ingestDF(df, filoConfig, dataset, dfColumns.map(_.name), version, writeTimeout) + + syncToHive(sqlContext) } def ingestDF(df: DataFrame, filoConfig: Config, dataset: String, diff --git a/spark/src/main/scala/org.apache.spark.sql.hive.filodb/MetaStoreSync.scala b/spark/src/main/scala/org.apache.spark.sql.hive.filodb/MetaStoreSync.scala new file mode 100644 index 0000000000..3b6526b43a --- /dev/null +++ b/spark/src/main/scala/org.apache.spark.sql.hive.filodb/MetaStoreSync.scala @@ -0,0 +1,48 @@ +package org.apache.spark.sql.hive.filodb + +import com.typesafe.scalalogging.slf4j.StrictLogging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext + +import filodb.core.store.MetaStore +import filodb.spark.FiloRelation + +object MetaStoreSync extends StrictLogging { + /** + * Tries to get a HiveContext either from a running ThriftServer or from the sqlcontext that's + * passed in. + */ + def getHiveContext(sqlContext: SQLContext): Option[HiveContext] = { + sqlContext match { + case hc: HiveContext => Some(hc) + case other: Any => None + } + } + + /** + * Syncs Filo tables in the metastore to the Hive MetaStore, adding an entry in the MetaStore for + * Filo tables which are missing. By default, only works in the keyspace pointed to in the Filo + * configuration. 
+   * @param databaseName the Hive MetaStore database name to sync with
+   * @param metastore the FiloDB MetaStore
+   * @param hiveContext the HiveContext containing the catalog to sync to
+   */
+  def syncFiloTables(databaseName: String, metastore: MetaStore, hiveContext: HiveContext): Int = {
+    val catalog = hiveContext.catalog
+    val hiveTables = catalog.getTables(Some(databaseName)).map(_._1)
+    val filoTables = FiloRelation.parse(metastore.getAllDatasets()) { ds => ds }
+    val missingTables = filoTables.toSet -- hiveTables.toSet
+    logger.info(s"Syncing FiloDB tables to Hive MetaStore. Missing tables = $missingTables")
+
+    missingTables.toSeq.foreach { missingTable =>
+      logger.info(s"Creating external FiloDB table $missingTable in Hive database $databaseName")
+      val ident = TableIdentifier(missingTable, Some(databaseName))
+      catalog.createDataSourceTable(ident, None, Array[String](),
+        provider = "filodb.spark",
+        options = Map("dataset" -> missingTable),
+        isExternal = true)
+    }
+    missingTables.size
+  }
+}
\ No newline at end of file
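
End to end, the Hive sync in patch 2 would be exercised roughly as below. This is a sketch, not part of the patch: it assumes hive.database-name = "filodb" has been uncommented in application.conf, the input path and dataset/column names are invented, and the FiloDB data source option names (row_keys, segment_key, partition_keys) are assumed from the project's Spark documentation of this era — check the README for your version.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext
    import filodb.spark._

    val sc = new SparkContext(new SparkConf().setAppName("filo-hive-sync-demo"))
    // Must be a HiveContext: MetaStoreSync.getHiveContext returns None otherwise
    val sql = new HiveContext(sc)

    val df = sql.read.json("events.json")   // hypothetical input data
    // Writing through the FiloDB data source ingests the rows and, because the
    // save path in package.scala now ends with syncToHive(sqlContext), also
    // registers the table in the Hive Metastore
    df.write.format("filodb.spark").
      option("dataset", "events").
      option("row_keys", "eventId").
      option("segment_key", "eventTime").
      option("partition_keys", "shard").
      mode(SaveMode.Overwrite).
      save()

    // The sync can also be triggered by hand, e.g. for datasets created elsewhere:
    syncToHive(sql)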
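
The payoff of syncFiloTables is that the dataset is then visible through the Hive catalog like any other table, with no manual registration step. Continuing the hypothetical sketch above:

    // Queryable immediately via the synced external table in the "filodb" database
    sql.sql("SELECT count(*) FROM filodb.events").show()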