Skip to content

Commit

Permalink
Implement file-level min/max index for streaming segment
Browse files Browse the repository at this point in the history
  • Loading branch information
QiangCai committed Aug 25, 2018
1 parent e26c3c2 commit 0cc469e
Show file tree
Hide file tree
Showing 11 changed files with 774 additions and 82 deletions.
Expand Up @@ -334,6 +334,10 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
return dataMap;
}

public StreamDataMap getStreamDataMap(CarbonTable table) {
return new StreamDataMap(table);
}

/**
* Return a new datamap instance and registered in the store manager.
* The datamap is created using datamap name, datamap factory class and table identifier.
Expand Down
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;

@InterfaceAudience.Internal
public class StreamDataMap {

private CarbonTable carbonTable;

private AbsoluteTableIdentifier identifier;

private FilterExecuter filterExecuter;

public StreamDataMap(CarbonTable carbonTable) {
this.carbonTable = carbonTable;
this.identifier = carbonTable.getAbsoluteTableIdentifier();
}

public void init(FilterResolverIntf filterExp) {
if (filterExp != null) {

List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
for (CarbonDimension dimension : carbonTable.getDimensions()) {
if (!dimension.isComplex()) {
minMaxCacheColumns.add(dimension);
}
}
minMaxCacheColumns.addAll(carbonTable.getMeasures());

List<ColumnSchema> listOfColumns =
carbonTable.getTableInfo().getFactTable().getListOfColumns();
int[] columnCardinality = new int[listOfColumns.size()];
for (int index = 0; index < columnCardinality.length; index++) {
columnCardinality[index] = Integer.MAX_VALUE;
}

SegmentProperties segmentProperties =
new SegmentProperties(listOfColumns, columnCardinality);

filterExecuter = FilterUtil.getFilterExecuterTree(
filterExp, segmentProperties, null, minMaxCacheColumns);
}
}

public List<StreamFile> prune(List<Segment> segments) throws IOException {
if (filterExecuter == null) {
return listAllStreamFiles(segments, false);
} else {
List<StreamFile> streamFileList = new ArrayList<>();
for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
if (isScanRequire(streamFile)) {
streamFileList.add(streamFile);
streamFile.setMinMaxIndex(null);
}
}
return streamFileList;
}
}

private boolean isScanRequire(StreamFile streamFile) {
// backward compatibility, old stream file without min/max index
if (streamFile.getMinMaxIndex() == null) {
return true;
}

byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
BitSet bitSet;
if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
String filePath = streamFile.getFilePath();
String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
.isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
} else {
bitSet = filterExecuter.isScanRequired(maxValue, minValue);
}
if (!bitSet.isEmpty()) {
return true;
} else {
return false;
}
}

// TODO optimize and move the code to StreamSegment , but it's in the streaming module.
private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
throws IOException {
List<StreamFile> streamFileList = new ArrayList<>();
for (Segment segment : segments) {
String segmentDir =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
for (byte[] fileData : carbonIndexMap.values()) {
indexReader.openThriftReader(fileData);
try {
while (indexReader.hasNext()) {
BlockIndex blockIndex = indexReader.readBlockIndexInfo();
String filePath = segmentDir + File.separator + blockIndex.getFile_name();
long length = blockIndex.getFile_size();
StreamFile streamFile = new StreamFile(segment.getSegmentNo(), filePath, length);
streamFileList.add(streamFile);
if (withMinMax) {
if (blockIndex.getBlock_index() != null
&& blockIndex.getBlock_index().getMin_max_index() != null) {
streamFile.setMinMaxIndex(CarbonMetadataUtil.convertExternalMinMaxIndex(
blockIndex.getBlock_index().getMin_max_index()));
}
}
}
} finally {
indexReader.closeThriftReader();
}
}
}
}
return streamFileList;
}
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;

@InterfaceAudience.Internal
public class StreamFile {

private String segmentNo;

private String filePath;

private long fileSize;

private BlockletMinMaxIndex minMaxIndex;

public StreamFile(String segmentNo, String filePath, long fileSize) {
this.segmentNo = segmentNo;
this.filePath = filePath;
this.fileSize = fileSize;
}

public String getSegmentNo() {
return segmentNo;
}

public void setSegmentNo(String segmentNo) {
this.segmentNo = segmentNo;
}

public String getFilePath() {
return filePath;
}

public void setFilePath(String filePath) {
this.filePath = filePath;
}

public long getFileSize() {
return fileSize;
}

public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}

public BlockletMinMaxIndex getMinMaxIndex() {
return minMaxIndex;
}

public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
this.minMaxIndex = minMaxIndex;
}
}
Expand Up @@ -96,14 +96,35 @@ private static FileFooter3 getFileFooter3(List<BlockletInfo3> infoList,
return footer;
}

public static BlockletIndex getBlockletIndex(
org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
public static org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex
convertExternalMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
if (minMaxIndex == null) {
return null;
}

return new org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex(
minMaxIndex.getMin_values(), minMaxIndex.getMax_values());
}

public static BlockletMinMaxIndex convertMinMaxIndex(
org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex minMaxIndex) {
if (minMaxIndex == null) {
return null;
}

BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();

for (int i = 0; i < info.getMinMaxIndex().getMaxValues().length; i++) {
blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(info.getMinMaxIndex().getMaxValues()[i]));
blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(info.getMinMaxIndex().getMinValues()[i]));
for (int i = 0; i < minMaxIndex.getMaxValues().length; i++) {
blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(minMaxIndex.getMaxValues()[i]));
blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(minMaxIndex.getMinValues()[i]));
}

return blockletMinMaxIndex;
}

public static BlockletIndex getBlockletIndex(
org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
BlockletMinMaxIndex blockletMinMaxIndex = convertMinMaxIndex(info.getMinMaxIndex());
BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
blockletBTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
blockletBTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
Expand Down

0 comments on commit 0cc469e

Please sign in to comment.