Commit d48eef0
[CARBONDATA-2606] Fix Complex array Pushdown and block auto merge compaction
Indhumathi27 committed Jul 24, 2018
1 parent bea277f commit d48eef0
Showing 9 changed files with 115 additions and 25 deletions.
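
This commit makes two functional changes: the complex-array projection pushdown in CarbonDatasourceHadoopRelation no longer tests a stale outer binding instead of the expression being walked, and auto merge compaction is now skipped, with a warning, for tables containing complex datatype columns. The new test drives the second change through the auto load merge property; a minimal sketch of the toggle it uses (the same calls as the test code below):

    // Enable auto load merge so that each load may trigger auto compaction
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    // ... run loads ...
    // Restore the default afterwards, as afterAll() now does
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")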
@@ -27,6 +27,8 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
   override def afterAll(): Unit = {
     sql("DROP TABLE IF EXISTS table1")
     sql("DROP TABLE IF EXISTS test")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
   }

   test("test Projection PushDown for Struct - Integer type") {
@@ -885,4 +887,62 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("select * from table1"),true,"1.0E9")
   }

+  test("test block compaction - auto merge") {
+    sql("DROP TABLE IF EXISTS table1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(
+      "create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
+      "by 'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    checkExistence(sql("show segments for table table1"),false, "Compacted")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  test("decimal with two level struct type") {
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(id int,a struct<c:struct<d:decimal(20,10)>>) stored by 'carbondata' " +
+      "tblproperties('dictionary_include'='a')")
+    checkExistence(sql("desc test"),true,"struct<c:struct<d:decimal(20,10)>>")
+    checkExistence(sql("describe formatted test"),true,"struct<c:struct<d:decimal(20,10)>>")
+    sql("insert into test values(1,'3999.999')")
+    checkExistence(sql("select * from test"),true,"3999.9990000000")
+  }
+
+  test("test dictionary include for second struct and array column") {
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(id int,a struct<b:int,c:int>, d struct<e:int,f:int>, d1 struct<e1:int," +
+      "f1:int>) stored by 'carbondata' tblproperties('dictionary_include'='d1')")
+    sql("insert into test values(1,'2$3','4$5','6$7')")
+    checkAnswer(sql("select * from test"),Seq(Row(1,Row(2,3),Row(4,5),Row(6,7))))
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(a array<int>, b array<int>) stored by 'carbondata' tblproperties" +
+      "('dictionary_include'='b')")
+    sql("insert into test values(1,2) ")
+    checkAnswer(sql("select b[0] from test"),Seq(Row(2)))
+  }
+
 }
@@ -128,7 +128,10 @@ private[spark] object SparkTypeConverter {
case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ getStructChildren(table, childDim.getColName) }>"
case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
case dType => s"${
childDim.getColName
.substring(dimName.length + 1)
}:${ addDecimalScaleAndPrecision(childDim, dType) }"
}
}
}
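
With addDecimalScaleAndPrecision in the default case, a decimal child of a nested struct keeps its precision and scale in the generated type string; previously the raw type name was emitted and the scale and precision were presumably dropped. The new test asserts the resulting string directly:

    // expected by checkExistence(sql("desc test"), true, ...) in the new test
    val expected = "struct<c:struct<d:decimal(20,10)>>"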
@@ -578,13 +578,19 @@ object CarbonDataRDDFactory {
       if (carbonTable.isHivePartitionTable) {
         carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
       }
-      val compactedSegments = new util.ArrayList[String]()
-      handleSegmentMerging(sqlContext,
-        carbonLoadModel,
-        carbonTable,
-        compactedSegments,
-        operationContext)
-      carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      // Block compaction for table containing complex datatype
+      if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .exists(m => m.getDataType.isComplexType)) {
+        LOGGER.warn("Compaction is skipped as table contains complex columns")
+      } else {
+        val compactedSegments = new util.ArrayList[String]()
+        handleSegmentMerging(sqlContext,
+          carbonLoadModel,
+          carbonTable,
+          compactedSegments,
+          operationContext)
+        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      }
     } catch {
       case e: Exception =>
         throw new Exception(
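
The same guard is duplicated in CarbonLoadDataCommand further down, which reuses CarbonDataRDDFactory's LOGGER via a new import. A minimal standalone sketch of the check, assuming a CarbonTable handle named carbonTable:

    import scala.collection.JavaConverters._
    // true when any fact-table column is a complex type (array/struct)
    val hasComplexColumn = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
      .exists(_.getDataType.isComplexType)
    // when true, handleSegmentMerging is not invoked and a warning is logged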
@@ -97,7 +97,7 @@ case class CarbonDatasourceHadoopRelation(
       breakable({
         while (ifGetArrayItemExists.containsChild != null) {
           if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
-            arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+            arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
             break
           }
           if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
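
This one-line change is the array pushdown fix named in the commit title: inside the loop, the ArrayType check was assigned from s, apparently a binding from an enclosing scope, rather than from ifGetArrayItemExists, the expression actually being walked, so arrayTypeExists could reflect the wrong schema. The class of bug, in miniature (hypothetical names):

    case class Node(isArray: Boolean)
    val nodes = Seq(Node(false), Node(true))
    val s = nodes.head
    var found = false
    for (n <- nodes) {
      // correct: inspect n, the element in hand, not the outer binding s
      found = found || n.isArray
    }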
@@ -77,6 +77,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}

 case class CarbonLoadDataCommand(
@@ -823,15 +824,21 @@
       }
       try {
         carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-        val compactedSegments = new util.ArrayList[String]()
-        // Trigger auto compaction
-        CarbonDataRDDFactory.handleSegmentMerging(
-          sparkSession.sqlContext,
-          carbonLoadModel,
-          table,
-          compactedSegments,
-          operationContext)
-        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        // Block compaction for table containing complex datatype
+        if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+          .exists(m => m.getDataType.isComplexType)) {
+          LOGGER.warn("Compaction is skipped as table contains complex columns")
+        } else {
+          val compactedSegments = new util.ArrayList[String]()
+          // Trigger auto compaction
+          CarbonDataRDDFactory.handleSegmentMerging(
+            sparkSession.sqlContext,
+            carbonLoadModel,
+            table,
+            compactedSegments,
+            operationContext)
+          carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        }
       } catch {
         case e: Exception =>
           throw new Exception(
@@ -172,8 +172,10 @@ public void setSurrogateIndex(int surrIndex) {

   @Override
   public void fillCardinality(List<Integer> dimCardWithComplex) {
-    dimCardWithComplex.add(0);
-    children.fillCardinality(dimCardWithComplex);
+    if (children.getIsColumnDictionary()) {
+      dimCardWithComplex.add(0);
+      children.fillCardinality(dimCardWithComplex);
+    }
   }

   @Override
@@ -178,9 +178,17 @@ public void setSurrogateIndex(int surrIndex) {

   @Override
   public void fillCardinality(List<Integer> dimCardWithComplex) {
-    dimCardWithComplex.add(0);
-    for (int i = 0; i < children.size(); i++) {
-      children.get(i).fillCardinality(dimCardWithComplex);
+    boolean isDictionaryColumn = false;
+    for (GenericDataType child : children) {
+      if (child.getIsColumnDictionary()) {
+        isDictionaryColumn = true;
+      }
+    }
+    if (isDictionaryColumn) {
+      dimCardWithComplex.add(0);
+      for (int i = 0; i < children.size(); i++) {
+        children.get(i).fillCardinality(dimCardWithComplex);
+      }
     }
   }

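Both fillCardinality changes, for a single-child complex type above and a multi-child one here, stop recording the placeholder cardinality entry when no child is dictionary-encoded, keeping the cardinality list aligned with the dictionary columns that the surrogate-index hunks below now skip. A Scala rendering of the multi-child guard, assuming children: Seq[GenericDataType] and dimCardWithComplex: java.util.List[Integer] as in the Java code:

    // only complex columns with at least one dictionary child contribute cardinality entries
    if (children.exists(_.getIsColumnDictionary)) {
      dimCardWithComplex.add(0)
      children.foreach(_.fillCardinality(dimCardWithComplex))
    }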
@@ -165,7 +165,9 @@ private void setComplexMapSurrogateIndex(int dimensionCount) {
         List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
         complexDataType.getAllPrimitiveChildren(primitiveTypes);
         for (GenericDataType eachPrimitive : primitiveTypes) {
-          eachPrimitive.setSurrogateIndex(surrIndex++);
+          if (eachPrimitive.getIsColumnDictionary()) {
+            eachPrimitive.setSurrogateIndex(surrIndex++);
+          }
         }
       } else {
         surrIndex++;
@@ -248,7 +248,9 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
       List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
       complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
       for (GenericDataType eachPrimitive : primitiveTypes) {
-        eachPrimitive.setSurrogateIndex(surrIndex++);
+        if (eachPrimitive.getIsColumnDictionary()) {
+          eachPrimitive.setSurrogateIndex(surrIndex++);
+        }
       }
     }

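These last two hunks apply the matching rule on the write path: a surrogate index is assigned only to primitive children that are dictionary-encoded, so no-dictionary children no longer consume and shift indexes. End to end, the behavior the new test pins down looks like this (from the test added above):

    // four loads into a complex-type table with auto merge enabled:
    // compaction is skipped, so no segment is ever marked "Compacted"
    checkExistence(sql("show segments for table table1"), false, "Compacted")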
