[CARBONDATA-2606] Fix complex array pushdown and block auto-merge compaction

Indhumathi27 committed Jul 24, 2018
commit f929cfd (parent: 878bbd8)
Showing 10 changed files with 184 additions and 26 deletions.
@@ -27,6 +27,14 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
override def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS test")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
}

test("test Projection PushDown for Struct - Integer type") {
@@ -885,4 +893,98 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
checkExistence(sql("select * from table1"),true,"1.0E9")
}

test("test block compaction - auto merge") {
sql("DROP TABLE IF EXISTS table1")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
sql(
"create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
"by 'carbondata'")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
checkExistence(sql("show segments for table table1"),false, "Compacted")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
}

test("decimal with two level struct type") {
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(id int,a struct<c:struct<d:decimal(20,10)>>) stored by 'carbondata' " +
"tblproperties('dictionary_include'='a')")
checkExistence(sql("desc test"),true,"struct<c:struct<d:decimal(20,10)>>")
checkExistence(sql("describe formatted test"),true,"struct<c:struct<d:decimal(20,10)>>")
sql("insert into test values(1,'3999.999')")
checkExistence(sql("select * from test"),true,"3999.9990000000")
}

test("test dictionary include for second struct and array column") {
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(id int,a struct<b:int,c:int>, d struct<e:int,f:int>, d1 struct<e1:int," +
"f1:int>) stored by 'carbondata' tblproperties('dictionary_include'='d1')")
sql("insert into test values(1,'2$3','4$5','6$7')")
checkAnswer(sql("select * from test"),Seq(Row(1,Row(2,3),Row(4,5),Row(6,7))))
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(a array<int>, b array<int>) stored by 'carbondata' tblproperties" +
"('dictionary_include'='b')")
sql("insert into test values(1,2) ")
checkAnswer(sql("select b[0] from test"),Seq(Row(2)))
}

test("date with struct and array") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
sql("DROP TABLE IF EXISTS test")
sql("create table test(a struct<b:date>) stored by 'carbondata'")
val exception1 = intercept[Exception] {
sql("insert into test select 'a' ")
}
assert(exception1.getMessage
.contains(
"Data load failed due to bad record: The value with column name a.b and column data type " +
"DATE is not a valid DATE type.Please enable bad record logger to know the detail reason."))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
sql("DROP TABLE IF EXISTS test")
sql("create table test(a array<date>) stored by 'carbondata'")
val exception2 = intercept[Exception] {
sql("insert into test select 'a' ")
}
assert(exception2.getMessage
.contains(
"Data load failed due to bad record: The value with column name a.val and column data type " +
"DATE is not a valid DATE type.Please enable bad record logger to know the detail reason."))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
"MM-dd-yyyy")
sql("DROP TABLE IF EXISTS test")
sql("create table test(a struct<d1:date,d2:date>) stored by 'carbondata'")
sql("insert into test values ('02-18-2012$12-9-2016')")
checkAnswer(sql("select * from test "), Row(Row(java.sql.Date.valueOf("2012-02-18"),java.sql.Date.valueOf("2016-12-09"))))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
}
}
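
The auto-merge test above enables ENABLE_AUTO_LOAD_MERGE and loads the same file four times, enough for auto compaction to ordinarily kick in, then asserts that no segment is marked "Compacted" because the table has a complex column. A hedged refactor sketch, reusing the test's own strings, that collapses the four duplicated LOAD DATA statements:

```scala
// Sketch only: identical statement to the test's, issued in a loop;
// `sql` and `resourcesPath` come from the surrounding test class.
(1 to 4).foreach { _ =>
  sql(
    "load data inpath '" + resourcesPath +
    "/Struct.csv' into table table1 options('delimiter'=','," +
    "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
    "'complex_delimiter_level_2'='&')")
}
```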
@@ -128,7 +128,10 @@ private[spark] object SparkTypeConverter {
case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ getStructChildren(table, childDim.getColName) }>"
- case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+ case dType => s"${
+ childDim.getColName
+ .substring(dimName.length + 1)
+ }:${ addDecimalScaleAndPrecision(childDim, dType) }"
}
}
}
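
In the hunk above, the old default branch emitted a struct child's decimal type without precision and scale, so a nested field came back as `d:decimal` rather than `d:decimal(20,10)`, which is exactly what the new `decimal with two level struct type` test checks through `desc test`. A hedged sketch, assuming the usual ColumnSchema accessors, of what a helper like `addDecimalScaleAndPrecision` is expected to do:

```scala
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension

// Sketch, not necessarily the project's exact helper: append
// "(precision,scale)" so a decimal child's type string round-trips.
def addDecimalScaleAndPrecision(childDim: CarbonDimension, dType: String): String = {
  if (DataTypes.isDecimal(childDim.getDataType)) {
    s"$dType(${childDim.getColumnSchema.getPrecision},${childDim.getColumnSchema.getScale})"
  } else {
    dType
  }
}
```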
@@ -578,13 +578,19 @@ object CarbonDataRDDFactory {
if (carbonTable.isHivePartitionTable) {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
- val compactedSegments = new util.ArrayList[String]()
- handleSegmentMerging(sqlContext,
- carbonLoadModel,
- carbonTable,
- compactedSegments,
- operationContext)
- carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ // Block compaction for table containing complex datatype
+ if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+ .exists(m => m.getDataType.isComplexType)) {
+ LOGGER.warn("Compaction is skipped as table contains complex columns")
+ } else {
+ val compactedSegments = new util.ArrayList[String]()
+ handleSegmentMerging(sqlContext,
+ carbonLoadModel,
+ carbonTable,
+ compactedSegments,
+ operationContext)
+ carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ }
} catch {
case e: Exception =>
throw new Exception(
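
The guard above reappears almost verbatim in CarbonLoadDataCommand further down. A hedged sketch of a shared predicate, with a hypothetical name but the same calls as the diff, that both sites could use:

```scala
import scala.collection.JavaConverters._

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Sketch: true when any fact-table column is a complex type, in which
// case auto compaction after load is skipped with a warning.
def hasComplexColumn(carbonTable: CarbonTable): Boolean = {
  carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    .exists(_.getDataType.isComplexType)
}
```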
@@ -97,7 +97,7 @@ case class CarbonDatasourceHadoopRelation(
breakable({
while (ifGetArrayItemExists.containsChild != null) {
if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
- arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+ arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
break
}
if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
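
The one-line fix above removes a stale reference: the loop advances `ifGetArrayItemExists` down the expression tree, but the array check kept reading `s`, the outer expression it started from, so the flag never reflected the child actually being inspected. A minimal, self-contained sketch of this bug class, using a hypothetical `Node` type rather than CarbonData's classes:

```scala
// Hypothetical stand-in for the expression tree being walked.
final case class Node(schema: String, child: Option[Node])

def containsArrayType(root: Node): Boolean = {
  var cursor: Option[Node] = Some(root)
  while (cursor.isDefined) {
    // The fix reads the cursor here; before it, the test effectively
    // re-checked `root` on every pass, missing deeper array children.
    if (cursor.get.schema.contains("ArrayType")) {
      return true
    }
    cursor = cursor.get.child
  }
  false
}
```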
@@ -77,6 +77,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+ import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}

case class CarbonLoadDataCommand(
@@ -823,15 +824,21 @@
}
try {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
- val compactedSegments = new util.ArrayList[String]()
- // Trigger auto compaction
- CarbonDataRDDFactory.handleSegmentMerging(
- sparkSession.sqlContext,
- carbonLoadModel,
- table,
- compactedSegments,
- operationContext)
- carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ // Block compaction for table containing complex datatype
+ if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+ .exists(m => m.getDataType.isComplexType)) {
+ LOGGER.warn("Compaction is skipped as table contains complex columns")
+ } else {
+ val compactedSegments = new util.ArrayList[String]()
+ // Trigger auto compaction
+ CarbonDataRDDFactory.handleSegmentMerging(
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ table,
+ compactedSegments,
+ operationContext)
+ carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ }
} catch {
case e: Exception =>
throw new Exception(
@@ -172,8 +172,10 @@ public void setSurrogateIndex(int surrIndex) {

@Override
public void fillCardinality(List<Integer> dimCardWithComplex) {
- dimCardWithComplex.add(0);
- children.fillCardinality(dimCardWithComplex);
+ if (children.getIsColumnDictionary()) {
+ dimCardWithComplex.add(0);
+ children.fillCardinality(dimCardWithComplex);
+ }
}

@Override
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+ import java.text.ParseException;
+ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -173,7 +175,8 @@ public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String co
if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
|| carbonColumn.getDataType() == DataTypes.DATE) {
dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(carbonDimension.getDataType()));
+ .getDirectDictionaryGenerator(carbonDimension.getDataType(),
+ getDateFormat(carbonDimension)));
isDirectDictionary = true;
} else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
CacheProvider cacheProvider = CacheProvider.getInstance();
@@ -204,6 +207,25 @@ public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String co
}
}

+ /**
+ * get dateformat
+ * @param carbonDimension
+ * @return
+ */
+ private String getDateFormat(CarbonDimension carbonDimension) {
+ String format;
+ String dateFormat = null;
+ if (this.carbonDimension.getDataType() == DataTypes.DATE) {
+ dateFormat = carbonDimension.getDateFormat();
+ }
+ if (dateFormat != null && !dateFormat.trim().isEmpty()) {
+ format = dateFormat;
+ } else {
+ format = CarbonUtil.getFormatFromProperty(dataType);
+ }
+ return format;
+ }
+
private boolean isDictionaryDimension(CarbonDimension carbonDimension) {
if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
return true;
@@ -326,6 +348,8 @@ public int getSurrogateIndex() {
byte[] value = null;
if (isDirectDictionary) {
int surrogateKey;
+ SimpleDateFormat parser = new SimpleDateFormat(getDateFormat(carbonDimension));
+ parser.parse(parsedValue);
// If the input is a long value then this means that logical type was provided by
// the user using AvroCarbonWriter. In this case directly generate surrogate key
// using dictionaryGenerator.
@@ -389,6 +413,8 @@ public int getSurrogateIndex() {
updateNullValue(dataOutputStream, logHolder);
} catch (CarbonDataLoadingException e) {
throw e;
+ } catch (ParseException ex) {
+ updateNullValue(dataOutputStream, logHolder);
} catch (Throwable ex) {
// TODO have to implemented the Bad Records LogHolder.
// Same like NonDictionaryFieldConverterImpl.
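
Two behaviors land in this file: `getDateFormat` prefers a column-level `dateformat` and only then falls back to the configured default, and the new `SimpleDateFormat.parse` call makes malformed values throw `ParseException`, which the added catch converts into a bad-record null instead of a bogus surrogate key. A hedged Scala paraphrase of the fallback for DATE columns (the Java code goes through `CarbonUtil.getFormatFromProperty`, which also covers timestamps):

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Sketch of the resolution order, with the column-level format
// surfaced as an Option for brevity.
def resolveDateFormat(columnDateFormat: Option[String]): String =
  columnDateFormat.map(_.trim).filter(_.nonEmpty).getOrElse(
    CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_DATE_FORMAT,
      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
```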
@@ -178,9 +178,17 @@ public void setSurrogateIndex(int surrIndex) {

@Override
public void fillCardinality(List<Integer> dimCardWithComplex) {
- dimCardWithComplex.add(0);
- for (int i = 0; i < children.size(); i++) {
- children.get(i).fillCardinality(dimCardWithComplex);
- }
+ boolean isDictionaryColumn = false;
+ for (GenericDataType child : children) {
+ if (child.getIsColumnDictionary()) {
+ isDictionaryColumn = true;
+ }
+ }
+ if (isDictionaryColumn) {
+ dimCardWithComplex.add(0);
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).fillCardinality(dimCardWithComplex);
+ }
+ }
}

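
Together with the ArrayDataType change earlier, fillCardinality now contributes a cardinality slot only when a dictionary-encoded child exists, keeping the cardinality list aligned with the dictionary key generator when complex children are no-dictionary. A hedged Scala condensation of the struct version above:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.carbondata.processing.datatypes.GenericDataType

// Sketch: add the struct's own slot and recurse into children only
// when at least one child is dictionary encoded.
def fillCardinality(children: JList[GenericDataType],
    dimCardWithComplex: JList[Integer]): Unit = {
  if (children.asScala.exists(_.getIsColumnDictionary)) {
    dimCardWithComplex.add(0)
    children.asScala.foreach(_.fillCardinality(dimCardWithComplex))
  }
}
```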
@@ -165,7 +165,9 @@ private void setComplexMapSurrogateIndex(int dimensionCount) {
List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
complexDataType.getAllPrimitiveChildren(primitiveTypes);
for (GenericDataType eachPrimitive : primitiveTypes) {
- eachPrimitive.setSurrogateIndex(surrIndex++);
+ if (eachPrimitive.getIsColumnDictionary()) {
+ eachPrimitive.setSurrogateIndex(surrIndex++);
+ }
}
} else {
surrIndex++;
@@ -248,7 +248,9 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
for (GenericDataType eachPrimitive : primitiveTypes) {
- eachPrimitive.setSurrogateIndex(surrIndex++);
+ if (eachPrimitive.getIsColumnDictionary()) {
+ eachPrimitive.setSurrogateIndex(surrIndex++);
+ }
}
}

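
The last two hunks apply one rule at both surrogate-index assignment sites: only dictionary-encoded primitive children consume an index, so no-dictionary children no longer shift everyone else's slots. A hedged sketch of the shared loop, with a hypothetical helper name:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.carbondata.processing.datatypes.GenericDataType

// Sketch: assigns indexes only to dictionary-encoded children and
// returns the next free index, mirroring the callers' surrIndex++.
def assignSurrogateIndexes(primitiveTypes: JList[GenericDataType],
    startIndex: Int): Int = {
  var surrIndex = startIndex
  primitiveTypes.asScala.foreach { child =>
    if (child.getIsColumnDictionary) {
      child.setSurrogateIndex(surrIndex)
      surrIndex += 1
    }
  }
  surrIndex
}
```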
