Added parser for Show Datamap command
ravipesala committed Nov 12, 2017
1 parent aebcdc0 commit dd1ee26
Showing 4 changed files with 94 additions and 2 deletions.
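This commit wires a new SHOW DATAMAP ON TABLE statement through the Carbon SQL parser into a new CarbonDataMapShowCommand. A minimal usage sketch, with the table and datamap names borrowed from the tests below:

  // Illustrative spark-shell session; names are taken from the tests in this commit.
  sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
  sql("create datamap datamap1 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
  // Prints one row per datamap: DataMapName, ClassName, Associated Table ("-NA-" when none).
  sql("show datamap on table datamapshowtest").show()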
@@ -9,6 +9,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {

  override def beforeAll {
    sql("drop table if exists datamaptest")
    sql("drop table if exists datamapshowtest")
    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
  }

@@ -81,8 +82,25 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
    assert(dataMapSchemaList.size() == 3)
  }

  test("test show datamap without preaggregate") {
    sql("drop table if exists datamapshowtest")
    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
    sql("create datamap datamap1 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
    sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
    checkExistence(sql("show datamap on table datamapshowtest"), true, "datamap1", "datamap2", "-NA-", "new.class")
  }

  test("test show datamap with preaggregate") {
    sql("drop table if exists datamapshowtest")
    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
    sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')")
    checkExistence(sql("show datamap on table datamapshowtest"), true, "datamap1", "datamap2", "-NA-", "new.class", "default.datamap1")
  }

  override def afterAll {
    sql("drop table if exists datamaptest")
    sql("drop table if exists datamapshowtest")
  }
}
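The checkExistence assertions above are read as: each listed string must (or, when the flag is false, must not) appear somewhere in the stringified query result. A sketch of that assumed semantics, not the actual QueryTest helper:

  // Sketch of the semantics the tests above assume; the real helper lives in QueryTest.
  def checkExistenceSketch(df: org.apache.spark.sql.DataFrame, exists: Boolean, keywords: String*): Unit = {
    val output = df.collect().map(_.mkString(" ")).mkString(" ")
    keywords.foreach { k =>
      assert(output.contains(k) == exists, s"keyword '$k' (expected=$exists) in output: $output")
    }
  }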
@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.datamap

import scala.collection.JavaConverters._

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.StringType

/**
 * Shows the datamaps created on a table.
 *
 * @param databaseNameOp optional database name; when empty, the current database is used
 * @param tableName      the table whose datamaps are listed
 */
case class CarbonDataMapShowCommand(
    databaseNameOp: Option[String],
    tableName: String)
  extends RunnableCommand with DataProcessCommand {

  override def output: Seq[Attribute] = {
    Seq(AttributeReference("DataMapName", StringType, nullable = false)(),
      AttributeReference("ClassName", StringType, nullable = false)(),
      AttributeReference("Associated Table", StringType, nullable = false)())
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    processData(sparkSession)
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
      tableMeta.carbonTable
    val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
    if (schemaList != null && schemaList.size() > 0) {
      schemaList.asScala.map { s =>
        var table = "-NA-"
        val relationIdentifier = s.getRelationIdentifier
        if (relationIdentifier != null) {
          table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
        }
        Row(s.getDataMapName, s.getClassName, table)
      }
    } else {
      Seq.empty
    }
  }
}
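Since run simply delegates to processData, the command can also be exercised directly. A sketch of what the parser constructs for "SHOW DATAMAP ON TABLE db1.t1" (the session variable, table, and example row are illustrative):

  // Illustrative: db1.t1 is a hypothetical table carrying one non-preaggregate datamap.
  val cmd = CarbonDataMapShowCommand(Some("db1"), "t1")
  val rows: Seq[Row] = cmd.run(spark)  // delegates to processData(spark)
  rows.foreach(println)                // e.g. [datamap1,new.class,-NA-]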
@@ -86,6 +86,7 @@ case class CreatePreAggregateTableCommand(
    // child schema object which will be updated on the parent table
    val childSchema = tableInfo.getFactTable
      .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
    dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
    // updating the parent table about the child table
    PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
    val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
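The one functional change in this file copies the user-supplied dmproperties into the child schema so they are stored with the parent table's metadata. In isolation it is a Scala-to-Java map transfer; a self-contained sketch, where childProps stands in for childSchema.getProperties (assumed to be a mutable java.util.Map):

  import java.util.{HashMap => JHashMap}

  // Stand-in for childSchema.getProperties; the real map comes from the child schema.
  val childProps = new JHashMap[String, String]()
  val dmproperties = Map("key" -> "value")
  dmproperties.foreach(f => childProps.put(f._1, f._2))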
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.datamap.{CarbonDropDataMapCommand, CreateDataMapCommand}
import org.apache.spark.sql.execution.command.datamap.{CarbonDataMapShowCommand, CarbonDropDataMapCommand, CreateDataMapCommand}
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
@@ -80,7 +80,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    alterAddPartition | alterSplitPartition | alterDropPartition

  protected lazy val datamapManagement: Parser[LogicalPlan] =
    createDataMap | dropDataMap
    createDataMap | dropDataMap | showDataMap

  protected lazy val alterAddPartition: Parser[LogicalPlan] =
    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -142,6 +142,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
      CarbonDropDataMapCommand(dmname, ifexists.isDefined, dbName, tableName)
    }

  protected lazy val showDataMap: Parser[LogicalPlan] =
    SHOW ~> DATAMAP ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
      case databaseName ~ tableName =>
        CarbonDataMapShowCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
    }

  protected lazy val deleteRecords: Parser[LogicalPlan] =
    (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
      case table ~ rest =>
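The showDataMap production above uses Scala's parser combinators: ~> keeps only the right-hand result, <~ keeps only the left, .? makes the database qualifier optional, and ^^ maps the parse result to a logical plan. A self-contained sketch of the same grammar shape, using JavaTokenParsers' ident and case-sensitive string literals instead of Carbon's keyword parsers, and a plain case class instead of the command:

  import scala.util.parsing.combinator.JavaTokenParsers

  // Standalone illustration of the grammar shape; not CarbonDDLSqlParser itself.
  object ShowDataMapGrammarSketch extends JavaTokenParsers {
    case class ShowDataMap(db: Option[String], table: String)

    def showDataMap: Parser[ShowDataMap] =
      "SHOW" ~> "DATAMAP" ~> "ON" ~> "TABLE" ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
        case db ~ table => ShowDataMap(db, table.toLowerCase)
      }
  }

  // Usage: both qualified and unqualified table names parse.
  ShowDataMapGrammarSketch.parseAll(ShowDataMapGrammarSketch.showDataMap, "SHOW DATAMAP ON TABLE db1.t1;")
  ShowDataMapGrammarSketch.parseAll(ShowDataMapGrammarSketch.showDataMap, "SHOW DATAMAP ON TABLE t1")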
