
Commit

enhance custom compaction to support resort single old segment by new sort_columns
QiangCai committed May 15, 2019
1 parent 24753a9 commit 8bb6f92
Showing 5 changed files with 145 additions and 31 deletions.
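This change makes a CUSTOM compaction re-sort a single old segment when the table's sort_columns have been changed since that segment was loaded. A minimal usage sketch follows; it is not part of this commit and assumes a SparkSession with CarbonData support, that sort_columns can be changed via SET TBLPROPERTIES, and the documented ALTER TABLE ... COMPACT 'CUSTOM' syntax:

import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming CarbonData is already configured for this session.
val spark: SparkSession = SparkSession.builder()
  .appName("resort-single-old-segment")
  .enableHiveSupport()
  .getOrCreate()

// Segment 0 is loaded while the table is sorted by c1.
spark.sql("CREATE TABLE t (c1 STRING, c2 INT) STORED AS carbondata " +
  "TBLPROPERTIES('sort_columns'='c1')")
spark.sql("INSERT INTO t SELECT 'a', 1")

// After this, segment 0 is no longer sorted by the table's current sort_columns.
spark.sql("ALTER TABLE t SET TBLPROPERTIES('sort_columns'='c2')")

// With this commit, a CUSTOM compaction over that single old segment re-sorts it by
// the new sort_columns instead of being skipped because there is nothing to merge.
spark.sql("ALTER TABLE t COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (0)")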
@@ -170,7 +170,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments
return dataMaps;
}

private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
throws IOException {
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
@@ -22,28 +22,35 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.format.MergedBlockIndex;
import org.apache.carbondata.format.MergedBlockIndexHeader;

@@ -515,4 +522,56 @@ private List<BlockletInfo> getBlockletInfoFromIndexInfo(TableBlockInfo blockInfo
public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
return carbonMergeFileToIndexFilesMap;
}

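/**
 * Reads the IndexHeader of the given segment: collects the segment's index file
 * identifiers through the BlockletDataMapFactory, opens the first one (preferring
 * the merged index content when a merge index file exists) and returns its header,
 * or null if no index file is found or the header cannot be read.
 */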
public static IndexHeader getIndexHeaderOfSegment(LoadMetadataDetails load,
ReadCommittedScope readCommitScope, BlockletDataMapFactory dataMapFactory) {
IndexHeader indexHeader = null;
Segment segment = new Segment(load.getLoadName(), load.getSegmentFile(), readCommitScope);
Set<TableBlockIndexUniqueIdentifier> indexIdents = new HashSet<>();
if (FileFormat.COLUMNAR_V3 == load.getFileFormat()) {
try {
indexIdents = dataMapFactory.getTableBlockIndexUniqueIdentifiers(segment);
} catch (IOException ex) {
indexIdents = new HashSet<>();
LOGGER.error(ex);
}
}
Iterator<TableBlockIndexUniqueIdentifier> indexIterator = indexIdents.iterator();
if (indexIterator.hasNext()) {
TableBlockIndexUniqueIdentifier indexIdent = indexIterator.next();
byte[] indexContent = null;
String indexFilePath =
indexIdent.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + indexIdent
.getIndexFileName();
if (indexIdent.getMergeIndexFileName() != null) {
SegmentIndexFileStore indexFileStore =
new SegmentIndexFileStore(readCommitScope.getConfiguration());
try {
indexFileStore.readMergeFile(indexFilePath);
} catch (IOException ex) {
LOGGER.error(ex);
}
Iterator<Map.Entry<String, byte[]>> iterator =
indexFileStore.getCarbonIndexMap().entrySet().iterator();
if (iterator.hasNext()) {
indexContent = iterator.next().getValue();
}
}
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
if (indexContent == null) {
indexReader.openThriftReader(indexFilePath);
} else {
indexReader.openThriftReader(indexContent);
}
// get the index header
indexHeader = indexReader.readIndexHeader();
} catch (IOException ex) {
LOGGER.error(ex);
} finally {
indexReader.closeThriftReader();
}
}
return indexHeader;
}

}
@@ -55,7 +55,7 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
}

public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
this.identifier = identifier;
this.configuration = configuration;
this.loadMetadataDetails = loadMetadataDetails;
@@ -28,13 +28,14 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl

/**
@@ -49,6 +50,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
operationContext: OperationContext)
extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {

private def needSortSingleSegment(
loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
// support re-sorting a single old segment that was loaded with the old sort_columns
if (CompactionType.CUSTOM == compactionModel.compactionType &&
loadsToMerge.size() == 1 &&
SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
!CarbonCompactionUtil.isSortedByCurrentSortColumns(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
loadsToMerge.get(0),
sqlContext.sparkSession.sessionState.newHadoopConf())
} else {
false
}
}

override def executeCompaction(): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails
@@ -57,7 +73,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,

var loadsToMerge = identifySegmentsToBeMerged()

while (loadsToMerge.size() > 1 ||
while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
(CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
@@ -24,12 +24,16 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -38,6 +42,8 @@
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -47,13 +53,16 @@
import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.hadoop.CarbonInputSplit;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
@@ -601,6 +610,37 @@ public static Object[] getOverallMinMax(CarbonInputSplit[] carbonInputSplits,
return minMaxVals;
}

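/**
 * Checks whether the sort columns recorded in the given file schema cover the
 * table's current sort_columns and match them one by one, in order.
 */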
private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) {
// When sort_columns is modified, the segment is also considered as no_sort.
List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
for (ColumnSchema column : fileColumns) {
if (column.isDimensionColumn() && column.isSortColumn()) {
sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
}
}
if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
return false;
}
List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
for (CarbonDimension dimension : table.getDimensions()) {
if (dimension.isSortColumn()) {
sortColumnsOfTable.add(dimension);
}
}
int sortColumnNums = sortColumnsOfTable.size();
if (sortColumnsOfSegment.size() < sortColumnNums) {
return false;
}
// compare sort_columns
for (int i = 0; i < sortColumnNums; i++) {
if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
sortColumnsOfSegment.get(i))) {
return false;
}
}
return true;
}

/**
* Returns if the DataFileFooter containing carbondata file contains
* sorted data or not.
@@ -611,35 +651,34 @@ public static Object[] getOverallMinMax(CarbonInputSplit[] carbonInputSplits,
*/
public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) {
if (footer.isSorted()) {
// When sort_columns is modified, it will be consider as no_sort also.
List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
for (ColumnSchema column : footer.getColumnInTable()) {
if (column.isDimensionColumn() && column.isSortColumn()) {
sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
}
}
if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
return false;
}
List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
for (CarbonDimension dimension : table.getDimensions()) {
if (dimension.isSortColumn()) {
sortColumnsOfTable.add(dimension);
}
}
int sortColumnNums = sortColumnsOfTable.size();
if (sortColumnsOfSegment.size() < sortColumnNums) {
return false;
}
// compare sort_columns
for (int i = 0; i < sortColumnNums; i++) {
if (!RestructureUtil
.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
sortColumnsOfSegment.get(i))) {
return false;
return compareSortColumns(table, footer.getColumnInTable());
} else {
return false;
}
}

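/**
 * Variant of isSortedByCurrentSortColumns that works from a segment's
 * LoadMetadataDetails: it reads the segment's index header and, if the data is
 * sorted, compares the sort columns recorded there with the table's current
 * sort_columns.
 */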
public static boolean isSortedByCurrentSortColumns(
CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
ReadCommittedScope readCommitScope = new TableStatusReadCommittedScope(
table.getAbsoluteTableIdentifier(), new LoadMetadataDetails[] { load }, hadoopConf);
BlockletDataMapFactory dataMapFactory = (BlockletDataMapFactory) DataMapStoreManager
.getInstance().getDefaultDataMap(table).getDataMapFactory();
IndexHeader indexHeader = SegmentIndexFileStore
.getIndexHeaderOfSegment(load, readCommitScope, dataMapFactory);
if (indexHeader.is_sort) {
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
List<ColumnSchema> columns = new ArrayList<>(indexHeader.getTable_columns().size());
for (org.apache.carbondata.format.ColumnSchema column : indexHeader.getTable_columns()) {
if (column.isDimension()) {
Map<String, String> properties = column.getColumnProperties();
if (properties != null) {
if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
columns.add(schemaConverter.fromExternalToWrapperColumnSchema(column));
}
}
}
}
return true;
return compareSortColumns(table, columns);
} else {
return false;
}
