From b95d7b36142dbbc201c90fe2b0de03ea1d6f58f7 Mon Sep 17 00:00:00 2001 From: QiangCai Date: Sun, 5 May 2019 15:22:23 +0800 Subject: [PATCH 1/3] fix compaction issue for sort_columns modification --- .../SegmentPropertiesAndSchemaHolder.java | 13 +-- .../executor/impl/AbstractQueryExecutor.java | 6 +- .../core/scan/executor/util/QueryUtil.java | 2 +- .../result/iterator/RawResultIterator.java | 86 ++++++++++++++++- .../core/scan/wrappers/ByteArrayWrapper.java | 3 + .../TestAlterTableSortColumnsProperty.scala | 92 +++++++++++++------ .../spark/rdd/StreamHandoffRDD.scala | 2 +- .../sql/test/Spark2TestQueryExecutor.scala | 1 + .../merger/CarbonCompactionExecutor.java | 9 +- 9 files changed, 165 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java index 34ce5d01dec..f2f2d8cfc06 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java @@ -346,15 +346,9 @@ private boolean checkColumnSchemaEquality(List obj1, List clonedObj1 = new ArrayList<>(obj1); - List clonedObj2 = new ArrayList<>(obj2); - clonedObj1.addAll(obj1); - clonedObj2.addAll(obj2); - sortList(clonedObj1); - sortList(clonedObj2); boolean exists = true; for (int i = 0; i < obj1.size(); i++) { - if (!clonedObj1.get(i).equalsWithStrictCheck(clonedObj2.get(i))) { + if (!obj1.get(i).equalsWithStrictCheck(obj2.get(i))) { exists = false; break; } @@ -372,11 +366,14 @@ private void sortList(List columnSchemas) { @Override public int hashCode() { int allColumnsHashCode = 0; + // check column order + StringBuilder builder = new StringBuilder(); for (ColumnSchema columnSchema: columnsInTable) { allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode(); + builder.append(columnSchema.getColumnUniqueId()).append(","); } return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + Arrays - .hashCode(columnCardinality); + .hashCode(columnCardinality) + builder.toString().hashCode(); } public AbsoluteTableIdentifier getTableIdentifier() { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index f06f5c34f9f..6c048f3df15 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -605,7 +605,7 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, // setting the size of fixed key column (dictionary column) blockExecutionInfo .setFixedLengthKeySize(getKeySize(projectDimensions, segmentProperties)); - Set dictionaryColumnChunkIndex = new HashSet(); + List dictionaryColumnChunkIndex = new ArrayList(); List noDictionaryColumnChunkIndex = new ArrayList(); // get the block index to be read from file for query dimension // for both dictionary columns and no dictionary columns @@ -616,7 +616,9 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, dictionaryColumnChunkIndex.toArray(new Integer[dictionaryColumnChunkIndex.size()])); // need to sort the dictionary column as for all dimension // column key will be filled based on key order 
- Arrays.sort(queryDictionaryColumnChunkIndexes); + if (!queryModel.isForcedDetailRawQuery()) { + Arrays.sort(queryDictionaryColumnChunkIndexes); + } blockExecutionInfo.setDictionaryColumnChunkIndex(queryDictionaryColumnChunkIndexes); // setting the no dictionary column block indexes blockExecutionInfo.setNoDictionaryColumnChunkIndexes(ArrayUtils.toPrimitive( diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java index 49157f95cd7..95fbe662156 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java @@ -509,7 +509,7 @@ public static byte[] getMaskedKey(byte[] data, byte[] maxKey, int[] maskByteRang public static void fillQueryDimensionChunkIndexes( List projectDimensions, Map columnOrdinalToChunkIndexMapping, - Set dictionaryDimensionChunkIndex, + List dictionaryDimensionChunkIndex, List noDictionaryDimensionChunkIndex) { for (ProjectionDimension queryDimension : projectDimensions) { if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY) diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 1febb0beef2..7cf30711ac3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -28,9 +28,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.log4j.Logger; @@ -58,6 +64,15 @@ public class RawResultIterator extends CarbonIterator { private Object[] currentRawRow = null; private boolean isBackupFilled = false; + // column reorder + private int noDictCount; + private int[] noDictMap; + private final int measureCount; + // column drift + private final boolean hasColumnDrift; + private boolean[] isColumnDrift; + private DataType[] measureDataTypes; + /** * LOGGER */ @@ -66,18 +81,59 @@ public class RawResultIterator extends CarbonIterator { public RawResultIterator(CarbonIterator detailRawQueryResultIterator, SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, - boolean isStreamingHandoff) { + boolean isStreamingHandoff, boolean hasColumnDrift) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; this.executorService = Executors.newFixedThreadPool(1); - + this.hasColumnDrift = hasColumnDrift; + this.measureCount = 
destinationSegProperties.getMeasures().size(); if (!isStreamingHandoff) { init(); } } + private void initForColumnReorder() { + List noDictDims = + new ArrayList<>(destinationSegProperties.getDimensions().size()); + for (CarbonDimension dimension : destinationSegProperties.getDimensions()) { + if (dimension.getNumberOfChild() == 0) { + if (!dimension.hasEncoding(Encoding.DICTIONARY)) { + noDictDims.add(dimension); + } + } + } + noDictCount = noDictDims.size(); + isColumnDrift = new boolean[noDictCount]; + noDictMap = new int[noDictCount]; + measureDataTypes = new DataType[noDictCount]; + List sourceMeasures = sourceSegProperties.getMeasures(); + int tableMeasureCount = sourceMeasures.size(); + for (int i = 0; i < noDictCount; i++) { + for (int j = 0; j < tableMeasureCount; j++) { + if (RestructureUtil.isColumnMatches(true, noDictDims.get(i), sourceMeasures.get(j))) { + isColumnDrift[i] = true; + measureDataTypes[i] = sourceMeasures.get(j).getDataType(); + break; + } + } + if (measureDataTypes[i] == null) { + isColumnDrift[i] = false; + } + } + int noDictIndex = 0; + int measureIndex = measureCount + 1; + for(int i = 0; i< noDictCount; i++) { + if (isColumnDrift[i]) { + noDictMap[i] = measureIndex++; + } else { + noDictMap[i] = noDictIndex++; + } + } + } + private void init() { + initForColumnReorder(); this.prefetchEnabled = CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); @@ -194,11 +250,33 @@ public Object[] fetchConverted() throws KeyGenException { } private Object[] convertRow(Object[] rawRow) throws KeyGenException { - byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey(); + ByteArrayWrapper dimObject = (ByteArrayWrapper) rawRow[0]; + byte[] dims = dimObject.getDictionaryKey(); long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims); byte[] covertedBytes = destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray); - ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes); + dimObject.setDictionaryKey(covertedBytes); + if (hasColumnDrift) { + byte[][] noDicts = dimObject.getNoDictionaryKeys(); + byte[][] newNoDicts = new byte[noDictCount][]; + for(int i = 0; i < noDictCount; i++) { + if (isColumnDrift[i]) { + newNoDicts[i] = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn( + rawRow[noDictMap[i]], measureDataTypes[i]); + } else { + newNoDicts[i] = noDicts[noDictMap[i]]; + } + } + ByteArrayWrapper newWrapper = new ByteArrayWrapper(); + newWrapper.setDictionaryKey(covertedBytes); + newWrapper.setNoDictionaryKeys(newNoDicts); + newWrapper.setComplexTypesKeys(dimObject.getComplexTypesKeys()); + newWrapper.setImplicitColumnByteArray(dimObject.getImplicitColumnByteArray()); + Object[] finalRawRow = new Object[1 + measureCount]; + finalRawRow[0] = newWrapper; + System.arraycopy(rawRow, 1, finalRawRow, 1, measureCount); + return finalRawRow; + } return rawRow; } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java index 1b903f703fa..253c21c2cf3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java @@ -218,4 +218,7 @@ public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) { this.implicitColumnByteArray = 
implicitColumnByteArray; } + public byte[] getImplicitColumnByteArray() { + return implicitColumnByteArray; + } } diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala index bf4bae6cda0..3e23e91dfcd 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala @@ -32,7 +32,7 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss") CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false") dropTable() prepareTable() } @@ -359,6 +359,11 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll loadData(tableName, baseTableName) checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + // alter table to change SORT_SCOPE and SORT_COLUMNS + sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + // alter table to local_sort with new SORT_COLUMNS sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField, intField, stringField')") loadData(tableName, baseTableName) @@ -370,40 +375,66 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) // alter table to change SORT_COLUMNS - sql(s"alter table $tableName set tblproperties('sort_columns'='smallIntField, stringField, intField')") - loadData(tableName, baseTableName) - checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) - - // alter table to change SORT_SCOPE and SORT_COLUMNS - sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField, bigIntField, smallIntField')") + sql(s"alter table $tableName set tblproperties('sort_columns'='intField, stringField')") loadData(tableName, baseTableName) checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) // alter table to change SORT_SCOPE - sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')") + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, smallIntField')") loadData(tableName, baseTableName) checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + // set input segments + (0 to 5).foreach { segment => + sql(s"set carbon.input.segments.default.$tableName=$segment").collect() + sql(s"set 
carbon.input.segments.default.$baseTableName=$segment").collect() + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) + } + sql(s"set carbon.input.segments.default.$tableName=*").collect() + sql(s"set carbon.input.segments.default.$baseTableName=*").collect() + // query checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) - // set input segments - (0 to 5).foreach { segment => - sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false) - sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false) - checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) - } - sql(s"set carbon.input.segments.default.$tableName=*").show(100, false) - sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false) - // delete sql(s"delete from $tableName where smallIntField = 2") sql(s"delete from $baseTableName where smallIntField = 2") checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + // compaction for column drift + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, intField')") + // [Segment info]: + // | sorted | dimension order(sort_columns is in []) | measure order + // ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + // 0 | false | timestampField, dateField, stringField, varcharField, charField | smallIntField, intField, bigIntField, floatField, doubleField + // 1 | true | [charField], timestampField, dateField, stringField, varcharField | smallIntField, intField, bigIntField, floatField, doubleField + // 2 | false | [timestampField, intField, stringField], charField, dateField, varcharField | smallIntField, bigIntField, floatField, doubleField + // 3 | false | [stringField, intField, timestampField], charField, dateField, varcharField | smallIntField, bigIntField, floatField, 
doubleField
+    //  4       | false  | [intField, stringField], timestampField, charField, dateField, varcharField                 | smallIntField, bigIntField, floatField, doubleField
+    //  5       | true   | [charField, smallIntField], intField, stringField, timestampField, dateField, varcharField  | bigIntField, floatField, doubleField
+    // Column drift happened: intField and smallIntField became dimensions,
+    // and the column order changed as well.
+    //
+    // [Table info]:
+    //         | dimension order(sort_columns is in [])                                                      | measure order
+    // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+    //  table  | [charField], smallIntField, intField, stringField, timestampField, dateField, varcharField  | bigIntField, floatField, doubleField
+    sql(s"alter table $tableName compact 'minor'")
+    sql(s"alter table $baseTableName compact 'minor'")
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
     sql(s"delete from $tableName")
     checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(0)))
     sql(s"delete from $baseTableName")
@@ -426,8 +457,8 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))

     // update
-    sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
-    sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
+    sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").collect()
+    sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").collect()
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))

     // query
@@ -438,21 +469,20 @@
     // set input segments
     (6 to 11).foreach { segment =>
-      sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
-      sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+      sql(s"set carbon.input.segments.default.$tableName=$segment").collect()
+      sql(s"set carbon.input.segments.default.$baseTableName=$segment").collect()
checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) } - sql(s"set carbon.input.segments.default.$tableName=*").show(100, false) - sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false) + sql(s"set carbon.input.segments.default.$tableName=*").collect() + sql(s"set carbon.input.segments.default.$baseTableName=*").collect() - // compaction - sql(s"show segments for table $tableName").show(100, false) - sql(s"show segments for table $baseTableName").show(100, false) + // no_sort compaction flow for column drift + sql(s"alter table $tableName set tblproperties('sort_scope'='no_sort', 'sort_columns'='charField, intField')") + // sort_scope become no_sort sql(s"alter table $tableName compact 'minor'") sql(s"alter table $baseTableName compact 'minor'") - sql(s"show segments for table $tableName").show(100, false) - sql(s"show segments for table $baseTableName").show(100, false) checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) @@ -508,6 +538,8 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll } test("bloom filter") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") val tableName = "alter_sc_bloom" val dataMapName = "alter_sc_bloom_dm1" val baseTableName = "alter_sc_bloom_base" @@ -523,18 +555,18 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll } test("pre-aggregate") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") val tableName = "alter_sc_agg" val dataMapName = "alter_sc_agg_dm1" val baseTableName = "alter_sc_agg_base" loadData(tableName, baseTableName) - sql(s"SHOW DATAMAP ON TABLE $tableName").show(100, false) checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "preaggregate", dataMapName) checkExistence(sql(s"EXPLAIN select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName) checkAnswer(sql(s"select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,sum(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField")) sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')") loadData(tableName, baseTableName) - sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField").show(100, false) checkExistence(sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName) 
checkAnswer(sql(s"select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,max(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField")) } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 31417bcc3f0..b593f5664b5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -74,7 +74,7 @@ class HandoffPartition( */ class StreamingRawResultIterator( recordReader: RecordReader[Void, Any] -) extends RawResultIterator(null, null, null, true) { +) extends RawResultIterator(null, null, null, true, false) { override def hasNext: Boolean = { recordReader.nextKeyValue() diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala index 072971321ff..b212a32f7ed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala @@ -72,6 +72,7 @@ object Spark2TestQueryExecutor { .enableHiveSupport() .config("spark.sql.warehouse.dir", warehouse) .config("spark.sql.crossJoin.enabled", "true") + .config("spark.network.timeout", 10000000) .getOrCreateCarbonSession(null, TestQueryExecutor.metaStoreDB) if (warehouse.startsWith("hdfs://")) { System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse) diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index 619b45ab723..6d99f640ec6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryModelBuilder; import org.apache.carbondata.core.scan.result.RowBatch; @@ -167,11 +168,13 @@ public Map> processTableBlocks(Configuration con private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId, String task, List tableBlockInfoList) throws QueryExecutionException, IOException { + SegmentProperties sourceSegmentProperties = getSourceSegmentProperties( + Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())); + boolean hasColumnDrift = carbonTable.hasColumnDrift() && + RestructureUtil.hasColumnDriftOnSegment(carbonTable, sourceSegmentProperties); return new RawResultIterator( executeBlockList(tableBlockInfoList, segmentId, task, configuration), - getSourceSegmentProperties( - Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())), - destinationSegProperties, false); + sourceSegmentProperties, 
destinationSegProperties, false, hasColumnDrift);
   }

   /**

From d7fddd51ff780dfc8cd1b5cbf2b152021eaaa59c Mon Sep 17 00:00:00 2001
From: QiangCai
Date: Sun, 5 May 2019 15:34:54 +0800
Subject: [PATCH 2/3] self review

---
 .../result/iterator/RawResultIterator.java    | 20 +++++++++++--------
 .../sql/test/Spark2TestQueryExecutor.scala    |  1 -
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 7cf30711ac3..6a4b741eafc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -64,13 +64,13 @@ public class RawResultIterator extends CarbonIterator {
   private Object[] currentRawRow = null;
   private boolean isBackupFilled = false;

-  // column reorder
+  // column reorder for no-dictionary columns
   private int noDictCount;
   private int[] noDictMap;
-  private final int measureCount;
   // column drift
   private final boolean hasColumnDrift;
   private boolean[] isColumnDrift;
+  private final int measureCount;
   private DataType[] measureDataTypes;

   /**
@@ -93,7 +93,7 @@ public RawResultIterator(CarbonIterator detailRawQueryResultIterator,
     }
   }

-  private void initForColumnReorder() {
+  private void initForColumnDrift() {
     List noDictDims =
         new ArrayList<>(destinationSegProperties.getDimensions().size());
     for (CarbonDimension dimension : destinationSegProperties.getDimensions()) {
@@ -122,8 +122,9 @@ private void initForColumnReorder() {
       }
     }
     int noDictIndex = 0;
+    // the drifted columns are at the end of the measures
     int measureIndex = measureCount + 1;
-    for(int i = 0; i< noDictCount; i++) {
+    for (int i = 0; i < noDictCount; i++) {
       if (isColumnDrift[i]) {
         noDictMap[i] = measureIndex++;
       } else {
@@ -133,7 +134,9 @@ private void initForColumnReorder() {
   }

   private void init() {
-    initForColumnReorder();
+    if (hasColumnDrift) {
+      initForColumnDrift();
+    }
     this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
         CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
@@ -257,12 +260,13 @@ private Object[] convertRow(Object[] rawRow) throws KeyGenException {
         destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
     dimObject.setDictionaryKey(covertedBytes);
     if (hasColumnDrift) {
+      // need to move the drifted measures back to dimensions and return a new row in the current schema
       byte[][] noDicts = dimObject.getNoDictionaryKeys();
       byte[][] newNoDicts = new byte[noDictCount][];
-      for(int i = 0; i < noDictCount; i++) {
+      for (int i = 0; i < noDictCount; i++) {
         if (isColumnDrift[i]) {
-          newNoDicts[i] = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(
-              rawRow[noDictMap[i]], measureDataTypes[i]);
+          newNoDicts[i] = DataTypeUtil
              .getBytesDataDataTypeForNoDictionaryColumn(rawRow[noDictMap[i]], measureDataTypes[i]);
         } else {
           newNoDicts[i] = noDicts[noDictMap[i]];
         }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index b212a32f7ed..072971321ff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -72,7 +72,6 @@ object
Spark2TestQueryExecutor { .enableHiveSupport() .config("spark.sql.warehouse.dir", warehouse) .config("spark.sql.crossJoin.enabled", "true") - .config("spark.network.timeout", 10000000) .getOrCreateCarbonSession(null, TestQueryExecutor.metaStoreDB) if (warehouse.startsWith("hdfs://")) { System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse) From 79b697dc0a6dd9401acd01f148c9ca3f16a14619 Mon Sep 17 00:00:00 2001 From: QiangCai Date: Sun, 5 May 2019 19:40:15 +0800 Subject: [PATCH 3/3] fix ci issue --- .../core/scan/result/iterator/RawResultIterator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 6a4b741eafc..893fe25af4d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -70,7 +70,7 @@ public class RawResultIterator extends CarbonIterator { // column drift private final boolean hasColumnDrift; private boolean[] isColumnDrift; - private final int measureCount; + private int measureCount; private DataType[] measureDataTypes; /** @@ -87,7 +87,6 @@ public RawResultIterator(CarbonIterator detailRawQueryResultIterator, this.destinationSegProperties = destinationSegProperties; this.executorService = Executors.newFixedThreadPool(1); this.hasColumnDrift = hasColumnDrift; - this.measureCount = destinationSegProperties.getMeasures().size(); if (!isStreamingHandoff) { init(); } @@ -103,6 +102,7 @@ private void initForColumnDrift() { } } } + measureCount = destinationSegProperties.getMeasures().size(); noDictCount = noDictDims.size(); isColumnDrift = new boolean[noDictCount]; noDictMap = new int[noDictCount];
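
The net effect of the three patches: during compaction, RawResultIterator now detects "column drift" (a former measure that became a no-dictionary dimension after SORT_COLUMNS was altered), reads the drifted values from the measure section of each raw row, converts them to their byte representation, and splices them into the no-dictionary key in destination-schema order. Below is a minimal, self-contained Java sketch of that remapping. It mirrors the noDictMap/isColumnDrift bookkeeping from initForColumnDrift() and convertRow(), but the class name, the plain byte[][] key arrays, and the intToBytes helper are simplified stand-ins for illustration, not CarbonData's ByteArrayWrapper and DataTypeUtil APIs.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: mimics the row remapping RawResultIterator performs
// during compaction when a measure has drifted to a no-dictionary dimension.
public class ColumnDriftSketch {

  public static void main(String[] args) {
    // Destination schema: three no-dictionary dimensions; index 1 (say,
    // intField) drifted over from the measure side of the source schema.
    boolean[] isColumnDrift = {false, true, false};
    int noDictCount = isColumnDrift.length;
    int measureCount = 2; // measures that survived the drift

    // Build noDictMap the way initForColumnDrift() does: a drifted column is
    // read from the measure slots of the raw row (after the key object at
    // index 0 and the surviving measures), the rest from the original
    // no-dictionary key section.
    int[] noDictMap = new int[noDictCount];
    int noDictIndex = 0;
    int measureIndex = measureCount + 1; // drifted values sit after the measures
    for (int i = 0; i < noDictCount; i++) {
      noDictMap[i] = isColumnDrift[i] ? measureIndex++ : noDictIndex++;
    }

    // A source raw row: [no-dict keys, measure0, measure1, drifted intField].
    byte[][] sourceNoDictKeys = {bytes("abc"), bytes("xyz")};
    Object[] rawRow = {sourceNoDictKeys, 1.5d, 2.5d, 42};

    // Rebuild the no-dictionary keys in destination order, converting the
    // drifted measure value into its byte representation on the way.
    byte[][] newNoDictKeys = new byte[noDictCount][];
    for (int i = 0; i < noDictCount; i++) {
      if (isColumnDrift[i]) {
        newNoDictKeys[i] = intToBytes((Integer) rawRow[noDictMap[i]]);
      } else {
        newNoDictKeys[i] = ((byte[][]) rawRow[0])[noDictMap[i]];
      }
    }

    // The converted row keeps only the key object plus the surviving measures.
    Object[] finalRow = new Object[1 + measureCount];
    finalRow[0] = newNoDictKeys;
    System.arraycopy(rawRow, 1, finalRow, 1, measureCount);

    System.out.println("no-dict keys: " + Arrays.deepToString(newNoDictKeys));
    System.out.println("measures:     "
        + Arrays.toString(Arrays.copyOfRange(finalRow, 1, finalRow.length)));
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Hypothetical stand-in for DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn.
  private static byte[] intToBytes(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }
}

Running the sketch prints the two untouched string keys with the 4-byte encoding of 42 spliced between them, and only the two surviving measures left in the row tail; this is the same shape convertRow() produces when hasColumnDrift is true.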