[CARBONDATA-2606] Fix Complex array Pushdown and block auto merge compaction
Indhumathi27 committed Jul 23, 2018
1 parent 43285bb commit 84f46b7
Showing 4 changed files with 62 additions and 17 deletions.
@@ -885,4 +885,36 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
    checkExistence(sql("select * from table1"),true,"1.0E9")
  }

+  test("test block compaction - auto merge") {
+    sql("DROP TABLE IF EXISTS table1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(
+      "create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
+      "by 'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    checkAnswer(sql("select count(*) from table1"),Seq(Row(40)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
}
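The new test covers the row count but not the segment state: four loads normally trip the auto-merge threshold, so the count staying at 40 is only indirect evidence that the merge was blocked. As a sketch of a more direct assertion, assuming the usual SHOW SEGMENTS output where each row carries a segment status such as Success or Compacted (the status column index is an assumption, not taken from this commit):

    // Sketch only: assert no segment was merged. Assumes the status is the
    // second column of SHOW SEGMENTS output -- verify against your version.
    val segments = sql("show segments for table table1").collect()
    assert(segments.length == 4, "expected four unmerged segments")
    assert(!segments.exists(_.getString(1).equalsIgnoreCase("Compacted")),
      "auto merge should have been blocked for the complex-type table")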
@@ -578,13 +578,19 @@ object CarbonDataRDDFactory
      if (carbonTable.isHivePartitionTable) {
        carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
      }
-      val compactedSegments = new util.ArrayList[String]()
-      handleSegmentMerging(sqlContext,
-        carbonLoadModel,
-        carbonTable,
-        compactedSegments,
-        operationContext)
-      carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      // Block compaction for table containing complex datatype
+      if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .exists(m => m.getDataType.isComplexType)) {
+        LOGGER.info("Compaction is skipped as table contains complex columns")
+      } else {
+        val compactedSegments = new util.ArrayList[String]()
+        handleSegmentMerging(sqlContext,
+          carbonLoadModel,
+          carbonTable,
+          compactedSegments,
+          operationContext)
+        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      }
    } catch {
      case e: Exception =>
        throw new Exception(
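The guard added here is a single predicate over the fact table's columns. Restated as a self-contained sketch (the helper name containsComplexColumn is hypothetical; the calls on CarbonTable are the ones used in the diff above):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Hypothetical helper mirroring the new guard: true when any fact-table
    // column is a complex type (ARRAY/STRUCT/MAP), in which case the caller
    // skips handleSegmentMerging instead of scheduling an auto merge.
    def containsComplexColumn(carbonTable: CarbonTable): Boolean =
      carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
        .exists(column => column.getDataType.isComplexType)

Extracting such a helper would also keep the two call sites in sync, since the same condition is duplicated verbatim in CarbonLoadDataCommand below.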
@@ -97,7 +97,7 @@ case class CarbonDatasourceHadoopRelation(
      breakable({
        while (ifGetArrayItemExists.containsChild != null) {
          if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
-            arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+            arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
            break
          }
          if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
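The pushdown fix is a classic stale-variable bug: the loop walks down the expression tree via ifGetArrayItemExists, but the ArrayType result was read from the outer binding s, so the answer never reflected the node actually being visited. A toy model of the corrected descent (Node is a hypothetical stand-in for the Spark expression tree, not a CarbonData type):

    import scala.util.control.Breaks._

    // Hypothetical stand-in for an expression node: a schema string and an
    // optional child, enough to model the descent in the real code.
    case class Node(schema: String, child: Option[Node])

    def arrayTypeOnPath(root: Node): Boolean = {
      var cursor = root
      var found = false
      breakable {
        while (true) {
          // The fix in this commit amounts to reading cursor (the loop
          // variable) here, not root (the outer binding).
          if (cursor.schema.contains("ArrayType")) {
            found = true
            break()
          }
          cursor = cursor.child.getOrElse(break())
        }
      }
      found
    }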
@@ -77,6 +77,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}

case class CarbonLoadDataCommand(
@@ -823,15 +824,21 @@
      }
      try {
        carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-        val compactedSegments = new util.ArrayList[String]()
-        // Trigger auto compaction
-        CarbonDataRDDFactory.handleSegmentMerging(
-          sparkSession.sqlContext,
-          carbonLoadModel,
-          table,
-          compactedSegments,
-          operationContext)
-        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        // Block compaction for table containing complex datatype
+        if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+          .exists(m => m.getDataType.isComplexType)) {
+          LOGGER.info("Compaction is skipped as table contains complex columns")
+        } else {
+          val compactedSegments = new util.ArrayList[String]()
+          // Trigger auto compaction
+          CarbonDataRDDFactory.handleSegmentMerging(
+            sparkSession.sqlContext,
+            carbonLoadModel,
+            table,
+            compactedSegments,
+            operationContext)
+          carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        }
      } catch {
        case e: Exception =>
          throw new Exception(
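Taken together, the two guarded call sites mean a load into a complex-type table now logs the skip message instead of entering handleSegmentMerging; manual compaction commands are a separate code path that this diff does not touch. A hedged end-to-end sketch (table name, file path, and schema are illustrative only):

    // Illustrative: with auto merge enabled, the fourth load used to trigger
    // handleSegmentMerging; after this commit the load path logs
    // "Compaction is skipped as table contains complex columns" instead.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    sql("create table t1 (id int, tags array<string>) stored by 'carbondata'")
    (1 to 4).foreach { _ =>
      sql(s"load data inpath '$resourcesPath/tags.csv' into table t1 " +
        "options('complex_delimiter_level_1'='$')")
    }
    // All four segments remain separate loads; none is marked Compacted.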
