[CARBONDATA-3023] Alter add column issue with SORT_COLUMNS
Problem-1:
In case of ALTER ADD COLUMNS, the newly added column is appended at the end of the schema. But if that column is a SORT_COLUMN, the load expects all the SORT_COLUMNS to come first.

Solution:
While getting the schema from the carbonTable, rearrange it so that all the SORT_COLUMNS come first.
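
A minimal sketch of the reordering idea, using a simplified stand-in for CarbonData's ColumnSchema (the Column case class below is hypothetical, for illustration only):

case class Column(name: String, isSortColumn: Boolean)

// Stable partition: sort columns first, the remaining columns after,
// preserving the original relative order within each group.
def sortColumnsFirst(schema: Seq[Column]): Seq[Column] =
  schema.filter(_.isSortColumn) ++ schema.filterNot(_.isSortColumn)

// After ALTER ADD of sort column "c", the raw schema is (a, b, c) with "c"
// appended last; the rearranged schema used for the load becomes (c, a, b).
val rearranged = sortColumnsFirst(Seq(
  Column("a", isSortColumn = false),
  Column("b", isSortColumn = false),
  Column("c", isSortColumn = true)))
// rearranged.map(_.name) == Seq("c", "a", "b")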

Problem-2:
After an ALTER DROP followed by an ALTER ADD of a new column on a partition table, LOAD fails because the dropped columns are still considered during the load.

Solution:
While loading a partition table, take only the visible columns from the table. A column becomes invisible after DROP COLUMN, so dropped columns must not be considered during the load.
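
The idea, sketched with hypothetical stand-in types (CarbonData marks dropped columns with an invisible flag on the column schema; the TableColumn class here is illustrative only):

case class TableColumn(name: String, isInvisible: Boolean)

// A dropped column stays in the stored schema but is flagged invisible,
// so the load must filter it out before building the input plan.
def visibleColumnNames(allColumns: Seq[TableColumn]): Seq[String] =
  allColumns.filterNot(_.isInvisible).map(_.name)

// After DROP COLUMN empno followed by ADD COLUMN empno, the old empno entry
// is still present but invisible; only the re-added one takes part in the load.
val cols = Seq(
  TableColumn("empno", isInvisible = true),   // dropped
  TableColumn("salary", isInvisible = false),
  TableColumn("empno", isInvisible = false))  // re-added
// visibleColumnNames(cols) == Seq("salary", "empno")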

Problem-3:
(1) When checking the null bitsets for adaptive encoded primitive types, the lookup should be based on the actual rowId, but currently it goes through the reverseInvertedIndex.
(2) For range filters on a noDictionary column, '@NU#LL$!' values are removed before the binary search. But adaptive encoded pages do not use the special null character for the binary search.

Solution:
(1) Take the actual rowId for checking nullBitSets from the invertedIndex.
(2) Don't remove '@NU#LL$!' values in case of an adaptive encoded page.
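
A toy illustration of the rowId translation in fix (1), with made-up index arrays (the real code reads these mappings from the column page via getInvertedIndex/getInvertedReverseIndex):

// For an explicitly sorted page, invertedIndex maps a position in sorted
// order to the actual rowId, while the reverse mapping goes from an actual
// rowId to its sorted position. The null bitset is keyed by actual rowId,
// so a lookup coming from the sorted view must go through invertedIndex.
val invertedIndex = Array(2, 0, 1)        // sortedPos -> actual rowId
val reverseInvertedIndex = Array(1, 2, 0) // actual rowId -> sortedPos
val nullRowIds = Set(2)                   // actual rowId 2 holds a null

val sortedPos = 0
val correct = nullRowIds.contains(invertedIndex(sortedPos))      // true
val buggy = nullRowIds.contains(reverseInvertedIndex(sortedPos)) // false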

This closes #2826
dhatchayani authored and ravipesala committed Nov 21, 2018
1 parent ca3d056 commit de494c8
Showing 6 changed files with 67 additions and 7 deletions.
@@ -252,12 +252,14 @@ public int compareTo(int rowId, byte[] compareValue) {
     // rowId is the inverted index, but the null bitset is based on actual data
     int nullBitSetRowId = rowId;
     if (isExplicitSorted()) {
-      nullBitSetRowId = getInvertedReverseIndex(rowId);
+      nullBitSetRowId = getInvertedIndex(rowId);
     }
     byte[] nullBitSet = getNullBitSet(nullBitSetRowId, columnPage.getColumnSpec().getColumnType());
-    if (nullBitSet != null) {
-      // if this row is null, return default null represent in byte array
-      return ByteUtil.UnsafeComparer.INSTANCE.compareTo(nullBitSet, compareValue);
+    if (nullBitSet != null
+        && ByteUtil.UnsafeComparer.INSTANCE.compareTo(nullBitSet, compareValue) == 0) {
+      // check if the compare value is a null value
+      // if the compare value is null and the data is also null we can directly return 0
+      return 0;
     } else {
       byte[] chunkData = this.getChunkDataInBytes(rowId);
       return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
@@ -549,7 +549,7 @@ private BitSet setFilterdIndexToBitSetWithColumnIndex(
 
     // Binary Search cannot be done on '@NU#LL$!", so need to check and compare for null on
     // matching row.
-    if (dimensionColumnPage.isNoDicitionaryColumn()) {
+    if (dimensionColumnPage.isNoDicitionaryColumn() && !dimensionColumnPage.isAdaptiveEncoded()) {
       updateForNoDictionaryColumn(startMin, endMax, dimensionColumnPage, bitSet);
     }
     return bitSet;
@@ -671,4 +671,20 @@ object CarbonScalaUtil {
     dataType == StringType
   }
 
+  /**
+   * Rearrange the column schema so that all the sort columns come first. In case of
+   * ALTER ADD COLUMNS, a newly added column is appended at the end of the schema, but we
+   * expect all the SORT_COLUMNS to always be at the front.
+   *
+   * @param columnSchemas column schemas of the table
+   * @return column schemas rearranged with the sort columns first
+   */
+  def reArrangeColumnSchema(columnSchemas: mutable.Buffer[ColumnSchema]): mutable
+  .Buffer[ColumnSchema] = {
+    val newColumnSchemas = mutable.Buffer[ColumnSchema]()
+    newColumnSchemas ++= columnSchemas.filter(columnSchema => columnSchema.isSortColumn)
+    newColumnSchemas ++= columnSchemas.filterNot(columnSchema => columnSchema.isSortColumn)
+    newColumnSchemas
+  }
+
 }
@@ -301,6 +301,7 @@ class AlterTableColumnSchemaGenerator(
     val columnValidator = CarbonSparkFactory.getCarbonColumnValidator
     columnValidator.validateColumns(allColumns)
 
+    allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)
 
     def getLocalDictColumnList(tableProperties: scala.collection.mutable.Map[String, String],
       columns: scala.collection.mutable.ListBuffer[ColumnSchema]): (scala.collection.mutable
@@ -636,8 +636,9 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadSet("partition.operationcontext", operationContext)
     // input data from csv files. Convert to logical plan
     val allCols = new ArrayBuffer[String]()
-    allCols ++= table.getAllDimensions.asScala.map(_.getColName)
-    allCols ++= table.getAllMeasures.asScala.map(_.getColName)
+    // get only the visible dimensions from table
+    allCols ++= table.getDimensionByTableName(table.getTableName).asScala.map(_.getColName)
+    allCols ++= table.getMeasureByTableName(table.getTableName).asScala.map(_.getColName)
     var attributes =
       StructType(
         allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
@@ -45,6 +45,8 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
     sql("drop table if exists restructure_random_select")
+    sql("drop table if exists alterTable")
+    sql("drop table if exists alterPartitionTable")
 
     // clean data folder
     CarbonProperties.getInstance()
@@ -738,6 +740,42 @@ test("test alter command for boolean data type with correct default measure valu
     }
   }
 
+  test("load table after alter drop column scenario") {
+    sql("drop table if exists alterTable")
+    sql(
+      "create table alterTable(empno string, salary string) stored by 'carbondata' tblproperties" +
+      "('sort_columns'='')")
+    sql("alter table alterTable drop columns(empno)")
+    sql("alter table alterTable add columns(empno string)")
+    sql(s"load data local inpath '$resourcesPath/double.csv' into table alterTable options" +
+      s"('header'='true')")
+    checkAnswer(sql("select salary from alterTable limit 1"), Row(" 775678765456789098765432.789"))
+  }
+
+  test("load partition table after alter drop column scenario") {
+    val timestampFormat = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql("drop table if exists alterPartitionTable")
+    sql(
+      """
+        | CREATE TABLE alterPartitionTable (empname String, designation String, doj Timestamp,
+        | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        | utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='empname,deptno,projectcode,projectjoindate,
+        | projectenddate,attendance')
+      """.stripMargin)
+    sql("alter table alterPartitionTable drop columns(projectenddate)")
+    sql("alter table alterPartitionTable add columns(projectenddate timestamp)")
+    sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alterPartitionTable OPTIONS('DELIMITER'= ',', " +
+      "'QUOTECHAR'= '\"')")
+    sql("select * from alterPartitionTable where empname='bill'").show(false)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
     sql("drop table if exists table1")
@@ -756,5 +794,7 @@
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
     sql("drop table if exists restructure_random_select")
+    sql("drop table if exists alterTable")
+    sql("drop table if exists alterPartitionTable")
   }
 }
