diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2ef7b8809bb..40a8cfc0e1e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -170,7 +170,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments
     return dataMaps;
   }
 
-  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index ae5be682c47..e75f79c616e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,21 +30,27 @@
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
@@ -515,4 +522,63 @@
   public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
     return carbonMergeFileToIndexFilesMap;
   }
+
+  /**
+   * Reads the carbonindex header of one index file of the given segment.
+   *
+   * @return the IndexHeader of the first index file found, or null when the
+   *         segment has no COLUMNAR_V3 index file or the header cannot be read;
+   *         callers must null-check the result
+   */
+  public static IndexHeader getIndexHeaderOfSegment(LoadMetadataDetails load,
+      ReadCommittedScope readCommitScope, BlockletDataMapFactory dataMapFactory) {
+    IndexHeader indexHeader = null;
+    Segment segment = new Segment(load.getLoadName(), load.getSegmentFile(), readCommitScope);
+    Set<TableBlockIndexUniqueIdentifier> indexIdents = new HashSet<>();
+    if (FileFormat.COLUMNAR_V3 == load.getFileFormat()) {
+      try {
+        indexIdents = dataMapFactory.getTableBlockIndexUniqueIdentifiers(segment);
+      } catch (IOException ex) {
+        // best effort: continue with an empty identifier set and return null below
+        LOGGER.error(ex);
+      }
+    }
+    Iterator<TableBlockIndexUniqueIdentifier> indexIterator = indexIdents.iterator();
+    if (indexIterator.hasNext()) {
+      TableBlockIndexUniqueIdentifier indexIdent = indexIterator.next();
+      byte[] indexContent = null;
+      String indexFilePath = indexIdent.getIndexFilePath()
+          + CarbonCommonConstants.FILE_SEPARATOR + indexIdent.getIndexFileName();
+      if (indexIdent.getMergeIndexFileName() != null) {
+        // merged index: read the merge file and pick the first embedded index content
+        SegmentIndexFileStore indexFileStore =
+            new SegmentIndexFileStore(readCommitScope.getConfiguration());
+        try {
+          indexFileStore.readMergeFile(indexFilePath);
+        } catch (IOException ex) {
+          LOGGER.error(ex);
+        }
+        Iterator<Map.Entry<String, byte[]>> iterator =
+            indexFileStore.getCarbonIndexMap().entrySet().iterator();
+        if (iterator.hasNext()) {
+          indexContent = iterator.next().getValue();
+        }
+      }
+      CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+      try {
+        if (indexContent == null) {
+          indexReader.openThriftReader(indexFilePath);
+        } else {
+          indexReader.openThriftReader(indexContent);
+        }
+        // get the index header
+        indexHeader = indexReader.readIndexHeader();
+      } catch (IOException ex) {
+        LOGGER.error(ex);
+      } finally {
+        indexReader.closeThriftReader();
+      }
+    }
+    return indexHeader;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index ac0d156754d..bc656301760 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -55,7 +55,7 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   }
 
   public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
     this.identifier = identifier;
     this.configuration = configuration;
     this.loadMetadataDetails = loadMetadataDetails;
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index cfb40ecaeff..b9e6bda87aa 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
 
 /**
@@ -49,6 +50,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     operationContext: OperationContext)
   extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
 
+  private def needSortSingleSegment(
+      loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
+    // support to resort old segment with old sort_columns
+    if (CompactionType.CUSTOM == compactionModel.compactionType &&
+        loadsToMerge.size() == 1 &&
+        SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
+      !CarbonCompactionUtil.isSortedByCurrentSortColumns(
+        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+        loadsToMerge.get(0),
+        sqlContext.sparkSession.sessionState.newHadoopConf())
+    } else {
+      false
+    }
+  }
+
   override def executeCompaction(): Unit = {
     val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
       carbonLoadModel.getLoadMetadataDetails
@@ -57,7 +73,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     var loadsToMerge = identifySegmentsToBeMerged()
 
-    while (loadsToMerge.size() > 1 ||
+    while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
            (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
             loadsToMerge.size() > 0)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index f4a15bb5df7..39a21aa7854 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -24,12 +24,16 @@
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import
org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -38,6 +42,8 @@
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -47,13 +53,16 @@
 import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 /**
@@ -601,6 +610,37 @@
     return minMaxVals;
   }
 
+  private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) {
+    // When sort_columns is modified, it will be considered as no_sort also.
+    List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
+    for (ColumnSchema column : fileColumns) {
+      if (column.isDimensionColumn() && column.isSortColumn()) {
+        sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+      }
+    }
+    if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
+      return false;
+    }
+    List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
+    for (CarbonDimension dimension : table.getDimensions()) {
+      if (dimension.isSortColumn()) {
+        sortColumnsOfTable.add(dimension);
+      }
+    }
+    int sortColumnNums = sortColumnsOfTable.size();
+    if (sortColumnsOfSegment.size() < sortColumnNums) {
+      return false;
+    }
+    // compare sort_columns
+    for (int i = 0; i < sortColumnNums; i++) {
+      if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
+          sortColumnsOfSegment.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
@@ -611,35 +651,39 @@
   public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) {
     if (footer.isSorted()) {
-      // When sort_columns is modified, it will be consider as no_sort also.
-      List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
-      for (ColumnSchema column : footer.getColumnInTable()) {
-        if (column.isDimensionColumn() && column.isSortColumn()) {
-          sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
-        }
-      }
-      if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
-        return false;
-      }
-      List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
-      for (CarbonDimension dimension : table.getDimensions()) {
-        if (dimension.isSortColumn()) {
-          sortColumnsOfTable.add(dimension);
-        }
-      }
-      int sortColumnNums = sortColumnsOfTable.size();
-      if (sortColumnsOfSegment.size() < sortColumnNums) {
-        return false;
-      }
-      // compare sort_columns
-      for (int i = 0; i < sortColumnNums; i++) {
-        if (!RestructureUtil
-            .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
-                sortColumnsOfSegment.get(i))) {
-          return false;
-        }
-      }
-      return true;
+      return compareSortColumns(table, footer.getColumnInTable());
+    } else {
+      return false;
+    }
+  }
+
+  public static boolean isSortedByCurrentSortColumns(
+      CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
+    ReadCommittedScope readCommitScope = new TableStatusReadCommittedScope(
+        table.getAbsoluteTableIdentifier(), new LoadMetadataDetails[] { load }, hadoopConf);
+    BlockletDataMapFactory dataMapFactory = (BlockletDataMapFactory) DataMapStoreManager
+        .getInstance().getDefaultDataMap(table).getDataMapFactory();
+    IndexHeader indexHeader = SegmentIndexFileStore
+        .getIndexHeaderOfSegment(load, readCommitScope, dataMapFactory);
+    if (indexHeader == null) {
+      // getIndexHeaderOfSegment returns null on IO errors or non-COLUMNAR_V3
+      // segments; without this guard the is_sort access below throws NPE.
+      // Treat the segment as unsorted so compaction re-sorts it.
+      return false;
+    }
+    if (indexHeader.is_sort) {
+      ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      List<ColumnSchema> columns = new ArrayList<>(indexHeader.getTable_columns().size());
+      for (org.apache.carbondata.format.ColumnSchema column : indexHeader.getTable_columns()) {
+        if (column.isDimension()) {
+          Map<String, String> properties = column.getColumnProperties();
+          if (properties != null) {
+            if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+              columns.add(schemaConverter.fromExternalToWrapperColumnSchema(column));
+            }
+          }
+        }
+      }
+      return compareSortColumns(table, columns);
     } else {
       return false;
     }