From dd1ee26742f8c8f8731260ac623173c5309cb153 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Sun, 12 Nov 2017 22:56:17 +0530
Subject: [PATCH] Added parser for Show Datamap command

---
 .../datamap/TestDataMapCommand.scala          | 18 +++++
 .../datamap/CarbonDataMapShowCommand.scala    | 67 +++++++++++++++++++
 .../CreatePreAggregateTableCommand.scala      |  1 +
 .../sql/parser/CarbonSpark2SqlParser.scala    | 10 ++-
 4 files changed, 94 insertions(+), 2 deletions(-)
 create mode 100644 integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 7f266cc608f..d1c2e9d67cd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -9,6 +9,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
     sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
     sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
   }
 
@@ -81,8 +82,25 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 3)
   }
 
+  test("test show datamap without preaggregate") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    sql("create datamap datamap1 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
+    sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
+    checkExistence(sql("show datamap on table datamapshowtest"), true, "datamap1", "datamap2", "-NA-", "new.class")
+  }
+
+  test("test show datamap with preaggregate") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+    sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
+    checkExistence(sql("show datamap on table datamapshowtest"), true, "datamap1", "datamap2", "-NA-", "new.class", "default.datamap1")
+  }
+
   override def afterAll {
     sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
new file mode 100644
index 00000000000..f5ee986a20f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.StringType
+
+/**
+ * Shows all datamaps created on the table.
+ * @param databaseNameOp database of the table; the current database is used when absent
+ * @param tableName table whose datamaps are listed
+ */
+case class CarbonDataMapShowCommand(
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand with DataProcessCommand {
+
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("DataMapName", StringType, nullable = false)(),
+      AttributeReference("ClassName", StringType, nullable = false)(),
+      AttributeReference("Associated Table", StringType, nullable = false)())
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
+    val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
+    if (schemaList != null && schemaList.size() > 0) {
+      schemaList.asScala.map { s =>
+        var table = "-NA-"
+        val relationIdentifier = s.getRelationIdentifier
+        if (relationIdentifier != null) {
+          table = relationIdentifier.getDatabaseName + "." +
+            relationIdentifier.getTableName
+        }
+        Row(s.getDataMapName, s.getClassName, table)
+      }
+    } else {
+      Seq.empty
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3b6ea58dc9e..c66b52d8f81 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -86,6 +86,7 @@ case class CreatePreAggregateTableCommand(
     // child schema object which will be registered on the parent table
     val childSchema = tableInfo.getFactTable
       .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+    dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
     // updating the parent table about child table
     PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
     val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 848b6548353..252381c61ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.{CarbonDropDataMapCommand, CreateDataMapCommand}
+import org.apache.spark.sql.execution.command.datamap.{CarbonDataMapShowCommand, CarbonDropDataMapCommand, CreateDataMapCommand}
 import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
 import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
@@ -80,7 +80,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     alterAddPartition | alterSplitPartition | alterDropPartition
 
   protected lazy val datamapManagement: Parser[LogicalPlan] =
-    createDataMap | dropDataMap
+    createDataMap | dropDataMap | showDataMap
 
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -142,6 +142,12 @@
       CarbonDropDataMapCommand(dmname, ifexists.isDefined, dbName, tableName)
   }
 
+  protected lazy val showDataMap: Parser[LogicalPlan] =
+    SHOW ~> DATAMAP ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
+      case databaseName ~ tableName =>
+        CarbonDataMapShowCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
+    }
+
   protected lazy val deleteRecords: Parser[LogicalPlan] =
     (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
       case table ~ rest =>
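
Usage sketch for the syntax this patch adds, driven the same way the tests above
drive it. This is not part of the patch: it assumes a CarbonData-enabled
SparkSession named `spark`, and the table name `sales` and datamap name
`sales_agg` are illustrative placeholders.

  // create a table and a preaggregate datamap on it, then list its datamaps
  spark.sql("create table sales (a string, b string, c string) stored by 'carbondata'")
  spark.sql("create datamap sales_agg on table sales using 'preaggregate' as select count(a) from sales")
  // prints one row per datamap with the columns declared by
  // CarbonDataMapShowCommand.output: DataMapName, ClassName, Associated Table
  // (Associated Table is "-NA-" when the datamap has no backing table)
  spark.sql("show datamap on table sales").show()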