Merge a289403 into 1023ba9
QiangCai committed May 29, 2019
2 parents (1023ba9 + a289403), commit d0c02e1
Showing 5 changed files with 163 additions and 29 deletions.
BlockletDataMapFactory.java
@@ -167,7 +167,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments
    return dataMaps;
  }

-  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
      throws IOException {
    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
        segmentMap.get(segment.getSegmentNo());
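
This visibility change lets code outside the datamap factory look up the index-file identifiers of a segment. A minimal sketch of a caller, assuming a BlockletDataMapFactory instance `factory` and a Segment `segment` that are not part of this diff:

    // Hypothetical caller; may throw IOException, which must be handled.
    Set<TableBlockIndexUniqueIdentifier> identifiers =
        factory.getTableBlockIndexUniqueIdentifiers(segment);
    for (TableBlockIndexUniqueIdentifier id : identifiers) {
      // one entry per index file backing the segment
    }
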
SegmentIndexFileStore.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +45,7 @@
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.format.MergedBlockIndex;
import org.apache.carbondata.format.MergedBlockIndexHeader;

@@ -515,4 +517,38 @@ private List<BlockletInfo> getBlockletInfoFromIndexInfo(TableBlockInfo blockInfo
  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
    return carbonMergeFileToIndexFilesMap;
  }

  public static IndexHeader readIndexHeader(String indexFilePath, Configuration configuration) {
    byte[] indexContent = null;
    if (indexFilePath.toLowerCase().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
      SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(configuration);
      try {
        indexFileStore.readMergeFile(indexFilePath);
      } catch (IOException ex) {
        LOGGER.error(ex);
      }
      Iterator<Map.Entry<String, byte[]>> iterator =
          indexFileStore.getCarbonIndexMap().entrySet().iterator();
      if (iterator.hasNext()) {
        indexContent = iterator.next().getValue();
      }
    }
    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    IndexHeader indexHeader = null;
    try {
      if (indexContent == null) {
        indexReader.openThriftReader(indexFilePath);
      } else {
        indexReader.openThriftReader(indexContent);
      }
      // get the index header
      indexHeader = indexReader.readIndexHeader();
    } catch (IOException ex) {
      LOGGER.error(ex);
    } finally {
      indexReader.closeThriftReader();
    }
    return indexHeader;
  }

}
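
A minimal sketch of driving the new readIndexHeader helper; `indexFilePath` is an assumed placeholder, and per the code above the method logs and returns null when the file cannot be read:

    Configuration conf = FileFactory.getConfiguration();
    // works for plain index files and for merge-index files, which the
    // helper detects by extension and unpacks via SegmentIndexFileStore
    IndexHeader header = SegmentIndexFileStore.readIndexHeader(indexFilePath, conf);
    if (header != null && header.isSetIs_sort() && header.is_sort) {
      // the carbondata files described by this index were written sorted
    }
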
TableStatusReadCommittedScope.java
@@ -55,7 +55,7 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
}

  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
    this.identifier = identifier;
    this.configuration = configuration;
    this.loadMetadataDetails = loadMetadataDetails;
CarbonTableCompactor.scala
@@ -29,13 +29,14 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
import org.apache.spark.util.MergeIndexUtil

import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl

/**
@@ -50,6 +51,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
    operationContext: OperationContext)
  extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {

  private def needSortSingleSegment(
      loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
    // support re-sorting a single old segment that was written with old sort_columns
    if (CompactionType.CUSTOM == compactionModel.compactionType &&
        loadsToMerge.size() == 1 &&
        SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
      !CarbonCompactionUtil.isSortedByCurrentSortColumns(
        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
        loadsToMerge.get(0),
        sqlContext.sparkSession.sessionState.newHadoopConf())
    } else {
      false
    }
  }

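The guard above fires only for CUSTOM compaction of exactly one segment on a table whose current sort scope is not NO_SORT, and only when that segment was not written with the table's current sort_columns, so the merge acts as a re-sort. The underlying check expressed in Java, as a sketch with assumed `table`, `load`, and `conf` objects:

    // illustrative wrapper, not from this diff
    boolean needReSort =
        !CarbonCompactionUtil.isSortedByCurrentSortColumns(table, load, conf);
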
  override def executeCompaction(): Unit = {
    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
      carbonLoadModel.getLoadMetadataDetails
@@ -58,7 +74,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,

    var loadsToMerge = identifySegmentsToBeMerged()

-    while (loadsToMerge.size() > 1 ||
+    while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
            loadsToMerge.size() > 0)) {
      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
CarbonCompactionUtil.java
@@ -24,11 +24,14 @@
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -49,13 +52,16 @@
import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.hadoop.CarbonInputSplit;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
@@ -612,6 +618,37 @@ public static int getTaskCountForSegment(CarbonInputSplit[] splits) {
    return taskIdSet.size();
  }

  private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) {
    // when sort_columns has been modified, the segment is also considered no_sort
    List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
    for (ColumnSchema column : fileColumns) {
      if (column.isDimensionColumn() && column.isSortColumn()) {
        sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
      }
    }
    if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
      return false;
    }
    List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
    for (CarbonDimension dimension : table.getDimensions()) {
      if (dimension.isSortColumn()) {
        sortColumnsOfTable.add(dimension);
      }
    }
    int sortColumnNums = sortColumnsOfTable.size();
    if (sortColumnsOfSegment.size() < sortColumnNums) {
      return false;
    }
    // compare sort_columns
    for (int i = 0; i < sortColumnNums; i++) {
      if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
          sortColumnsOfSegment.get(i))) {
        return false;
      }
    }
    return true;
  }

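For orientation, RestructureUtil.isColumnMatches is used here as an opaque predicate; the following simplified stand-in reflects its assumed behavior and is an illustration, not code from this commit:

    // hypothetical simplification: transactional tables match columns by id,
    // non-transactional tables fall back to matching by name
    static boolean columnMatchesSketch(boolean isTransactionalTable,
        CarbonDimension tableColumn, CarbonDimension fileColumn) {
      if (isTransactionalTable) {
        return tableColumn.getColumnId().equalsIgnoreCase(fileColumn.getColumnId());
      }
      return tableColumn.getColName().equalsIgnoreCase(fileColumn.getColName());
    }
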
  /**
   * Returns if the DataFileFooter containing carbondata file contains
   * sorted data or not.
@@ -622,37 +659,82 @@ public static int getTaskCountForSegment(CarbonInputSplit[] splits) {
   */
  public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) {
    if (footer.isSorted()) {
-      // When sort_columns is modified, it will be consider as no_sort also.
-      List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
-      for (ColumnSchema column : footer.getColumnInTable()) {
-        if (column.isDimensionColumn() && column.isSortColumn()) {
-          sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+      return compareSortColumns(table, footer.getColumnInTable());
+    } else {
+      return false;
    }
  }

+  public static boolean isSortedByCurrentSortColumns(
+      CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
+    List<String> sortColumnList = table.getSortColumns();
+    if (sortColumnList.isEmpty()) {
+      return false;
+    }
+    // table sort_columns
+    String sortColumns = Strings.mkString(
+        sortColumnList.toArray(new String[sortColumnList.size()]), ",");
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName());
+    // segment sort_columns
+    String segmentSortColumns = getSortColumnsOfSegment(segmentPath);
+    if (segmentSortColumns == null) {
+      return false;
+    } else {
+      return segmentSortColumns.equalsIgnoreCase(sortColumns);
+    }
+  }
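
A sketch of calling the new overload, which compares the table's current sort_columns string against what the segment's index headers record; `table` and `loadDetail` are assumed variables, not part of this diff:

    boolean sorted = CarbonCompactionUtil.isSortedByCurrentSortColumns(
        table, loadDetail, FileFactory.getConfiguration());
    // true only when the segment's index files consistently report sorted
    // data whose sort_columns equal the table's, ignoring case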

+  private static String mkSortColumnsString(
+      List<org.apache.carbondata.format.ColumnSchema> columnList) {
+    StringBuilder builder = new StringBuilder();
+    for (org.apache.carbondata.format.ColumnSchema column : columnList) {
+      if (column.isDimension()) {
+        Map<String, String> properties = column.getColumnProperties();
+        if (properties != null) {
+          if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+            builder.append(column.column_name).append(",");
+          }
+        }
+      }
-    if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
-      return false;
-    }
-    List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
-    for (CarbonDimension dimension : table.getDimensions()) {
-      if (dimension.isSortColumn()) {
-        sortColumnsOfTable.add(dimension);
    }
+    if (builder.length() > 1) {
+      return builder.substring(0, builder.length() - 1);
+    } else {
+      return null;
    }
  }

+  public static String getSortColumnsOfSegment(String segmentFolder) {
+    CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(
+        segmentFolder, FileFactory.getConfiguration());
+    Set<Boolean> isSortSet = new HashSet<>();
+    Set<String> sortColumnsSet = new HashSet<>();
+    if (files != null) {
+      for (CarbonFile file : files) {
+        IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader(
+            file.getCanonicalPath(), FileFactory.getConfiguration());
+        if (indexHeader != null) {
+          if (indexHeader.isSetIs_sort()) {
+            isSortSet.add(indexHeader.is_sort);
+            if (indexHeader.is_sort) {
+              sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+            }
+          } else {
+            // if is_sort is not set, it is an old store and is considered local_sort by default
+            sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+          }
+        }
-      }
-      int sortColumnNums = sortColumnsOfTable.size();
-      if (sortColumnsOfSegment.size() < sortColumnNums) {
-        return false;
-      }
-      // compare sort_columns
-      for (int i = 0; i < sortColumnNums; i++) {
-        if (!RestructureUtil
-            .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
-                sortColumnsOfSegment.get(i))) {
-          return false;
+        if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) {
+          break;
        }
      }
-      return true;
+    }
+    // for all index files, sort_columns should be same
+    if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) {
+      return sortColumnsSet.iterator().next();
    } else {
-      return false;
+      return null;
    }
  }
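
Taken together: getSortColumnsOfSegment returns a single comma-separated sort_columns string only when every index file in the segment agrees, and null otherwise (mixed is_sort flags, differing column lists, or no sorted data). A usage sketch with an assumed `segmentPath` variable:

    String cols = CarbonCompactionUtil.getSortColumnsOfSegment(segmentPath);
    if (cols == null) {
      // treat the segment as not sorted by any consistent sort_columns
    } else {
      // e.g. "col1,col2", compared case-insensitively by the caller above
    }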
