Skip to content

Commit

Permalink
[CARBONDATA-3101] Fixed dataload failure when a column is dropped and…
Browse files Browse the repository at this point in the history
… added in partition table

Problem: Data load is failing when a column in a partition table is dropped and recreated, because fieldConverters are created based on schemaOrdinal, and this order may differ from the order in which data is read from the csv.
For example --> columns = nonparcolumn string, parcolumn int
Now if the user drops and re-adds the column nonparcolumn, the fieldConverters would be created in the following order
fieldConverters[0] = MeasureFieldConverter
fieldConverters[1] = NonDictionaryFieldConverterImpl

The data read from the csv would be 'columndata', 1. The conversion for the non-partition column would fail in this case.

Solution: Move the partition column to the end both while doing alter add column and while creating fieldConverters.

This closes #2923
  • Loading branch information
kunal642 authored and ravipesala committed Nov 22, 2018
1 parent 79fc97a commit c5de10c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
Expand Up @@ -437,6 +437,20 @@ test("Creation of partition table should fail if the colname in table schema and
sql("drop datamap if exists preaggTable on table partitionTable")
}

// Regression test for CARBONDATA-3101: a load into a partition table must
// succeed (and produce correct data) after a non-partition column is dropped
// and re-added, since the re-added column receives a new schemaOrdinal while
// the csv column order stays the same.
test("validate data in partition table after dropping and adding a column") {
sql("drop table if exists par")
// 'age' is the partition column; 'name' is the only non-partition column.
sql("create table par(name string) partitioned by (age double) stored by " +
"'carbondata'")
// First load: happens while 'name' still has its original schemaOrdinal.
sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
s"('header'='false')")
// Drop and re-add 'name' so that its schemaOrdinal no longer matches the
// csv column order — the scenario that previously broke fieldConverters.
sql("alter table par drop columns(name)")
sql("alter table par add columns(name string)")
// Second load must still map csv columns correctly after the schema change.
sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
s"('header'='false')")
// Rows from the first load read null for the re-added column; rows from the
// second load carry the actual values ("a", "b").
checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
sql("drop table if exists par")
}


private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
val plan = frame.queryExecution.sparkPlan
Expand Down Expand Up @@ -473,6 +487,7 @@ test("Creation of partition table should fail if the colname in table schema and
sql("drop table if exists staticpartitionlocloadother")
sql("drop table if exists staticpartitionextlocload_new")
sql("drop table if exists staticpartitionlocloadother_new")
sql("drop table if exists par")
}

}
Expand Up @@ -302,6 +302,11 @@ class AlterTableColumnSchemaGenerator(

allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)

if (tableInfo.getFactTable.getPartitionInfo != null) {
val par = tableInfo.getFactTable.getPartitionInfo.getColumnSchemaList
allColumns = allColumns.filterNot(b => par.contains(b)) ++= par.asScala
}

def getLocalDictColumnList(tableProperties: scala.collection.mutable.Map[String, String],
columns: scala.collection.mutable.ListBuffer[ColumnSchema]): (scala.collection.mutable
.ListBuffer[ColumnSchema], scala.collection.mutable.ListBuffer[ColumnSchema]) = {
Expand Down
Expand Up @@ -98,8 +98,15 @@ case class CarbonRelation(
override val output = {
val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala
val partitionColumnSchemas = if (carbonTable.getPartitionInfo() != null) {
carbonTable.getPartitionInfo.getColumnSchemaList.asScala
} else {
Nil
}
val otherColumns = columns.filterNot(a => partitionColumnSchemas.contains(a.getColumnSchema))
val partitionColumns = columns.filter(a => partitionColumnSchemas.contains(a.getColumnSchema))
// convert each column to Attribute
columns.filter(!_.isInvisible).map { column: CarbonColumn =>
(otherColumns ++= partitionColumns).filter(!_.isInvisible).map { column: CarbonColumn =>
if (column.isDimension()) {
val output: DataType = column.getDataType.getName.toLowerCase match {
case "array" =>
Expand Down
Expand Up @@ -206,11 +206,18 @@ public void build(
} else {
if (StringUtils.isEmpty(fileHeader)) {
List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
String[] columnNames = new String[columns.size()];
for (int i = 0; i < columnNames.length; i++) {
columnNames[i] = columns.get(i).getColName();
List<String> columnNames = new ArrayList<>();
List<String> partitionColumns = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
if (table.getPartitionInfo() != null && table.getPartitionInfo().getColumnSchemaList()
.contains(columns.get(i).getColumnSchema())) {
partitionColumns.add(columns.get(i).getColName());
} else {
columnNames.add(columns.get(i).getColName());
}
}
fileHeader = Strings.mkString(columnNames, ",");
columnNames.addAll(partitionColumns);
fileHeader = Strings.mkString(columnNames.toArray(new String[columnNames.size()]), ",");
}
}
}
Expand Down

0 comments on commit c5de10c

Please sign in to comment.