Skip to content

Commit

Permalink
Merge e637fb4 into d1bfb74
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Sep 20, 2018
2 parents d1bfb74 + e637fb4 commit 7ae546f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.carbondata.cluster.sdv.generated

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util._
import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

Expand Down Expand Up @@ -1000,6 +1002,20 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""drop table if exists uniqdata59""").collect
}

// Verifies ALTER TABLE ADD COLUMNS behaviour for a plain hive (non-carbon) table
// across Spark versions: supported on 2.2+, rejected with a clear error on 2.1.
test("Alter table add column for hive table for spark version above 2.1") {
  sql("drop table if exists alter_hive")
  sql("create table alter_hive(name string)")
  if (SparkUtil.isSparkVersionXandAbove("2.2")) {
    // Spark 2.2+ supports ADD COLUMNS on hive tables; the insert exercises the new column.
    sql("alter table alter_hive add columns(add string)")
    sql("insert into alter_hive select 'abc','banglore'")
  } else if (SPARK_VERSION.startsWith("2.1")) {
    // On Spark 2.1 the operation is unsupported and must fail fast with a carbon error.
    val thrown = intercept[MalformedCarbonCommandException] {
      sql("alter table alter_hive add columns(add string)")
    }
    assert(thrown.getMessage.contains("Unsupported alter operation on hive table"))
  }
}

val prop = CarbonProperties.getInstance()
val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)
val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> sp
return false;
}

private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
DataType carbonDataType) {
if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
return DataTypes.StringType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.core.constants.CarbonCommonConstants

Expand Down Expand Up @@ -301,6 +303,19 @@ object CarbonReflectionUtils {
}
}

/**
 * Builds Spark's `AlterTableAddColumnsCommand` via reflection so that ADD COLUMNS
 * on a hive table can be issued from a carbon session. Reflection is used because
 * the command class is only referenced on Spark 2.2+ code paths (callers guard
 * with `SparkUtil.isSparkVersionXandAbove("2.2")`).
 *
 * @param table     identifier of the hive table being altered
 * @param colsToAdd the new columns, already converted to Spark [[StructField]]s
 * @return the instantiated command, typed as [[RunnableCommand]] so callers
 *         need no further cast
 */
def invokeAlterTableAddColumn(table: TableIdentifier,
    colsToAdd: Seq[StructField]): RunnableCommand = {
  val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
  // createObject returns (instance, class); only the instance is needed here.
  CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
    ._1.asInstanceOf[RunnableCommand]
}

def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTa
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.Util

/**
* Carbon strategies for ddl commands
Expand Down Expand Up @@ -152,6 +154,19 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
ExecutedCommandExec(addColumn) :: Nil
}
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
StructField(a.column,
Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
}
val identifier = TableIdentifier(
alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName)
ExecutedCommandExec(CarbonReflectionUtils
.invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
Nil
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
Expand Down

0 comments on commit 7ae546f

Please sign in to comment.