Skip to content

Commit

Permalink
add file-level min/max index for streaming segement
Browse files Browse the repository at this point in the history
  • Loading branch information
QiangCai committed Aug 17, 2018
1 parent 9ee0f35 commit c842223
Show file tree
Hide file tree
Showing 21 changed files with 750 additions and 129 deletions.
Expand Up @@ -334,6 +334,10 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
return dataMap;
}

public StreamDataMap getStreamDataMap(CarbonTable table) {
return new StreamDataMap(table);
}

/**
* Return a new datamap instance and registered in the store manager.
* The datamap is created using datamap name, datamap factory class and table identifier.
Expand Down
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;

public class StreamDataMap {

private CarbonTable carbonTable;

private AbsoluteTableIdentifier identifier;

private FilterExecuter filterExecuter;

public StreamDataMap(CarbonTable carbonTable) {
this.carbonTable = carbonTable;
this.identifier = carbonTable.getAbsoluteTableIdentifier();
}

public void init(FilterResolverIntf filterExp) {
if (filterExp != null) {

List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
for (CarbonDimension dimension : carbonTable.getDimensions()) {
if (!dimension.isComplex()) {
minMaxCacheColumns.add(dimension);
}
}
minMaxCacheColumns.addAll(carbonTable.getMeasures());

List<ColumnSchema> listOfColumns =
carbonTable.getTableInfo().getFactTable().getListOfColumns();
int[] columnCardinality = new int[listOfColumns.size()];
for (int index = 0; index < columnCardinality.length; index++) {
columnCardinality[index] = Integer.MAX_VALUE;
}

SegmentProperties segmentProperties =
new SegmentProperties(listOfColumns, columnCardinality);

filterExecuter = FilterUtil.getFilterExecuterTree(
filterExp, segmentProperties, null, minMaxCacheColumns);
}
}

public List<StreamFile> prune(List<Segment> segments) throws IOException {
if (filterExecuter == null) {
return listAllStreamFiles(segments, false);
} else {
List<StreamFile> streamFileList = new ArrayList<>();
for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
if (hitStreamFile(streamFile)) {
streamFileList.add(streamFile);
streamFile.setMinMaxIndex(null);
}
}
return streamFileList;
}
}

private boolean hitStreamFile(StreamFile streamFile) {
// backward compatibility, old stream file without min/max index
if (streamFile.getMinMaxIndex() == null) {
return true;
}

byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
BitSet bitSet;
if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
String filePath = streamFile.getFilePath();
String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
.isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
} else {
bitSet = filterExecuter.isScanRequired(maxValue, minValue);
}
if (!bitSet.isEmpty()) {
return true;
} else {
return false;
}
}

// TODO optimize and move the code to StreamSegment , but it's in the streaming module.
private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
throws IOException {
List<StreamFile> streamFileList = new ArrayList<>();
for (Segment segment : segments) {
String segmentDir =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
for (byte[] fileData : carbonIndexMap.values()) {
indexReader.openThriftReader(fileData);
try {
while (indexReader.hasNext()) {
BlockIndex blockIndex = indexReader.readBlockIndexInfo();
String filePath = segmentDir + File.separator + blockIndex.getFile_name();
long length = blockIndex.getFile_size();
StreamFile streamFile = new StreamFile(segment.getSegmentNo(), filePath, length);
streamFileList.add(streamFile);
if (withMinMax) {
if (blockIndex.getBlock_index() != null
&& blockIndex.getBlock_index().getMin_max_index() != null) {
streamFile.setMinMaxIndex(CarbonMetadataUtil.convertExternalMinMaxIndex(
blockIndex.getBlock_index().getMin_max_index()));
}
}
}
} finally {
indexReader.closeThriftReader();
}
}
}
}
return streamFileList;
}
}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;

public class StreamFile {

private String segmentNo;

private String filePath;

private long fileSize;

private BlockletMinMaxIndex minMaxIndex;

public StreamFile(String segmentNo, String filePath, long fileSize) {
this.segmentNo = segmentNo;
this.filePath = filePath;
this.fileSize = fileSize;
}

public String getSegmentNo() {
return segmentNo;
}

public void setSegmentNo(String segmentNo) {
this.segmentNo = segmentNo;
}

public String getFilePath() {
return filePath;
}

public void setFilePath(String filePath) {
this.filePath = filePath;
}

public long getFileSize() {
return fileSize;
}

public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}

public BlockletMinMaxIndex getMinMaxIndex() {
return minMaxIndex;
}

public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
this.minMaxIndex = minMaxIndex;
}
}
Expand Up @@ -82,9 +82,9 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
double doubleData = columnPage.getDouble(rowId);
if (srcDataType == DataTypes.FLOAT) {
float out = (float) doubleData;
return ByteUtil.toBytes(out);
return ByteUtil.toXorBytes(out);
} else {
return ByteUtil.toBytes(doubleData);
return ByteUtil.toXorBytes(doubleData);
}
} else if (DataTypes.isDecimal(srcDataType)) {
throw new RuntimeException("unsupported type: " + srcDataType);
Expand All @@ -95,22 +95,22 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
long longData = columnPage.getLong(rowId);
if ((srcDataType == DataTypes.BYTE)) {
byte out = (byte) longData;
return ByteUtil.toBytes(out);
return ByteUtil.toXorBytes(out);
} else if (srcDataType == DataTypes.BOOLEAN) {
byte out = (byte) longData;
return ByteUtil.toBytes(ByteUtil.toBoolean(out));
} else if (srcDataType == DataTypes.SHORT) {
short out = (short) longData;
return ByteUtil.toBytes(out);
return ByteUtil.toXorBytes(out);
} else if (srcDataType == DataTypes.SHORT_INT) {
int out = (int) longData;
return ByteUtil.toBytes(out);
return ByteUtil.toXorBytes(out);
} else if (srcDataType == DataTypes.INT) {
int out = (int) longData;
return ByteUtil.toBytes(out);
return ByteUtil.toXorBytes(out);
} else {
// timestamp and long
return ByteUtil.toBytes(longData);
return ByteUtil.toXorBytes(longData);
}
} else if ((targetDataType == DataTypes.STRING) || (targetDataType == DataTypes.VARCHAR) || (
targetDataType == DataTypes.BYTE_ARRAY)) {
Expand All @@ -126,7 +126,7 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
} else if (srcDataType == DataTypes.BYTE_ARRAY) {
return columnPage.getBytes(rowId);
} else if (srcDataType == DataTypes.DOUBLE) {
return ByteUtil.toBytes(columnPage.getDouble(rowId));
return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
Expand Down
Expand Up @@ -154,15 +154,15 @@ public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
} else if (dt == DataTypes.SHORT) {
vector.putShort(vectorRow, ByteUtil.toShort(data, currentDataOffset, length));
vector.putShort(vectorRow, ByteUtil.toXorShort(data, currentDataOffset, length));
} else if (dt == DataTypes.INT) {
vector.putInt(vectorRow, ByteUtil.toInt(data, currentDataOffset, length));
vector.putInt(vectorRow, ByteUtil.toXorInt(data, currentDataOffset, length));
} else if (dt == DataTypes.LONG) {
vector.putLong(vectorRow,
DataTypeUtil.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(),
currentDataOffset, length));
} else if (dt == DataTypes.TIMESTAMP) {
vector.putLong(vectorRow, ByteUtil.toLong(data, currentDataOffset, length) * 1000L);
vector.putLong(vectorRow, ByteUtil.toXorLong(data, currentDataOffset, length) * 1000L);
}
}
}
Expand Down
Expand Up @@ -247,15 +247,15 @@ public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
} else if (dt == DataTypes.SHORT) {
vector.putShort(vectorRow, ByteUtil.toShort(value, 0, length));
vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
} else if (dt == DataTypes.INT) {
vector.putInt(vectorRow, ByteUtil.toInt(value, 0, length));
vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
} else if (dt == DataTypes.LONG) {
vector.putLong(vectorRow,
DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
length));
} else if (dt == DataTypes.TIMESTAMP) {
vector.putLong(vectorRow, ByteUtil.toLong(value, 0, length) * 1000L);
vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
}
}
}
Expand Down
Expand Up @@ -155,7 +155,7 @@ private Object getDataObject(ByteBuffer dataBuffer, int size) {
actualData = null;
} else {
actualData = this.directDictGenForDate.getValueFromSurrogate(
ByteUtil.toInt(value, 0, CarbonCommonConstants.INT_SIZE_IN_BYTE));
ByteUtil.toXorInt(value, 0, CarbonCommonConstants.INT_SIZE_IN_BYTE));
}
} else {
actualData = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType);
Expand Down
Expand Up @@ -296,7 +296,7 @@ private static Object getNoDictionaryDefaultValue(DataType datatype, byte[] defa
value = new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
noDictionaryDefaultValue = Long.parseLong(value);
} else if (datatype == DataTypes.TIMESTAMP) {
long timestampValue = ByteUtil.toLong(defaultValue, 0, defaultValue.length);
long timestampValue = ByteUtil.toXorLong(defaultValue, 0, defaultValue.length);
noDictionaryDefaultValue = timestampValue * 1000L;
} else {
noDictionaryDefaultValue =
Expand Down
Expand Up @@ -615,7 +615,7 @@ private BitSet setFilterdIndexToBitSet(DimensionColumnPage dimensionColumnPage,
defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
this.segmentProperties.getSortColumnsGenerator());
} else {
defaultValue = ByteUtil.toBytes(key);
defaultValue = ByteUtil.toXorBytes(key);
}
} else {
if (dimColEvaluatorInfo.getDimension().getDataType() == DataTypes.STRING) {
Expand Down
Expand Up @@ -323,7 +323,7 @@ private BitSet getFilteredIndexes(DimensionColumnPage dimensionColumnPage,
defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
this.segmentProperties.getSortColumnsGenerator());
} else {
defaultValue = ByteUtil.toBytes(key);
defaultValue = ByteUtil.toXorBytes(key);
}
} else if (dimColEvaluatorInfoList.get(0).getDimension().getDataType() != DataTypes.STRING) {
defaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
Expand Down
Expand Up @@ -322,7 +322,7 @@ private BitSet getFilteredIndexes(DimensionColumnPage dimensionColumnPage,
defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
this.segmentProperties.getSortColumnsGenerator());
} else {
defaultValue = ByteUtil.toBytes(key);
defaultValue = ByteUtil.toXorBytes(key);
}
} else if (dimColEvaluatorInfoList.get(0).getDimension().getDataType() != DataTypes.STRING) {
defaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
Expand Down

0 comments on commit c842223

Please sign in to comment.