[CARBONDATA-3628] Support alter hive table add complex column type #3529

Closed
@@ -1022,7 +1022,31 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql("alter table alter_hive add columns(add string)")
sql("insert into alter_hive select 'abc','banglore'")
sql("alter table alter_hive add columns (var map<string, string>)")
sql("insert into alter_hive select 'abc','banglore',map('age','10','birth','2020')")
checkAnswer(
sql("select * from alter_hive"),
Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020")))
)
}
}

test("Alter table add column for hive partitioned table for spark version above 2.1") {
sql("drop table if exists alter_hive")
sql("create table alter_hive(name string) stored as rcfile partitioned by (dt string)")
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql("alter table alter_hive add columns(add string)")
sql("alter table alter_hive add columns (var map<string, string>)")
Contributor: Can you please add a case with the STRUCT datatype as well?

Contributor Author: It does not work with the default STRUCT datatype (struct<>).

Contributor: Why doesn't it work with the struct type?

Contributor Author: DataTypeUtil.valueOf just returns the default struct type (struct<>); more work is needed here to support struct properly.

Contributor: If this PR is just to support map, please update the PR title and description. Is another PR needed to support the struct type?

Contributor Author: Yes, this PR currently supports only map<string, string>, array<string>, and the default struct<>. I can do more work to fully support struct.

Contributor: OK, I will merge this PR. Please raise another JIRA ticket for the struct support. Thanks.

sql("alter table alter_hive add columns (loves array<string>)")
sql(
s"""
|insert into alter_hive partition(dt='par')
|select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c')
""".stripMargin)
checkAnswer(
sql("select * from alter_hive where dt='par'"),
Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
)
}
}
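A note on the struct discussion above: the limitation comes from the alter-add-column path resolving the user's type string to CarbonData's default struct type (struct<>), which drops the child fields. As an illustration only — using Spark's own CatalystSqlParser rather than CarbonData's DataTypeUtil, with an object name invented for the sketch — parsing the full DDL string is what preserves the nested fields that real struct support would need:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{StructField, StructType}

object StructDdlSketch {
  def main(args: Array[String]): Unit = {
    // Parsing the complete DDL string keeps the child fields,
    // whereas a default struct<> placeholder loses them.
    CatalystSqlParser.parseDataType("struct<age:string,birth:string>") match {
      case s: StructType =>
        // Prints: age -> StringType, birth -> StringType
        s.fields.foreach { case StructField(name, dt, _, _) => println(s"$name -> $dt") }
      case other =>
        println(s"unexpected type: $other")
    }
  }
}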

@@ -146,8 +146,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
}

def deleteFile(path: String, extension: String): Unit = {
val file: CarbonFile = FileFactory
.getCarbonFile(path, FileFactory.getFileType(path))
val file: CarbonFile = FileFactory.getCarbonFile(path)

for (eachDir <- file.listFiles) {
if (!eachDir.isDirectory) {
@@ -88,14 +88,13 @@ class QueryTest extends PlanTest with Suite {

protected def checkAnswer(carbon: String, hive: String, uniqueIdentifier: String): Unit = {
val path = TestQueryExecutor.hiveresultpath + "/" + uniqueIdentifier
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
val objinp = new ObjectInputStream(FileFactory
.getDataInputStream(path, FileFactory.getFileType(path)))
if (FileFactory.isFileExist(path)) {
val objinp = new ObjectInputStream(FileFactory.getDataInputStream(path))
val rows = objinp.readObject().asInstanceOf[Array[Row]]
objinp.close()
QueryTest.checkAnswer(sql(carbon), rows) match {
case Some(errorMessage) => {
FileFactory.deleteFile(path, FileFactory.getFileType(path))
FileFactory.deleteFile(path)
writeAndCheckAnswer(carbon, hive, path)
}
case None =>
@@ -107,8 +106,7 @@ class QueryTest extends PlanTest with Suite {

private def writeAndCheckAnswer(carbon: String, hive: String, path: String): Unit = {
val rows = sql(hive).collect()
val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path, FileFactory
.getFileType(path)))
val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
obj.writeObject(rows)
obj.close()
checkAnswer(sql(carbon), rows)
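The checkAnswer/writeAndCheckAnswer pair above caches the Hive reference result on disk so later runs can skip re-executing the Hive query. Below is a minimal sketch of that round trip using the single-argument FileFactory overloads this PR switches to; the object name, method names, and the FileFactory import path (org.apache.carbondata.core.datastore.impl) are assumptions for the sketch, not taken from the PR:

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.Row

import org.apache.carbondata.core.datastore.impl.FileFactory

object HiveResultCacheSketch {

  // First run: persist the reference rows under a path derived from a unique identifier.
  def writeRows(path: String, rows: Array[Row]): Unit = {
    val out = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
    try out.writeObject(rows) finally out.close()
  }

  // Later runs: read the cached rows back instead of re-running the Hive query.
  def readRows(path: String): Array[Row] = {
    val in = new ObjectInputStream(FileFactory.getDataInputStream(path))
    try in.readObject().asInstanceOf[Array[Row]] finally in.close()
  }
}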
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.rdd

import java.io.IOException
import java.util
import java.util.{Collections, List, Map}
import java.util.{Collections, List}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
@@ -32,10 +32,11 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -64,8 +65,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}

class CarbonMergerRDD[K, V](
@transient private val ss: SparkSession,
@@ -680,7 +681,7 @@ class CarbonMergerRDD[K, V](
partitionNames = null,
splits = allSplits)
val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
val sparkDataType = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dataType)
// Change string type to support all types
val sampleRdd = scanRdd
.map(row => (row.get(0, sparkDataType), null))
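For context, the pair RDD built above keys every scanned row by the range column so that split boundaries can be sampled from the data; switching to CarbonSparkDataSourceUtil.convertCarbonToSparkDataType lets that key carry any supported type rather than only strings. A generic illustration of the sample-then-partition pattern with plain Spark — this is not CarbonData's actual partitioner wiring, and the values and object name are invented:

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangeSampleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("range-sample-sketch"))
    try {
      // Mirrors scanRdd.map(row => (row.get(0, sparkDataType), null)):
      // key each row by the range column value; the value side is unused.
      val sampleRdd = sc.parallelize(Seq("abc", "banglore", "chennai", "delhi", "xyz")).map(v => (v, null))
      // A range partitioner samples the keys and derives balanced split boundaries.
      val partitioner = new RangePartitioner(3, sampleRdd)
      Seq("abc", "chennai", "xyz").foreach(k => println(s"$k -> partition ${partitioner.getPartition(k)}"))
    } finally {
      sc.stop()
    }
  }
}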
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.strategy

import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -35,11 +36,9 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}

import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.Util

/**
* Carbon strategies for ddl commands
@@ -176,8 +175,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
StructField(a.column,
Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
DataTypeUtil.valueOf(a.dataType.get)))
}
val identifier = TableIdentifier(
alterTableAddColumnsModel.tableName,
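For the DDLStrategy change above: for a non-carbon (hive or datasource) table, the converted StructFields and the TableIdentifier are presumably handed off to Spark's built-in add-columns command. A standalone sketch of that hand-off using only Spark 2.2+ APIs — the table name, the column list, and the use of CatalystSqlParser here are illustrative, not taken from the PR:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
import org.apache.spark.sql.types.StructField

object AddColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("add-columns-sketch").getOrCreate()
    spark.sql("create table if not exists alter_demo(name string) using parquet")

    // Stand-in for the dimCols/msrCols carried by alterTableAddColumnsModel.
    val colsToAdd = Seq("address" -> "string", "var" -> "map<string,string>").map {
      case (name, ddl) => StructField(name, CatalystSqlParser.parseDataType(ddl))
    }

    // Spark's own runnable command updates the catalog entry with the new columns.
    AlterTableAddColumnsCommand(TableIdentifier("alter_demo"), colsToAdd).run(spark)
    spark.sql("describe alter_demo").show(truncate = false)
    spark.stop()
  }
}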