[CARBONDATA-4179] Support renaming of complex columns (array/struct)
Why is this PR needed?
This PR enables renaming of complex columns: parent as well as child columns, including nested levels.
Example: if the schema contains the columns str1 struct<a:int, b:string> and arr1 array<long>:
1. alter table <table_name> change str1 str2 struct<a:int, b:string>
2. alter table <table_name> change arr1 arr2 array<long>
3. Changing the parent name as well as a child name: alter table <table_name> change str1 str2 struct<abc:int, b:string>
NOTE: The rename operation fails if the structure of the complex column has been altered.
This check ensures that the old and new columns are compatible with each other, meaning
the number of children and the nesting levels must remain unaltered while attempting to rename.

What changes were proposed in this PR?
1. Parse the incoming new complex type and create a nested DataTypeInfo structure.
2. Pass this DataTypeInfo on to the AlterTableDataTypeChangeModel.
3. Validate compatibility and check for duplicate columns here.
4. Add the parent column to the schema evolution entry.
5. Update the Spark catalog table.
Limitation: renaming is not yet supported for Map types.
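
For illustration, a minimal sketch (Scala) of the nested DataTypeInfo built for examples 1 and 2 above, using the DataTypeInfo case class added in this PR. The child-naming convention (parent.child for struct fields, parent.val for the array element) follows parseColumn in this diff, while the datatype name strings ("struct", "array", "long") are assumed outputs of DataTypeConverterUtil:
```scala
// Sketch: nested DataTypeInfo for "change str1 str2 struct<a:int, b:string>".
val structInfo = DataTypeInfo("str2", "struct")
structInfo.setChildren(List(
  DataTypeInfo("str2.a", "int"),
  DataTypeInfo("str2.b", "string")))

// Sketch: nested DataTypeInfo for "change arr1 arr2 array<long>"; the single
// array child is named "<column>.val".
val arrayInfo = DataTypeInfo("arr2", "array")
arrayInfo.setChildren(List(DataTypeInfo("arr2.val", "long")))
```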

Does this PR introduce any user interface change?
Yes

Is any new testcase added?
Yes

This closes #4129
akkio-97 authored and Indhumathi27 committed Jun 10, 2021
1 parent d838e3b commit cfa02dd3db2906750aeef2ebc657a1c4f58b2d66
Showing 12 changed files with 693 additions and 147 deletions.
@@ -1933,6 +1933,9 @@ private CarbonCommonConstants() {
public static final String MAP = "map";
public static final String DECIMAL = "decimal";
public static final String FROM = "from";
public static final String BIGINT = "bigint";
public static final String LONG = "long";
public static final String INT = "int";

/**
* TABLE UPDATE STATUS FILENAME
@@ -21,7 +21,9 @@
import java.math.RoundingMode;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -50,6 +52,10 @@
* Utility class for restructuring
*/
public class RestructureUtil {
// if the table column is of complex type, this lookup stores the column ids of the parent
// (as well as its children) [tableColumn_id -> tableColumn_name]. This helps to determine the
// existence of an incoming query column by matching based on id.
private static Map<String, String> existingTableColumnIDMap;

/**
* Below method will be used to get the updated query dimension update
@@ -159,11 +165,21 @@ public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQue
return presentDimension;
}

public static void fillExistingTableColumnIDMap(CarbonDimension tableColumn) {
existingTableColumnIDMap.put(tableColumn.getColumnId(), tableColumn.getColName());
List<CarbonDimension> children = tableColumn.getListOfChildDimensions();
if (children == null) return;
for (CarbonDimension dimension : children) {
fillExistingTableColumnIDMap(dimension);
}
}
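
For intuition, a minimal sketch (Scala; the ids and the dotted child names are illustrative assumptions, and the real field is a java.util.HashMap) of what existingTableColumnIDMap holds after the method above runs on a column str1 struct<a:int, b:string>:
```scala
// Hypothetical lookup contents: column id -> column name, parent first, then children.
val lookup = Map(
  "id_0" -> "str1",   // parent dimension
  "id_1" -> "str1.a", // child dimensions, added recursively
  "id_2" -> "str1.b")
```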

/**
* Match the columns for transactional and non transactional tables
* @param isTransactionalTable
* @param queryColumn - column entity that is present in the fired query or in the query model.
* @param tableColumn - column entity that is present in the table block or in the segment
* properties.
* @return
*/
public static boolean isColumnMatches(boolean isTransactionalTable,
@@ -177,6 +193,12 @@ public static boolean isColumnMatches(boolean isTransactionalTable,
.isColumnMatchBasedOnId(queryColumn)) {
return true;
} else {
if (tableColumn instanceof CarbonDimension) {
// insert the table column ids (parent and children) into a lookup map, which will later be
// used to match against the query column id
existingTableColumnIDMap = new HashMap<>();
fillExistingTableColumnIDMap((CarbonDimension) tableColumn);
}
return isColumnMatchesStruct(tableColumn, queryColumn);
}
} else {
@@ -191,26 +213,39 @@ public static boolean isColumnMatches(boolean isTransactionalTable,
}

/**
* In case of a multilevel complex column (Struct/StructOfStruct), traverse all the child dimensions
* of tableColumn to check if any of their column ids matches that of queryColumn.
*
* @param tableColumn - column entity that is present in the table block or in the segment
* properties.
* @param queryColumn - column entity that is present in the fired query or in the query model.
* tableColumn name and queryColumn name may or may not be the same if the schema has evolved;
* hence matching happens based on the column ID.
* @return
*/
private static boolean isColumnMatchesStruct(CarbonColumn tableColumn, CarbonColumn queryColumn) {
if (tableColumn instanceof CarbonDimension) {
List<CarbonDimension> childrenDimensions =
((CarbonDimension) tableColumn).getListOfChildDimensions();
CarbonDimension carbonDimension;
String[] colSplits = queryColumn.getColName().split("\\.");
StringBuffer tempColName = new StringBuffer(colSplits[0]);
for (String colSplit : colSplits) {
if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
tempColName = tempColName.append(".").append(colSplit);
tempColName = tempColName.append(CarbonCommonConstants.POINT).append(colSplit);
}
carbonDimension =
CarbonTable.getCarbonDimension(tempColName.toString(), childrenDimensions);
if (carbonDimension == null) {
// Avoid returning true in case of SDK as the column name contains the id.
if (existingTableColumnIDMap != null && existingTableColumnIDMap
.containsKey(queryColumn.getColumnId())) {
String columnName = existingTableColumnIDMap.get(queryColumn.getColumnId());
if (columnName != null && !columnName.contains(queryColumn.getColumnId())) {
return true;
}
}
} else {
// In case of SDK the columnId and columnName are the same. This check ensures, for
// all the child columns, that the table column name equals the query column name and
// the table columnId equals the table columnName
@@ -222,7 +257,7 @@ private static boolean isColumnMatchesStruct(CarbonColumn tableColumn, CarbonCol
return true;
}
if (carbonDimension.getListOfChildDimensions() != null) {
childrenDimensions = carbonDimension.getListOfChildDimensions();
}
}
}
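
To make the traversal above concrete, a minimal sketch (standalone Scala, mirroring the Java loop; not part of the diff) of the prefixes that get probed against each level's child dimensions:
```scala
// For a query column "str2.a.b", the loop checks "str2", then "str2.a", then
// "str2.a.b", descending one child-dimension level per matched prefix.
val colSplits = "str2.a.b".split("\\.")
val probedPrefixes = colSplits.tail.scanLeft(colSplits.head)(_ + "." + _).toList
// probedPrefixes == List("str2", "str2.a", "str2.a.b")
```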
@@ -52,8 +52,8 @@ CarbonData DDL statements are documented here,which includes:
* [RENAME TABLE](#rename-table)
* [ADD COLUMNS](#add-columns)
* [DROP COLUMNS](#drop-columns)
* [RENAME COLUMN](#change-column-name-and-type-and-comment)
* [CHANGE COLUMN NAME/TYPE/COMMENT](#change-column-name-and-type-and-comment)
* [MERGE INDEXES](#merge-index)
* [SET/UNSET](#set-and-unset)
* [DROP TABLE](#drop-table)
@@ -805,7 +805,7 @@ Users can specify which columns to include and exclude for local dictionary gene
2. If a column to be dropped has any Secondary Index table created on it, the drop column operation fails and the user will
be asked to drop the corresponding SI table first before attempting the actual drop.
- #### CHANGE COLUMN NAME AND TYPE AND COMMENT
This command is used to change the column name and comment, and to change the data type from INT to BIGINT or decimal precision from lower to higher.
Changing a decimal data type from lower precision to higher precision is supported only for cases where there is no data loss.
@@ -819,33 +819,47 @@ Users can specify which columns to include and exclude for local dictionary gene
- Invalid scenarios
* Change of decimal precision from (10,2) to (10,5) is invalid as in this case only scale is increased but total number of digits remains the same.
* Change the comment of the partition column
* Rename operation fails if the structure of the complex column has been altered. Please ensure the old and new columns are compatible with
each other, meaning the number of children and nesting levels must remain unaltered while attempting to rename.
- Valid scenarios
* Change of decimal precision from (10,2) to (12,3) is valid as the total number of digits are increased by 2 but scale is increased only by 1 which will not lead to any data loss.
* Change the comment of columns other than partition column
- **NOTE:** The allowed upper limit is (38, 38) for (precision, scale), which is a valid upper-bound scenario that does not result in data loss.
Example 1: Change column a1's name to a2 and its data type from INT to BIGINT.
```
ALTER TABLE test_db.carbon CHANGE a1 a2 BIGINT
```
Example 2: Changing decimal precision of column a1 from 10 to 18.
```
ALTER TABLE test_db.carbon CHANGE a1 a1 DECIMAL(18,2)
```
Example 3: Change column a3's name to a4.
```
ALTER TABLE test_db.carbon CHANGE a3 a4 STRING
```
Example 4: Change column a3's comment to "col_comment".
```
ALTER TABLE test_db.carbon CHANGE a3 a3 STRING COMMENT 'col_comment'
```
Example 5: Change the child column name from age to id in the complex column structField struct\<age:int>.
```
ALTER TABLE test_db.carbon CHANGE structField structField struct<id:int>
```
Example 6: Rename the column oldArray array\<int> to newArray.
```
ALTER TABLE test_db.carbon CHANGE oldArray newArray array<int>
```
**NOTE:** Once the column is renamed, the user has to take care of replacing the file header with the new name or changing the column header in the CSV file.
@@ -866,6 +880,7 @@ Users can specify which columns to include and exclude for local dictionary gene
**NOTE:**
* Merge index is supported on streaming tables from CarbonData version 2.0.1 onwards,
but streaming segments (ROW_V1) cannot create a merge index.
* Renaming a column is not supported for the MAP type.
- #### SET and UNSET
@@ -47,13 +47,14 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {

test("Test restructured array<string> and existing string column as index columns on SI with compaction") {
sql("drop table if exists complextable")
sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
sql("create table complextable (id string, country array<string>, columnName string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")

sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("alter table complextable change columnName name string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world')")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")

@@ -93,9 +94,10 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
sql("insert into complextable select 2,array('pak'), 'v'")

sql("drop index if exists index_11 on complextable")
sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
sql("alter table complextable change newArray arr2 array<string>")
sql("ALTER TABLE complextable ADD COLUMNS(address string)")
sql("alter table complextable change address addr string")
sql("insert into complextable select 3,array('china'), 'f',array('hello','world'),'china'")
sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")

@@ -129,11 +131,12 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
}

test("test array<string> on secondary index with compaction") {
sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1,array('china', 'us'), 'b'")
sql("insert into complextable select 2,array('pak'), 'v'")
sql("insert into complextable select 3,array('china'), 'f'")
sql("insert into complextable select 4,array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result1 = sql(" select * from complextable where array_contains(country,'china')")
val result2 = sql(" select * from complextable where country[0]='china'")
sql("drop index if exists index_1 on complextable")
@@ -162,11 +165,12 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
}

test("test array<string> and string as index columns on secondary index with compaction") {
sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
sql("insert into complextable select 1, array('china', 'us'), 'b'")
sql("insert into complextable select 2, array('pak'), 'v'")
sql("insert into complextable select 3, array('china'), 'f'")
sql("insert into complextable select 4, array('india'),'g'")
sql("alter table complextable change columnCountry country array<string>")
val result = sql(" select * from complextable where array_contains(country,'china') and name='f'")
sql("drop index if exists index_1 on complextable")
sql("create index index_1 on table complextable(country, name) as 'carbondata'")
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.parse._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.constants.LoggerAction
@@ -1092,15 +1093,15 @@ object CarbonParserUtil {
private def appendParentForEachChild(field: Field, parentName: String): Field = {
field.dataType.getOrElse("NIL") match {
case "Array" | "Struct" | "Map" =>
val newChildren = field.children.map(_.map(appendParentForEachChild(_,
parentName + CarbonCommonConstants.POINT + field.column)))
field.copy(column = parentName + CarbonCommonConstants.POINT + field.column,
name = Some(parentName + CarbonCommonConstants.POINT + field.name.getOrElse(None)),
children = newChildren,
parent = parentName)
case _ =>
field.copy(column = parentName + "." + field.column,
name = Some(parentName + "." + field.name.getOrElse(None)),
field.copy(column = parentName + CarbonCommonConstants.POINT + field.column,
name = Some(parentName + CarbonCommonConstants.POINT + field.name.getOrElse(None)),
parent = parentName)
}
}
@@ -1113,6 +1114,7 @@ object CarbonParserUtil {
* @return DataTypeInfo object with datatype, precision and scale
*/
def parseDataType(
columnName: String,
dataType: String,
values: Option[List[(Int, Int)]]): DataTypeInfo = {
var precision: Int = 0
@@ -1122,7 +1124,8 @@ object CarbonParserUtil {
if (values.isDefined) {
throw new MalformedCarbonCommandException("Invalid data type")
}
DataTypeInfo(columnName,
DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
case "decimal" =>
if (values.isDefined) {
precision = values.get(0)._1
@@ -1136,10 +1139,53 @@ object CarbonParserUtil {
} else if (scale < 0 || scale > 38) {
throw new MalformedCarbonCommandException("Invalid value for scale")
}
DataTypeInfo("decimal", precision, scale)
DataTypeInfo(columnName, "decimal", precision, scale)
case _ =>
DataTypeInfo(columnName,
DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
}
}
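
A short usage sketch of the updated parseDataType (Scala; results assumed from the code above, noting that the precision checks sit just above this hunk's visible lines):
```scala
// Valid: precision 12, scale 3 yields DataTypeInfo("d1", "decimal", 12, 3).
val ok = CarbonParserUtil.parseDataType("d1", "decimal", Some(List((12, 3))))
// Invalid: a scale outside [0, 38] throws MalformedCarbonCommandException.
// CarbonParserUtil.parseDataType("d1", "decimal", Some(List((12, 39))))
```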

/**
* This method returns an instantiated DataTypeInfo built by parsing the column's Spark DataType.
*/
def parseColumn(columnName: String, dataType: DataType,
values: Option[List[(Int, Int)]]): DataTypeInfo = {
// creates parent dataTypeInfo first
val dataTypeName = DataTypeConverterUtil.convertToCarbonType(dataType.typeName).getName
val dataTypeInfo = CarbonParserUtil.parseDataType(columnName, dataTypeName.toLowerCase, values)
// check which child type is present and create children dataTypeInfo accordingly
dataType match {
case arrayType: ArrayType =>
val childType: DataType = arrayType.elementType
val childName = columnName + ".val"
val childValues = childType match {
case d: DecimalType => Some(List((d.precision, d.scale)))
case _ => None
}
val childDatatypeInfo = parseColumn(childName, childType, childValues)
dataTypeInfo.setChildren(List(childDatatypeInfo))
case structType: StructType =>
var childTypeInfoList: List[DataTypeInfo] = null
for (childField <- structType) {
val childType = childField.dataType
val childName = columnName + CarbonCommonConstants.POINT + childField.name
val childValues = childType match {
case d: DecimalType => Some(List((d.precision, d.scale)))
case _ => None
}
val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, childValues)
if (childTypeInfoList == null) {
childTypeInfoList = List(childDatatypeInfo)
} else {
childTypeInfoList = childTypeInfoList :+ childDatatypeInfo
}
}
dataTypeInfo.setChildren(childTypeInfoList)
case _ =>
DataTypeInfo(DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
}
// TODO have to handle for map types [CARBONDATA-4199]
dataTypeInfo
}
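
A hedged usage sketch of parseColumn with Spark types (ArrayType, LongType, StructType, StructField, IntegerType come from the org.apache.spark.sql.types import added above; the expected child names follow the code):
```scala
// Parse "arr2 array<long>": the parent DataTypeInfo gets one child named "arr2.val".
val arrInfo = CarbonParserUtil.parseColumn("arr2", ArrayType(LongType), None)

// Parse a struct: each field becomes a child named "<parent>.<field>", e.g. "str2.a".
val structInfo = CarbonParserUtil.parseColumn(
  "str2", StructType(Seq(StructField("a", IntegerType))), None)
```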

def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
@@ -23,7 +23,6 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -160,7 +159,15 @@ case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
carbonTable: CarbonTable,
sqlContext: SQLContext)

case class DataTypeInfo(columnName: String, dataType: String, precision: Int = 0, scale: Int = 0) {
private var children: Option[List[DataTypeInfo]] = None
def setChildren(childrenList: List[DataTypeInfo]): Unit = {
children = Some(childrenList)
}
def getChildren(): List[DataTypeInfo] = {
children.get
}
}
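
A brief usage note (sketch, not part of the diff): getChildren() delegates to Option.get, so callers should invoke it only after setChildren has populated the children:
```scala
val info = DataTypeInfo("str2", "struct")
info.setChildren(List(DataTypeInfo("str2.a", "int")))
// Safe here because children were set above; calling getChildren() on a fresh
// instance would throw NoSuchElementException since children starts as None.
info.getChildren().foreach(c => println(c.columnName + " -> " + c.dataType))
```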

class AlterTableColumnRenameModel(columnName: String,
newColumnName: String,
