Commit ccd68d2
Merge bf51742 into 1d4d240
xuchuanyin committed Sep 25, 2018
2 parents 1d4d240 + bf51742 commit ccd68d2
Showing 7 changed files with 62 additions and 13 deletions.
BloomCoarseGrainDataMap.java
@@ -40,6 +40,8 @@
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
@@ -87,6 +89,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
private Path indexPath;
private Set<String> filteredShard;
private boolean needShardPrune;
private String indexVersion;
/**
* This is used to convert literal filter value to internal carbon value
*/
@@ -113,11 +116,13 @@ public void setFilteredShard(Set<String> filteredShard) {
/**
* init field converters for index columns
*/
public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn> indexedColumn) {
public void initIndexColumnConverters(CarbonTable carbonTable, String dmStorePath,
List<CarbonColumn> indexedColumn) {
this.name2Col = new HashMap<>(indexedColumn.size());
for (CarbonColumn col : indexedColumn) {
this.name2Col.put(col.getColName(), col);
}
this.indexVersion = retrieveIndexFileVersion(dmStorePath);
String parentTablePath = getAncestorTablePath(carbonTable);

try {
@@ -140,7 +145,7 @@ public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn
dataField.setTimestampFormat(tsFormat);
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(dataField, absoluteTableIdentifier, i, nullFormat, null, false,
localCaches[i], false, parentTablePath, false);
localCaches[i], false, parentTablePath, false, indexVersion);
this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
}
} catch (IOException e) {
@@ -151,6 +156,23 @@ public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn
this.badRecordLogHolder.setLogged(false);
}

private String retrieveIndexFileVersion(String dmStorePath) {
CarbonFile dmStore = FileFactory.getCarbonFile(dmStorePath);
CarbonFile[] indexVersionFile = dmStore.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.getName().startsWith(BloomIndexFileStore.BLOOM_INDEX_VERSION_PREFIX);
}
});
if (null == indexVersionFile || indexVersionFile.length == 0) {
// no version file exists, which means the index was generated by carbon 1.4.1 or earlier
return "1.4.1";
} else {
return indexVersionFile[0].getName().substring(
BloomIndexFileStore.BLOOM_INDEX_VERSION_PREFIX.length());
}
}

/**
* recursively find the ancestor's table path. This is used for dictionary scenario
* where preagg will use the dictionary of the parent table.
@@ -344,9 +366,11 @@ private BloomQueryModel buildQueryModelInternal(CarbonColumn carbonColumn,
// for dictionary/date columns, convert the surrogate key to bytes
internalFilterValue = CarbonUtil.getValueAsBytes(DataTypes.INT, convertedValue);
} else {
// for non dictionary dimensions, numeric columns will be of original data,
// so convert the data to bytes
if (DataTypeUtil.isPrimitiveColumn(carbonColumn.getDataType())) {
// for non dictionary dimensions, before 1.5.0, numeric fields are stored as bytes just like
// string columns; since 1.5.0, they are encoded as original primitive objects, so here we
// need to convert them to bytes
if (indexVersion.compareTo("1.5.0") >= 0 &&
DataTypeUtil.isPrimitiveColumn(carbonColumn.getDataType())) {
if (convertedValue == null) {
convertedValue = DataConvertUtil.getNullValueForMeasure(carbonColumn.getDataType(),
carbonColumn.getColumnSchema().getScale());
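
The guard above compares version strings with plain lexicographic String.compareTo. A minimal sketch of that gate, assuming the versions keep the single-digit major.minor.patch shape seen here ("1.4.1", "1.5.0"); the class and method names are illustrative, not CarbonData API:

public class BloomIndexVersionGate {
  // absence of the marker file defaults the version to "1.4.1" (pre-1.5.0 index)
  static boolean usesPrimitiveEncoding(String indexVersion) {
    // lexicographic order is sufficient here: "1.4.1" < "1.5.0"
    return indexVersion.compareTo("1.5.0") >= 0;
  }

  public static void main(String[] args) {
    System.out.println(usesPrimitiveEncoding("1.4.1")); // false: values stay byte-encoded
    System.out.println(usesPrimitiveEncoding("1.5.0")); // true: primitive object encoding
  }
}

Note that lexicographic comparison would misorder a multi-digit component (for example "1.10.0" compares below "1.5.0"), so the gate implicitly assumes version strings that stay string-comparable.
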
BloomCoarseGrainDataMapFactory.java
@@ -263,7 +263,10 @@ public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException
// for merge shard, shard pruning delay to be done before pruning blocklet
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapStorePath,
dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
}
@@ -282,7 +285,10 @@ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
Set<String> filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards();
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
String dmStorePath = CarbonTablePath.getDataMapStorePath(
getCarbonTable().getTablePath(), distributable.getSegment().getSegmentNo(), dataMapName);
bloomDM.initIndexColumnConverters(getCarbonTable(), dmStorePath,
dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
return dataMaps;
BloomIndexFileStore.java
@@ -47,6 +47,10 @@ public class BloomIndexFileStore {
public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
// directory to store merged bloom index files
public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
// prefix of the bloom index version file name. A file named 'version_1.5.0' means the
// bloom index files were generated by carbon 1.5.0 or later. The file itself is empty;
// the version is carried in the file name. It was introduced in 1.5.0 for compatibility.
public static final String BLOOM_INDEX_VERSION_PREFIX = "version_";
/**
* flag file for merging
* if flag file exists, query won't use mergeShard
@@ -132,6 +136,16 @@ public boolean accept(CarbonFile file) {
}
}

/**
* write the index version file under the datamap's segment folder.
* before 1.5.0, this file did not exist.
*/
public static void writeBloomIndexVersionFile(String dmSegmentPath) throws IOException {
String indexVersionFile = dmSegmentPath + File.separator + BLOOM_INDEX_VERSION_PREFIX + "1.5.0";
if (!FileFactory.isFileExist(indexVersionFile, FileFactory.getFileType(indexVersionFile))) {
FileFactory.createNewFile(indexVersionFile, FileFactory.getFileType(indexVersionFile));
}
}

/**
* load bloom filter from bloom index file
*/
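
The version marker is an empty-file flag: the file carries no content, and the version lives entirely in its name, so it can be written once and recovered with a single directory listing. A self-contained sketch of the same pattern using only JDK APIs (writeMarker and readVersion are illustrative names, not CarbonData API):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class VersionMarkerSketch {
  static final String PREFIX = "version_";

  // create an empty file whose name records the writer's version
  static void writeMarker(Path dmSegmentDir, String version) throws IOException {
    Path marker = dmSegmentDir.resolve(PREFIX + version);
    if (Files.notExists(marker)) {
      Files.createFile(marker);
    }
  }

  // look for the prefix; absence means the index predates the marker scheme
  static String readVersion(Path dmSegmentDir) throws IOException {
    try (DirectoryStream<Path> files =
        Files.newDirectoryStream(dmSegmentDir, PREFIX + "*")) {
      for (Path file : files) {
        return file.getFileName().toString().substring(PREFIX.length());
      }
    }
    return "1.4.1"; // no marker: index written before 1.5.0
  }
}
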
CarbonMergeBloomIndexFilesRDD.scala
@@ -53,11 +53,12 @@ class CarbonMergeBloomIndexFilesRDD(
logInfo("Merging bloom index files of " +
s"segment ${split.segmentId} for ${carbonTable.getTableName}")

bloomDatamapNames.zipWithIndex.map( dm => {
bloomDatamapNames.zipWithIndex.foreach{ dm =>
val dmSegmentPath = CarbonTablePath.getDataMapStorePath(
tablePath, split.segmentId, dm._1)
BloomIndexFileStore.mergeBloomIndexFile(dmSegmentPath, bloomIndexColumns(dm._2).asJava)
})
BloomIndexFileStore.writeBloomIndexVersionFile(dmSegmentPath)
}

val iter = new Iterator[String] {
var havePair = false
MergeBloomIndexEventListener.scala
@@ -65,6 +65,7 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
segmentIds, bloomDMnames, bloomIndexColumns).collect()
}
case event: Event => throw new RuntimeException(s"Unsupported event $event")
}
}

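
The added catch-all case makes the listener fail fast on event types it does not handle, rather than silently ignoring them. A small Java analogue of that defensive-default pattern (the event types here are hypothetical stand-ins):

final class FailFastListener {
  interface Event {}
  static final class MergeBloomIndexEvent implements Event {}

  void onEvent(Event event) {
    if (event instanceof MergeBloomIndexEvent) {
      // trigger the bloom index merge for the affected segments
    } else {
      // unknown events surface as errors instead of being dropped
      throw new RuntimeException("Unsupported event " + event);
    }
  }
}
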
FieldEncoderFactory.java
@@ -40,6 +40,8 @@
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.converter.FieldConverter;

import org.apache.commons.lang3.StringUtils;

public class FieldEncoderFactory {

private static FieldEncoderFactory instance;
@@ -74,7 +76,7 @@ public static FieldEncoderFactory getInstance() {
public FieldConverter createFieldEncoder(DataField dataField,
AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat,
DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache,
boolean isEmptyBadRecord, String parentTablePath, boolean isConvertToBinary)
boolean isEmptyBadRecord, String parentTablePath, boolean isConvertToBinary, String version)
throws IOException {
// Converters are only needed for dimensions and measures; for other columns it returns null.
if (dataField.getColumn().isDimension()) {
@@ -124,7 +126,8 @@ public FieldConverter createFieldEncoder(DataField dataField,
// then treat it as a measure col
// so that the adaptive encoding can be applied on it easily
if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())
&& !isConvertToBinary) {
&& !isConvertToBinary
&& (StringUtils.isEmpty(version) || version.compareTo("1.5.0") >= 0)) {
return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
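
The new version parameter threads the bloom index version into the converter choice: an empty version (the regular data-load path passes null) keeps the new primitive encoding, while a concrete pre-1.5.0 index version falls back to the byte-style converter so that queries still match old bloom index files. A condensed sketch of that decision; the method and its string return values are illustrative, not the factory's API:

import org.apache.commons.lang3.StringUtils;

final class ConverterChoice {
  // mirrors the condition in createFieldEncoder: primitive type, not a binary
  // conversion, and either no index version (normal load) or an index >= 1.5.0
  static String pickConverter(boolean isPrimitive, boolean isConvertToBinary, String version) {
    if (isPrimitive && !isConvertToBinary
        && (StringUtils.isEmpty(version) || version.compareTo("1.5.0") >= 0)) {
      return "MeasureFieldConverterImpl";     // encodes original primitive objects
    }
    return "NonDictionaryFieldConverterImpl"; // encodes values as bytes, like strings
  }
}
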
RowConverterImpl.java
@@ -105,7 +105,7 @@ public void initialize() throws IOException {
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client,
configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
configuration.getParentTablePath(), isConvertToBinary);
configuration.getParentTablePath(), isConvertToBinary, null);
fieldConverterList.add(fieldConverter);
}
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -226,7 +226,7 @@ public RowConverter createCopyForNewThread() {
fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat,
client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
configuration.getParentTablePath(), isConvertToBinary);
configuration.getParentTablePath(), isConvertToBinary, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
