Commit c21fea7

Merge d21fd86 into 8f1a029
2 parents: 8f1a029 + d21fd86
xuchuanyin committed Sep 11, 2018

Showing 80 changed files with 1,567 additions and 538 deletions.
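Taken together, the hunks below make the compressor a per-chunk concern rather than a process-wide constant: the legacy V1/V2 readers pin snappy explicitly, the V3 readers re-resolve the compressor from each page's chunk metadata, and the encoding factories now take a compressor name when creating decoders.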
@@ -26,13 +26,15 @@
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.LocalDictionaryChunk;
@@ -144,7 +146,11 @@ public CarbonDictionary getLocalDictionary() {
     if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary
         && null == localDictionary) {
       try {
-        localDictionary = getDictionary(getDataChunkV3().local_dictionary);
+        String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+            getDataChunkV3().data_chunk_list.get(0).chunk_meta);
+
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+        localDictionary = getDictionary(getDataChunkV3().local_dictionary, compressor);
       } catch (IOException | MemoryException e) {
         throw new RuntimeException(e);
       }
@@ -160,17 +166,17 @@ public CarbonDictionary getLocalDictionary() {
    * @throws IOException
    * @throws MemoryException
    */
-  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk)
-      throws IOException, MemoryException {
+  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk,
+      Compressor compressor) throws IOException, MemoryException {
     if (null != localDictionaryChunk) {
       List<Encoding> encodings = localDictionaryChunk.getDictionary_meta().getEncoders();
       List<ByteBuffer> encoderMetas = localDictionaryChunk.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder =
-          DefaultEncodingFactory.getInstance().createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder(
+          encodings, encoderMetas, compressor.getName());
       ColumnPage decode = decoder.decode(localDictionaryChunk.getDictionary_data(), 0,
           localDictionaryChunk.getDictionary_data().length);
-      BitSet usedDictionary = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-          .unCompressByte(localDictionaryChunk.getDictionary_values()));
+      BitSet usedDictionary = BitSet.valueOf(compressor.unCompressByte(
+          localDictionaryChunk.getDictionary_values()));
       int length = usedDictionary.length();
       int index = 0;
       byte[][] dictionary = new byte[length][];
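The change above sets the pattern repeated throughout this commit: resolve the compressor by the name carried in chunk metadata and pass it down the decode path, instead of asking the factory for its default. A minimal sketch of that lookup, using only calls visible in this diff (the wrapper class and method are illustrative):

import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;

class CompressorLookupSketch {
  // Resolve a compressor by the name recorded in chunk metadata, then use it
  // to uncompress a serialized block (here, e.g. the local dictionary values).
  static byte[] uncompress(String compressorName, byte[] compressedBlock) {
    Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
    return compressor.unCompressByte(compressedBlock);
  }
}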
@@ -19,7 +19,6 @@
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.util.CarbonProperties;
 
@@ -32,7 +31,7 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
   /**
    * compressor will be used to uncompress the data
    */
-  protected static final Compressor COMPRESSOR = CompressorFactory.getInstance().getCompressor();
+  protected Compressor compressor;
 
   /**
    * size of the each column value
@@ -28,6 +28,7 @@
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -55,6 +56,8 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
       final int[] eachColumnValueSize, final String filePath) {
     super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
+    // for v1 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
   /**
@@ -108,7 +111,7 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
     FileReader fileReader = dimensionRawColumnChunk.getFileReader();
 
     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
-    dataPage = COMPRESSOR.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
+    dataPage = compressor.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
         dimensionRawColumnChunk.getLength());
 
     // if row id block is present then read the row id chunk and uncompress it
@@ -27,6 +27,7 @@
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -47,6 +48,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
     super(blockletInfo, eachColumnValueSize, filePath);
+    // for v2 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
/**
Expand Down Expand Up @@ -143,7 +146,7 @@ public DimensionColumnPage decodeColumnPage(
}

// first read the data and uncompressed it
dataPage = COMPRESSOR
dataPage = compressor
.unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
Expand Down
@@ -30,13 +30,15 @@
 import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -200,6 +202,9 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
     // get the data buffer
     ByteBuffer rawData = rawColumnPage.getRawData();
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -214,7 +219,10 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
+        compressorName);
     return decoder
         .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
   }
@@ -242,7 +250,7 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP
     if (isEncodedWithMeta(pageMetadata)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
           null != rawColumnPage.getLocalDictionary());
-      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
           isEncodedWithAdaptiveMeta(pageMetadata));
     } else {
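Note that QueryUtil.getNullBitSet now also takes the compressor: the presence (null-value) bitmap is evidently stored compressed alongside the page, so it must be uncompressed with the same codec. A small sketch, assuming getNullBitSet returns a java.util.BitSet as its use with setNullBits suggests (the wrapper class is illustrative):

import java.util.BitSet;

import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.format.DataChunk2;

class NullBitsSketch {
  // The null bitmap travels compressed with the page, hence the compressor.
  static BitSet nullBitsOf(DataChunk2 pageMetadata, Compressor compressor) {
    return QueryUtil.getNullBitSet(pageMetadata.presence, compressor);
  }
}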
@@ -273,7 +281,7 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
     int[] rlePage;
     int[] invertedIndexes = new int[0];
     int[] invertedIndexesReverse = new int[0];
-    dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
+    dataPage = compressor.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
     offset += pageMetadata.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
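The V3 readers (dimension above, measure below) follow the same per-page flow, since pages of one table need not all share a compressor. A condensed sketch recombining only calls that appear in these hunks (the helper method itself is illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;

class PageDecodeSketch {
  // Mirrors the per-page decode path of the V3 readers shown in this diff.
  static ColumnPage decodePage(EncodingFactory encodingFactory, DataChunk2 pageMetadata,
      ByteBuffer pageData, int offset) throws IOException, MemoryException {
    List<Encoding> encodings = pageMetadata.getEncoders();
    List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
    // the compressor name travels with the page metadata rather than the
    // reader, so the reader no longer assumes one global compressor
    String compressorName =
        CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
    ColumnPageDecoder decoder =
        encodingFactory.createDecoder(encodings, encoderMetas, compressorName);
    return decoder.decode(pageData.array(), offset, pageMetadata.data_page_length);
  }
}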
@@ -17,13 +17,15 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 
 /**
  * Measure block reader abstract class
  */
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
+  protected Compressor compressor;
 
   protected EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
 
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -96,7 +97,8 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
     int blockIndex = measureRawColumnChunk.getColumnIndex();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         (int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
     decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -46,6 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
     super(blockletInfo, filePath);
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
   @Override
@@ -126,7 +128,7 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
     copyPoint += measureColumnChunkLength.get(blockIndex);
 
     ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence));
+    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence, this.compressor));
     return page;
   }
 
@@ -137,7 +139,8 @@ protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
     byte[] encodedMeta = encoder_meta.get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }
@@ -23,11 +23,13 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -192,14 +194,17 @@ public ColumnPage decodeColumnPage(
     DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
     int offset = (int) rawColumnChunk.getOffSet() +
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
     ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }
 
@@ -210,7 +215,10 @@ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData,
       throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
+        compressorName);
     return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }
 
@@ -22,10 +22,12 @@
 
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -138,6 +140,9 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea
     DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -147,7 +152,7 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);
 
     ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }
 
@@ -33,7 +33,7 @@ public interface Compressor {
 
   byte[] compressShort(short[] unCompInput);
 
-  short[] unCompressShort(byte[] compInput, int offset, int lenght);
+  short[] unCompressShort(byte[] compInput, int offset, int length);
 
   byte[] compressInt(int[] unCompInput);
 
@@ -55,5 +55,14 @@ public interface Compressor {
 
   long rawUncompress(byte[] input, byte[] output) throws IOException;
 
-  int maxCompressedLength(int inputSize);
+  long maxCompressedLength(long inputSize);
 
+  /**
+   * Whether this compressor supports zero-copy during compression.
+   * Zero-copy means that the compressor can accept a memory address (pointer)
+   * as input and return its result at a memory address (pointer).
+   * Currently not all Java implementations of the compressors support this feature.
+   * @return true if zero-copy is supported, false otherwise
+   */
   boolean supportUnsafe();
 }
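Finally, the interface itself: the "lenght" typo in unCompressShort is corrected, maxCompressedLength is widened to long, and supportUnsafe gains documentation. A hedged caller-side sketch; the buffer-bound check is illustrative, and compressByte(byte[]) is an assumption since only the short/int variants are visible in this hunk:

import org.apache.carbondata.core.datastore.compression.Compressor;

class BufferSizingSketch {
  static byte[] compressPage(Compressor compressor, byte[] page) {
    // The widened signature lets callers bound inputs larger than 2 GB
    // without integer overflow before deciding how to stage the output.
    long bound = compressor.maxCompressedLength(page.length);
    if (bound > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("page too large for an on-heap byte[]");
    }
    // supportUnsafe() advertises a zero-copy (pointer-based) path; a writer
    // could branch to an off-heap routine when it returns true. This sketch
    // stays on-heap either way; compressByte(byte[]) is assumed to exist.
    boolean offHeapCapable = compressor.supportUnsafe();
    return compressor.compressByte(page);
  }
}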
