Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
QiangCai committed May 29, 2019
1 parent 771fbff commit a289403
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 70 deletions.
Expand Up @@ -30,20 +30,15 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
Expand Down Expand Up @@ -523,55 +518,37 @@ public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
return carbonMergeFileToIndexFilesMap;
}

public static IndexHeader getIndexHeaderOfSegment(LoadMetadataDetails load,
ReadCommittedScope readCommitScope, BlockletDataMapFactory dataMapFactory) {
IndexHeader indexHeader = null;
Segment segment = new Segment(load.getLoadName(), load.getSegmentFile(), readCommitScope);
Set<TableBlockIndexUniqueIdentifier> indexIdents = new HashSet<>();
if (FileFormat.COLUMNAR_V3 == load.getFileFormat()) {
public static IndexHeader readIndexHeader(String indexFilePath, Configuration configuration) {
byte[] indexContent = null;
if (indexFilePath.toLowerCase().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(configuration);
try {
indexIdents = dataMapFactory.getTableBlockIndexUniqueIdentifiers(segment);
indexFileStore.readMergeFile(indexFilePath);
} catch (IOException ex) {
indexIdents = new HashSet<>();
LOGGER.error(ex);
}
}
Iterator<TableBlockIndexUniqueIdentifier> indexIterator = indexIdents.iterator();
if (indexIterator.hasNext()) {
TableBlockIndexUniqueIdentifier indexIdent = indexIterator.next();
byte[] indexContent = null;
String indexFilePath =
indexIdent.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + indexIdent
.getIndexFileName();
if (indexIdent.getMergeIndexFileName() != null) {
SegmentIndexFileStore indexFileStore =
new SegmentIndexFileStore(readCommitScope.getConfiguration());
try {
indexFileStore.readMergeFile(indexFilePath);
} catch (IOException ex) {
LOGGER.error(ex);
}
Iterator<Map.Entry<String, byte[]>> iterator =
indexFileStore.getCarbonIndexMap().entrySet().iterator();
if (iterator.hasNext()) {
indexContent = iterator.next().getValue();
}
Iterator<Map.Entry<String, byte[]>> iterator =
indexFileStore.getCarbonIndexMap().entrySet().iterator();
if (iterator.hasNext()) {
indexContent = iterator.next().getValue();
}
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
try {
if (indexContent == null) {
indexReader.openThriftReader(indexFilePath);
} else {
indexReader.openThriftReader(indexContent);
}
// get the index header
indexHeader = indexReader.readIndexHeader();
} catch (IOException ex) {
LOGGER.error(ex);
} finally {
indexReader.closeThriftReader();
}
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
IndexHeader indexHeader = null;
try {
if (indexContent == null) {
indexReader.openThriftReader(indexFilePath);
} else {
indexReader.openThriftReader(indexContent);
}
} return indexHeader;
// get the index header
indexHeader = indexReader.readIndexHeader();
} catch (IOException ex) {
LOGGER.error(ex);
} finally {
indexReader.closeThriftReader();
}
return indexHeader;
}

}
Expand Up @@ -24,18 +24,17 @@
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
Expand All @@ -44,8 +43,6 @@
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
Expand Down Expand Up @@ -670,28 +667,74 @@ public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFo

public static boolean isSortedByCurrentSortColumns(
CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
ReadCommittedScope readCommitScope = new TableStatusReadCommittedScope(
table.getAbsoluteTableIdentifier(), new LoadMetadataDetails[] { load }, hadoopConf);
BlockletDataMapFactory dataMapFactory = (BlockletDataMapFactory) DataMapStoreManager
.getInstance().getDefaultDataMap(table).getDataMapFactory();
IndexHeader indexHeader = SegmentIndexFileStore
.getIndexHeaderOfSegment(load, readCommitScope, dataMapFactory);
if (indexHeader.is_sort) {
ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
List<ColumnSchema> columns = new ArrayList<>(indexHeader.getTable_columns().size());
for (org.apache.carbondata.format.ColumnSchema column : indexHeader.getTable_columns()) {
if (column.isDimension()) {
Map<String, String> properties = column.getColumnProperties();
if (properties != null) {
if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
columns.add(schemaConverter.fromExternalToWrapperColumnSchema(column));
List<String> sortColumnList = table.getSortColumns();
if (sortColumnList.isEmpty()) {
return false;
}
// table sort_columns
String sortColumns = Strings.mkString(
sortColumnList.toArray(new String[sortColumnList.size()]), ",");
String segmentPath =
CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName());
// segment sort_columns
String segmentSortColumns = getSortColumnsOfSegment(segmentPath);
if (segmentSortColumns == null) {
return false;
} else {
return segmentSortColumns.equalsIgnoreCase(sortColumns);
}
}

/**
 * Builds a comma separated list of the dimension columns that carry the
 * SORT_COLUMNS property, preserving their order in {@code columnList}.
 *
 * @param columnList thrift column schemas taken from an index header
 * @return comma separated sort column names, or {@code null} when none exist
 */
private static String mkSortColumnsString(
    List<org.apache.carbondata.format.ColumnSchema> columnList) {
  StringBuilder sortColumns = new StringBuilder();
  for (org.apache.carbondata.format.ColumnSchema columnSchema : columnList) {
    if (!columnSchema.isDimension()) {
      continue;
    }
    Map<String, String> columnProperties = columnSchema.getColumnProperties();
    if (columnProperties != null
        && columnProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
      sortColumns.append(columnSchema.column_name).append(",");
    }
  }
  // strip the trailing comma; a (near-)empty buffer means no sort columns were found
  return sortColumns.length() > 1 ? sortColumns.substring(0, sortColumns.length() - 1) : null;
}

/**
 * Reads the sort columns recorded in a segment's index files. All index files in
 * the segment must agree on both the {@code is_sort} flag and the sort column
 * list; any disagreement, or the absence of a usable sort column list, yields
 * {@code null}.
 *
 * @param segmentFolder path of the segment directory to scan for index files
 * @return the segment's comma separated sort columns, or {@code null} when the
 *         index files disagree or record no sort columns
 */
public static String getSortColumnsOfSegment(String segmentFolder) {
  CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(
      segmentFolder, FileFactory.getConfiguration());
  // distinct is_sort flags and distinct sort-column strings seen across index files
  Set<Boolean> isSortSet = new HashSet<>();
  Set<String> sortColumnsSet = new HashSet<>();
  if (files != null) {
    for (CarbonFile file : files) {
      IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader(
          file.getCanonicalPath(), FileFactory.getConfiguration());
      if (indexHeader != null) {
        if (indexHeader.isSetIs_sort()) {
          isSortSet.add(indexHeader.is_sort);
          if (indexHeader.is_sort) {
            sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
          }
        } else {
          // if is_sort is not set, it will be old store and consider as local_sort by default.
          sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
        }
      }
      if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) {
        // files already disagree; no need to scan the remaining index files
        break;
      }
    }
  }
  // for all index files, sort_columns should be same
  if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) {
    // NOTE(review): mkSortColumnsString may return null, so the single element
    // returned here can itself be null — callers already treat null as "no match".
    return sortColumnsSet.iterator().next();
  } else {
    return null;
  }
}

Expand Down

0 comments on commit a289403

Please sign in to comment.