[CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type

Why is this PR needed?
1. The IS_EMPTY_DATA_BAD_RECORD property is not supported for complex types.
2. The documentation does not state that COLUMN_META_CACHE and RANGE_COLUMN
   do not support complex data types.

What changes were proposed in this PR?
1. Pass the IS_EMPTY_DATA_BAD_RECORD property down to the complex type
   writers and throw an exception when an empty value is flagged as a bad
   record. Otherwise, store an empty complex value instead of null, which
   matches the Hive table result.
2. Updated the documentation and added test cases.
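
For context, the new behavior is driven entirely by existing load options; a minimal sketch, assuming an active CarbonData-enabled SparkSession `spark` (table name and CSV path are illustrative, modeled on the test added below):

```scala
// Illustrative only: table name and path are not from this PR.
spark.sql(
  "CREATE TABLE complex_tbl (id INT, file struct<school:array<string>, age:int>) " +
    "STORED AS carbondata")

// With IS_EMPTY_DATA_BAD_RECORD=true and BAD_RECORDS_ACTION=fail, an empty
// complex value in the CSV now fails the load instead of silently storing null.
spark.sql(
  "LOAD DATA LOCAL INPATH '/tmp/complex_with_empty.csv' INTO TABLE complex_tbl " +
    "OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
    "'BAD_RECORDS_LOGGER_ENABLE'='true', 'IS_EMPTY_DATA_BAD_RECORD'='true', " +
    "'BAD_RECORDS_ACTION'='fail')")

// With IS_EMPTY_DATA_BAD_RECORD=false, the row loads and the empty complex
// value is stored as an empty value (matching Hive) rather than as null.
```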

Does this PR introduce any user interface change?
No

Is any new test case added?
Yes

This closes #4228
ShreelekhyaG authored and Indhumathi27 committed Oct 21, 2021
1 parent b8d9a97 commit 305851ed75cf935c2a606071118dbe1347a18628
Showing 18 changed files with 196 additions and 64 deletions.
@@ -82,6 +82,7 @@
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -3538,13 +3539,18 @@ public static void updateNullValueBasedOnDatatype(DataOutputStream dataOutputStr
}
dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
} else {
if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
} else {
dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
}
dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
}
}

public static void updateWithEmptyValueBasedOnDatatype(DataOutputStream dataOutputStream,
DataType dataType) throws IOException {
if (DataTypeUtil.isByteArrayComplexChildColumn(dataType) || dataType instanceof ArrayType) {
dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
} else {
dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
}
dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
}
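
The helper above writes nothing more than a length prefix of zero followed by no payload bytes; a minimal standalone sketch of the same byte layout (illustrative, not CarbonData API):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Illustrative sketch of the layout written by
// updateWithEmptyValueBasedOnDatatype: a length prefix followed by the
// (zero-length) value bytes. Byte-array children and ArrayType use a
// 4-byte int prefix; other children use a 2-byte short prefix.
def writeEmptyValue(out: DataOutputStream, usesIntPrefix: Boolean): Unit = {
  val emptyBytes = Array.emptyByteArray
  if (usesIntPrefix) out.writeInt(emptyBytes.length)   // 4-byte length = 0
  else out.writeShort(emptyBytes.length)               // 2-byte length = 0
  out.write(emptyBytes)                                // no payload bytes
}

val buffer = new ByteArrayOutputStream()
writeEmptyValue(new DataOutputStream(buffer), usesIntPrefix = true)
assert(buffer.toByteArray.length == 4) // only the zero-valued length prefix
```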

/**
@@ -318,7 +318,7 @@ CarbonData DDL statements are documented here, which includes:

- ##### Caching Min/Max Value for Required Columns

By default, CarbonData caches min and max values of all the columns in schema. As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary data type.
By default, CarbonData caches min and max values of all the columns in schema. As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary and complex data types (a usage sketch follows the list below).

Following are the valid values for COLUMN_META_CACHE:
* If you want no column min/max values to be cached in the driver.
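
A minimal usage sketch of COLUMN_META_CACHE, modeled on the new test in this PR (table and column names illustrative):

```scala
// Caching min/max only for selected primitive columns (illustrative names).
sql("CREATE TABLE cache_tbl (id INT, name STRING, arr1 array<array<int>>) " +
  "STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='id,name')")

// Naming a complex column instead fails at CREATE TABLE time with:
// "arr1 is a complex type column and complex type is not allowed for the
//  option(s): column_meta_cache"
```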
@@ -507,7 +507,7 @@ CarbonData DDL statements are documented here, which includes:
- ##### Range Column
This property is used to specify a column to partition the input data by range.
Only one column can be configured. During data loading, you can use "global_sort_partitions" or "scale_factor" to avoid generating small files.
This feature doesn't support binary data type.
This feature doesn't support binary and complex data types.

```
TBLPROPERTIES('RANGE_COLUMN'='col1')
@@ -961,7 +961,7 @@ object CommonUtil {
.writeByteArray(result.asInstanceOf[ArrayObject],
dataOutputStream,
badRecordLogHolder,
true)
true, false)
dataOutputStream.close()
data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
case structType: StructType =>
@@ -973,7 +973,7 @@ object CommonUtil {
.writeByteArray(result.asInstanceOf[StructObject],
dataOutputStream,
badRecordLogHolder,
true)
true, false)
dataOutputStream.close()
data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
case other =>
@@ -0,0 +1,3 @@
1,109,4ROM size,Intel,29-11-2015,,MAC1:1,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
1,109,4ROM size,Intel,29-11-2015,1AA1$2,,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
1,109,4ROM size,Intel,29-11-2015,1AA1$2,MAC1:1,,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
@@ -1177,6 +1177,23 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists hive_table")
}

test("test COLUMN_META_CACHE and RANGE_COLUMN doesn't support complex datatype") {
sql("DROP TABLE IF EXISTS test")
var exception = intercept[Exception] {
sql("CREATE TABLE IF NOT EXISTS test " +
"(id INT,mlabel boolean,name STRING,arr1 array<array<int>>,autoLabel boolean)" +
" STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
}
assert(exception.getMessage.contains("arr1 is a complex type column and complex type " +
"is not allowed for the option(s): column_meta_cache"))
exception = intercept[Exception] {
sql("CREATE TABLE IF NOT EXISTS test " +
"(id INT,label boolean,name STRING,map1 map<string, array<int>>,autoLabel boolean)" +
" STORED AS carbondata TBLPROPERTIES('RANGE_COLUMN'='map1')")
}
assert(exception.getMessage.contains("RANGE_COLUMN doesn't support map data type: map1"))
}

test("test when insert select from a parquet table " +
"with an struct with binary and custom complex delimiter") {
var carbonProperties = CarbonProperties.getInstance()
@@ -101,6 +101,52 @@ class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
}
}

def loadEmptyComplexData(isEmptyBadRecord: Boolean, badRecordsAction: String): Unit = {
sql(s"LOAD DATA local inpath '" + resourcesPath +
"/complextypeWithEmptyRecords.csv' INTO table complexcarbontable OPTIONS('DELIMITER'=','," +
"'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
"purchasedate,file,MAC,locationinfo,proddate,gamePointId,contractNumber,st,ar', " +
"'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
s"'bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='$isEmptyBadRecord' ," +
s"'bad_records_action'='$badRecordsAction')")
}

test("Test complex type with empty values and IS_EMPTY_DATA_BAD_RECORD property") {
sql("DROP TABLE IF EXISTS complexcarbontable")
sql("DROP TABLE IF EXISTS complexhivetable")
sql(
"create table complexcarbontable(deviceInformationId int, channelsId string, ROMSize " +
"string, ROMName String, purchasedate string, file struct<school:array<string>, age:int>," +
" MAC map<string, int>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double, st struct<school:struct<a:string,b:int>, age:int>," +
"ar array<array<string>>) STORED AS carbondata")
val exception = intercept[Exception](loadEmptyComplexData(true, "fail"))
assert(exception.getMessage.contains(
"The value with column name file.age and column data type INT is not a valid INT type."))
loadEmptyComplexData(true, "ignore")
checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(0)))
loadEmptyComplexData(false, "ignore")
sql(
"create table complexhivetable(deviceInformationId int, channelsId " +
"string, ROMSize string, ROMName String, purchasedate string, file " +
"struct<school:array<string>, age:int>, MAC map<string, int>, " +
"locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, ActiveProvince:string, " +
"Activecity:string, ActiveDistrict:string, " +
"ActiveStreet:string>>, proddate struct<productionDate:string," +
"activeDeactivedate:array<string>>, gamePointId double,contractNumber double," +
"st struct<school:struct<a:string,b:int>, age:int>,ar array<array<string>>) row format " +
"delimited fields terminated by ',' collection items terminated by '$' map keys terminated " +
"by ':'")
sql("LOAD DATA local inpath '" + resourcesPath +
"/complextypeWithEmptyRecords.csv' INTO table complexhivetable")
checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(3)))
checkAnswer(sql("select * from complexcarbontable"),
sql("select * from complexhivetable"))
sql("DROP TABLE IF EXISTS complexcarbontable")
}

test("select count(*) from empty_timestamp") {
checkAnswer(
sql("select count(*) from empty_timestamp"),
@@ -761,7 +761,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
// check one row of streaming data
assert(result(0).isNullAt(0))
assert(result(0).getString(1) == "")
assert(result(0).isNullAt(9))
assert(result(0).getStruct(9).isNullAt(1))
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
@@ -924,68 +924,68 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("select * from stream_table_filter_complex where id is null order by name"),
Seq(Row(null, "", "", null, null, null, null, null, null, null),
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null)),
Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where name = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and name <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where city = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and city <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where salary is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where tax is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and tax is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where percent is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where birthday is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where register is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and register is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

checkAnswer(
sql("select * from stream_table_filter_complex where updated is null"),
Seq(Row(null, "", "", null, null, null, null, null, null, null)))
Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

checkAnswer(
sql("select * from stream_table_filter_complex where id is null and updated is not null"),
@@ -30,11 +30,12 @@
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;

/**
* Array DataType stateless object used in data loading
*/
public class ArrayDataType implements GenericDataType<ArrayObject> {
public class ArrayDataType implements GenericDataType<Object> {

/**
* child columns
@@ -171,16 +172,27 @@ public boolean getIsColumnDictionary() {
}

@Override
public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
public void writeByteArray(Object input, DataOutputStream dataOutputStream,
BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
throws IOException {
if (input == null) {
dataOutputStream.writeInt(1);
children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
isEmptyBadRecord);
} else {
Object[] data = input.getData();
dataOutputStream.writeInt(data.length);
Object[] data = ((ArrayObject) input).getData();
if (data.length == 1 && data[0] != null
&& data[0].equals("") && !(children instanceof PrimitiveDataType)) {
// If child complex column is empty, no need to iterate. Fill empty byte array and return.
CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
parentName, DataTypeUtil.valueOf("array"));
return;
} else {
dataOutputStream.writeInt(data.length);
}
for (Object eachInput : data) {
children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter);
children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter,
isEmptyBadRecord);
}
}
}
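
CarbonBadRecordUtil.updateEmptyValue itself is not part of this excerpt; judging from its call sites here, it presumably either flags the empty value as a bad record or writes the empty-value encoding. A hedged sketch of that inferred behavior (signature and body are guesses, not the committed code):

```scala
import java.io.DataOutputStream
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder
import org.apache.carbondata.processing.util.CarbonBadRecordUtil

// Inferred sketch only. When IS_EMPTY_DATA_BAD_RECORD is enabled, record the
// failure reason so the configured BAD_RECORDS_ACTION (fail/ignore/...) can
// act on the row; otherwise write the empty-value encoding so the row is
// kept with an empty complex value, matching the Hive result.
def updateEmptyValue(out: DataOutputStream, isEmptyBadRecord: Boolean,
    logHolder: BadRecordLogHolder, columnName: String, dataType: DataType): Unit = {
  if (isEmptyBadRecord) {
    CarbonBadRecordUtil.setErrorMessage(logHolder, columnName, dataType.getName)
  } else {
    CarbonUtil.updateWithEmptyValueBasedOnDatatype(out, dataType)
  }
}
```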
@@ -268,7 +280,7 @@ public int getDataCounter() {
}

@Override
public GenericDataType<ArrayObject> deepCopy() {
public GenericDataType<Object> deepCopy() {
return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(),
this.name);
}
@@ -61,7 +61,7 @@
* @throws IOException
*/
void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder,
Boolean isWithoutConverter)
Boolean isWithoutConverter, boolean isEmptyBadRecord)
throws IOException;

/**
@@ -40,7 +40,7 @@
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;

/**
* Primitive DataType stateless object used in data loading
@@ -237,14 +237,20 @@ public boolean getIsColumnDictionary() {

@Override
public void writeByteArray(Object input, DataOutputStream dataOutputStream,
BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
throws IOException {
String parsedValue = null;
// write null value
if (null == input || ((this.carbonDimension.getDataType() == DataTypes.STRING
|| this.carbonDimension.getDataType() == DataTypes.VARCHAR) && input.equals(nullFormat))) {
updateNullValue(dataOutputStream, logHolder);
return;
}
if (input.equals("")) {
CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
carbonDimension.getColName(), this.carbonDimension.getDataType());
return;
}
// write null value after converter
if (!isWithoutConverter) {
parsedValue = DataTypeUtil.parseValue(input.toString(), carbonDimension);
@@ -415,13 +421,8 @@ private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] v
private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
throws IOException {
CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, this.carbonDimension.getDataType());
String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
if (null == message) {
message = CarbonDataProcessorUtil
.prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
}
logHolder.setReason(message);
CarbonBadRecordUtil.setErrorMessage(logHolder, carbonDimension.getColName(),
carbonDimension.getDataType().getName());
}

@Override
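
The block removed above is exactly what the new CarbonBadRecordUtil.setErrorMessage helper centralizes; an equivalent sketch of that logic (rendered in Scala for consistency with the other sketches; the committed helper is Java, and prepareFailureReason is assumed here to accept the type name):

```scala
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil

// Equivalent of the removed block: reuse a cached per-column failure message,
// building and caching it on first use, then set it as the current reason.
def setErrorMessage(logHolder: BadRecordLogHolder, columnName: String,
    dataTypeName: String): Unit = {
  var message = logHolder.getColumnMessageMap.get(columnName)
  if (message == null) {
    message = CarbonDataProcessorUtil.prepareFailureReason(columnName, dataTypeName)
    logHolder.getColumnMessageMap.put(columnName, message)
  }
  logHolder.setReason(message)
}
```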
