From 771fbff026e51ebeb5aeac42b9f71363ef62747f Mon Sep 17 00:00:00 2001 From: QiangCai Date: Wed, 15 May 2019 16:46:20 +0800 Subject: [PATCH 1/2] enhance custom compaction to support resort single old segment by new sort_columns --- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../blockletindex/SegmentIndexFileStore.java | 59 ++++++++++++ .../TableStatusReadCommittedScope.java | 2 +- .../spark/rdd/CarbonTableCompactor.scala | 20 +++- .../merger/CarbonCompactionUtil.java | 93 +++++++++++++------ 5 files changed, 145 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 446507fad75..cab1b8b31a7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -167,7 +167,7 @@ public Map> getDataMaps(List segments return dataMaps; } - private Set getTableBlockIndexUniqueIdentifiers(Segment segment) + public Set getTableBlockIndexUniqueIdentifiers(Segment segment) throws IOException { Set tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index ae5be682c47..e75f79c616e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,21 +30,27 @@ 
import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.reader.CarbonIndexFileReader; import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockIndex; +import org.apache.carbondata.format.IndexHeader; import org.apache.carbondata.format.MergedBlockIndex; import org.apache.carbondata.format.MergedBlockIndexHeader; @@ -515,4 +522,56 @@ private List getBlockletInfoFromIndexInfo(TableBlockInfo blockInfo public Map> getCarbonMergeFileToIndexFilesMap() { return carbonMergeFileToIndexFilesMap; } + + public static IndexHeader getIndexHeaderOfSegment(LoadMetadataDetails load, + ReadCommittedScope readCommitScope, BlockletDataMapFactory dataMapFactory) { + IndexHeader indexHeader = null; + Segment segment = new Segment(load.getLoadName(), load.getSegmentFile(), 
readCommitScope); + Set indexIdents = new HashSet<>(); + if (FileFormat.COLUMNAR_V3 == load.getFileFormat()) { + try { + indexIdents = dataMapFactory.getTableBlockIndexUniqueIdentifiers(segment); + } catch (IOException ex) { + indexIdents = new HashSet<>(); + LOGGER.error(ex); + } + } + Iterator indexIterator = indexIdents.iterator(); + if (indexIterator.hasNext()) { + TableBlockIndexUniqueIdentifier indexIdent = indexIterator.next(); + byte[] indexContent = null; + String indexFilePath = + indexIdent.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + indexIdent + .getIndexFileName(); + if (indexIdent.getMergeIndexFileName() != null) { + SegmentIndexFileStore indexFileStore = + new SegmentIndexFileStore(readCommitScope.getConfiguration()); + try { + indexFileStore.readMergeFile(indexFilePath); + } catch (IOException ex) { + LOGGER.error(ex); + } + Iterator> iterator = + indexFileStore.getCarbonIndexMap().entrySet().iterator(); + if (iterator.hasNext()) { + indexContent = iterator.next().getValue(); + } + } + CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); + try { + if (indexContent == null) { + indexReader.openThriftReader(indexFilePath); + } else { + indexReader.openThriftReader(indexContent); + } + // get the index header + indexHeader = indexReader.readIndexHeader(); + } catch (IOException ex) { + LOGGER.error(ex); + } finally { + indexReader.closeThriftReader(); + } + } return indexHeader; + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 5622efe0957..e4fd6f4f8e8 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -55,7 +55,7 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, } public 
TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, - LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException { + LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) { this.identifier = identifier; this.configuration = configuration; this.loadMetadataDetails = loadMetadataDetails; diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index afe29276e34..360a4705e1f 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -29,13 +29,14 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa import org.apache.spark.util.MergeIndexUtil import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.SortScopeOptions.SortScope import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} +import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl /** @@ -50,6 +51,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, operationContext: OperationContext) extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) { + private def needSortSingleSegment( + loadsToMerge: 
java.util.List[LoadMetadataDetails]): Boolean = { + // support to resort old segment with old sort_columns + if (CompactionType.CUSTOM == compactionModel.compactionType && + loadsToMerge.size() == 1 && + SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) { + !CarbonCompactionUtil.isSortedByCurrentSortColumns( + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + loadsToMerge.get(0), + sqlContext.sparkSession.sessionState.newHadoopConf()) + } else { + false + } + } + override def executeCompaction(): Unit = { val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails]( carbonLoadModel.getLoadMetadataDetails @@ -58,7 +74,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, var loadsToMerge = identifySegmentsToBeMerged() - while (loadsToMerge.size() > 1 || + while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) || (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType && loadsToMerge.size() > 0)) { val lastSegment = sortedSegments.get(sortedSegments.size() - 1) diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index c3017a79bce..025e53ddde0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -26,12 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -40,6 +44,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.readcommitter.ReadCommittedScope; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; @@ -49,13 +55,16 @@ import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression; import org.apache.carbondata.core.scan.expression.logical.AndExpression; import org.apache.carbondata.core.scan.expression.logical.OrExpression; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.IndexHeader; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; /** @@ -612,6 +621,37 @@ public static int getTaskCountForSegment(CarbonInputSplit[] 
splits) { return taskIdSet.size(); } + private static boolean compareSortColumns(CarbonTable table, List fileColumns) { + // When sort_columns is modified, it will be consider as no_sort also. + List sortColumnsOfSegment = new ArrayList<>(); + for (ColumnSchema column : fileColumns) { + if (column.isDimensionColumn() && column.isSortColumn()) { + sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1)); + } + } + if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) { + return false; + } + List sortColumnsOfTable = new ArrayList<>(); + for (CarbonDimension dimension : table.getDimensions()) { + if (dimension.isSortColumn()) { + sortColumnsOfTable.add(dimension); + } + } + int sortColumnNums = sortColumnsOfTable.size(); + if (sortColumnsOfSegment.size() < sortColumnNums) { + return false; + } + // compare sort_columns + for (int i = 0; i < sortColumnNums; i++) { + if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i), + sortColumnsOfSegment.get(i))) { + return false; + } + } + return true; + } + /** * Returns if the DataFileFooter containing carbondata file contains * sorted data or not. @@ -622,35 +662,34 @@ public static int getTaskCountForSegment(CarbonInputSplit[] splits) { */ public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) { if (footer.isSorted()) { - // When sort_columns is modified, it will be consider as no_sort also. 
- List sortColumnsOfSegment = new ArrayList<>(); - for (ColumnSchema column : footer.getColumnInTable()) { - if (column.isDimensionColumn() && column.isSortColumn()) { - sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1)); - } - } - if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) { - return false; - } - List sortColumnsOfTable = new ArrayList<>(); - for (CarbonDimension dimension : table.getDimensions()) { - if (dimension.isSortColumn()) { - sortColumnsOfTable.add(dimension); - } - } - int sortColumnNums = sortColumnsOfTable.size(); - if (sortColumnsOfSegment.size() < sortColumnNums) { - return false; - } - // compare sort_columns - for (int i = 0; i < sortColumnNums; i++) { - if (!RestructureUtil - .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i), - sortColumnsOfSegment.get(i))) { - return false; + return compareSortColumns(table, footer.getColumnInTable()); + } else { + return false; + } + } + + public static boolean isSortedByCurrentSortColumns( + CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) { + ReadCommittedScope readCommitScope = new TableStatusReadCommittedScope( + table.getAbsoluteTableIdentifier(), new LoadMetadataDetails[] { load }, hadoopConf); + BlockletDataMapFactory dataMapFactory = (BlockletDataMapFactory) DataMapStoreManager + .getInstance().getDefaultDataMap(table).getDataMapFactory(); + IndexHeader indexHeader = SegmentIndexFileStore + .getIndexHeaderOfSegment(load, readCommitScope, dataMapFactory); + if (indexHeader.is_sort) { + ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl(); + List columns = new ArrayList<>(indexHeader.getTable_columns().size()); + for (org.apache.carbondata.format.ColumnSchema column : indexHeader.getTable_columns()) { + if (column.isDimension()) { + Map properties = column.getColumnProperties(); + if (properties != null) { + if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) { + 
columns.add(schemaConverter.fromExternalToWrapperColumnSchema(column)); + } + } } } - return true; + return compareSortColumns(table, columns); } else { return false; } From a289403ba9349fdd66682067bec55ce573feade1 Mon Sep 17 00:00:00 2001 From: QiangCai Date: Wed, 29 May 2019 20:14:35 +0800 Subject: [PATCH 2/2] fix comments --- .../blockletindex/SegmentIndexFileStore.java | 73 ++++++---------- .../merger/CarbonCompactionUtil.java | 87 ++++++++++++++----- 2 files changed, 90 insertions(+), 70 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index e75f79c616e..4351f3aece9 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -30,20 +30,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.reader.CarbonIndexFileReader; import org.apache.carbondata.core.reader.ThriftReader; -import org.apache.carbondata.core.statusmanager.FileFormat; -import 
org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; @@ -523,55 +518,37 @@ public Map> getCarbonMergeFileToIndexFilesMap() { return carbonMergeFileToIndexFilesMap; } - public static IndexHeader getIndexHeaderOfSegment(LoadMetadataDetails load, - ReadCommittedScope readCommitScope, BlockletDataMapFactory dataMapFactory) { - IndexHeader indexHeader = null; - Segment segment = new Segment(load.getLoadName(), load.getSegmentFile(), readCommitScope); - Set indexIdents = new HashSet<>(); - if (FileFormat.COLUMNAR_V3 == load.getFileFormat()) { + public static IndexHeader readIndexHeader(String indexFilePath, Configuration configuration) { + byte[] indexContent = null; + if (indexFilePath.toLowerCase().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(configuration); try { - indexIdents = dataMapFactory.getTableBlockIndexUniqueIdentifiers(segment); + indexFileStore.readMergeFile(indexFilePath); } catch (IOException ex) { - indexIdents = new HashSet<>(); LOGGER.error(ex); } - } - Iterator indexIterator = indexIdents.iterator(); - if (indexIterator.hasNext()) { - TableBlockIndexUniqueIdentifier indexIdent = indexIterator.next(); - byte[] indexContent = null; - String indexFilePath = - indexIdent.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + indexIdent - .getIndexFileName(); - if (indexIdent.getMergeIndexFileName() != null) { - SegmentIndexFileStore indexFileStore = - new SegmentIndexFileStore(readCommitScope.getConfiguration()); - try { - indexFileStore.readMergeFile(indexFilePath); - } catch (IOException ex) { - LOGGER.error(ex); - } - Iterator> iterator = - indexFileStore.getCarbonIndexMap().entrySet().iterator(); - if (iterator.hasNext()) { - indexContent = iterator.next().getValue(); - } + Iterator> iterator = + 
indexFileStore.getCarbonIndexMap().entrySet().iterator(); + if (iterator.hasNext()) { + indexContent = iterator.next().getValue(); } - CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); - try { - if (indexContent == null) { - indexReader.openThriftReader(indexFilePath); - } else { - indexReader.openThriftReader(indexContent); - } - // get the index header - indexHeader = indexReader.readIndexHeader(); - } catch (IOException ex) { - LOGGER.error(ex); - } finally { - indexReader.closeThriftReader(); + } + CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); + IndexHeader indexHeader = null; + try { + if (indexContent == null) { + indexReader.openThriftReader(indexFilePath); + } else { + indexReader.openThriftReader(indexContent); } - } return indexHeader; + // get the index header + indexHeader = indexReader.readIndexHeader(); + } catch (IOException ex) { + LOGGER.error(ex); + } finally { + indexReader.closeThriftReader(); + } + return indexHeader; } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 025e53ddde0..8cf477eb289 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -24,18 +24,17 @@ import java.util.Map; import java.util.Set; +import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; -import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -44,8 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.readcommitter.ReadCommittedScope; -import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; @@ -670,28 +667,74 @@ public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFo public static boolean isSortedByCurrentSortColumns( CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) { - ReadCommittedScope readCommitScope = new TableStatusReadCommittedScope( - table.getAbsoluteTableIdentifier(), new LoadMetadataDetails[] { load }, hadoopConf); - BlockletDataMapFactory dataMapFactory = (BlockletDataMapFactory) DataMapStoreManager - .getInstance().getDefaultDataMap(table).getDataMapFactory(); - IndexHeader indexHeader = SegmentIndexFileStore - .getIndexHeaderOfSegment(load, readCommitScope, dataMapFactory); - if (indexHeader.is_sort) { - ThriftWrapperSchemaConverterImpl schemaConverter = new 
ThriftWrapperSchemaConverterImpl(); - List columns = new ArrayList<>(indexHeader.getTable_columns().size()); - for (org.apache.carbondata.format.ColumnSchema column : indexHeader.getTable_columns()) { - if (column.isDimension()) { - Map properties = column.getColumnProperties(); - if (properties != null) { - if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) { - columns.add(schemaConverter.fromExternalToWrapperColumnSchema(column)); + List sortColumnList = table.getSortColumns(); + if (sortColumnList.isEmpty()) { + return false; + } + // table sort_columns + String sortColumns = Strings.mkString( + sortColumnList.toArray(new String[sortColumnList.size()]), ","); + String segmentPath = + CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName()); + // segment sort_columns + String segmentSortColumns = getSortColumnsOfSegment(segmentPath); + if (segmentSortColumns == null) { + return false; + } else { + return segmentSortColumns.equalsIgnoreCase(sortColumns); + } + } + + private static String mkSortColumnsString( + List columnList) { + StringBuilder builder = new StringBuilder(); + for (org.apache.carbondata.format.ColumnSchema column : columnList) { + if (column.isDimension()) { + Map properties = column.getColumnProperties(); + if (properties != null) { + if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) { + builder.append(column.column_name).append(","); + } + } + } + } + if (builder.length() > 1) { + return builder.substring(0, builder.length() - 1); + } else { + return null; + } + } + + public static String getSortColumnsOfSegment(String segmentFolder) { + CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles( + segmentFolder, FileFactory.getConfiguration()); + Set isSortSet = new HashSet<>(); + Set sortColumnsSet = new HashSet<>(); + if (files != null) { + for (CarbonFile file : files) { + IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader( + file.getCanonicalPath(), FileFactory.getConfiguration()); 
+ if (indexHeader != null) { + if (indexHeader.isSetIs_sort()) { + isSortSet.add(indexHeader.is_sort); + if (indexHeader.is_sort) { + sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns())); } + } else { + // if is_sort is not set, this is an old store, which is considered local_sort by default. + sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns())); } } } + if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) { + break; + } } - return compareSortColumns(table, columns); + } + // for all index files, sort_columns should be the same + if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) { + return sortColumnsSet.iterator().next(); } else { - return false; + return null; } }