Commit aadfbf0

Merge 308319e into 7c827c0

xuchuanyin committed Sep 11, 2018
2 parents: 7c827c0 + 308319e
Showing 80 changed files with 1,548 additions and 535 deletions.
@@ -26,6 +26,7 @@
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -144,7 +145,10 @@ public CarbonDictionary getLocalDictionary() {
     if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary
         && null == localDictionary) {
       try {
-        localDictionary = getDictionary(getDataChunkV3().local_dictionary);
+        String compressorName =
+            getDataChunkV3().data_chunk_list.get(0).chunk_meta.getCompression_codec().name();
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+        localDictionary = getDictionary(getDataChunkV3().local_dictionary, compressor);
       } catch (IOException | MemoryException e) {
         throw new RuntimeException(e);
       }
@@ -160,17 +164,17 @@ public CarbonDictionary getLocalDictionary() {
    * @throws IOException
    * @throws MemoryException
    */
-  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk)
-      throws IOException, MemoryException {
+  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk,
+      Compressor compressor) throws IOException, MemoryException {
     if (null != localDictionaryChunk) {
       List<Encoding> encodings = localDictionaryChunk.getDictionary_meta().getEncoders();
       List<ByteBuffer> encoderMetas = localDictionaryChunk.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder =
-          DefaultEncodingFactory.getInstance().createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder(
+          encodings, encoderMetas, compressor.getName());
       ColumnPage decode = decoder.decode(localDictionaryChunk.getDictionary_data(), 0,
           localDictionaryChunk.getDictionary_data().length);
-      BitSet usedDictionary = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-          .unCompressByte(localDictionaryChunk.getDictionary_values()));
+      BitSet usedDictionary = BitSet.valueOf(compressor.unCompressByte(
+          localDictionaryChunk.getDictionary_values()));
       int length = usedDictionary.length();
       int index = 0;
       byte[][] dictionary = new byte[length][];
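
This first hunk sets the pattern the rest of the commit repeats: the compression codec is read out of the thrift chunk metadata (chunk_meta.compression_codec) and resolved to a Compressor by name, instead of always taking the process-wide default. A minimal, self-contained sketch of that name-keyed lookup; the enum constants and stub bodies are illustrative, not CarbonData's actual factory:

```java
import java.util.Locale;

/** Illustrative stand-in for CarbonData's Compressor contract. */
interface CompressorSketch {
  String getName();
  byte[] unCompressByte(byte[] compInput);
}

/** Sketch of a name-keyed factory in the spirit of CompressorFactory. */
enum SupportedCompressorSketch {
  SNAPPY, ZSTD;

  CompressorSketch getCompressor() {
    String name = name().toLowerCase(Locale.ROOT);
    return new CompressorSketch() {
      @Override public String getName() { return name; }
      @Override public byte[] unCompressByte(byte[] compInput) {
        // A real implementation would delegate to snappy-java, zstd-jni, etc.
        throw new UnsupportedOperationException("sketch only");
      }
    };
  }

  /** Resolve by the codec name recorded in a page's chunk_meta. */
  static CompressorSketch byName(String compressorName) {
    return valueOf(compressorName.toUpperCase(Locale.ROOT)).getCompressor();
  }
}
```
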
======================================================================
@@ -19,7 +19,6 @@
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.util.CarbonProperties;

@@ -32,7 +31,7 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
   /**
    * compressor will be used to uncompress the data
    */
-  protected static final Compressor COMPRESSOR = CompressorFactory.getInstance().getCompressor();
+  protected Compressor compressor;

   /**
    * size of the each column value
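
Design note: the compressor stops being a static final constant shared by every reader and becomes per-reader state. The V1/V2 readers pin it to Snappy in their constructors (those formats never recorded a codec), while the V3 readers re-resolve it from each page's metadata, as the later hunks show; this appears to assume a reader's pages are decoded sequentially, since the field is reassigned per page. A sketch of the split, with illustrative class names:

```java
enum Codec { SNAPPY, ZSTD }

/** Sketch of the new design: the compressor is per-reader state, not a constant. */
abstract class ReaderSketch {
  protected Codec compressor; // was: protected static final Compressor COMPRESSOR = <default>
}

/** Legacy V1/V2 stores never recorded a codec, so Snappy is implied by convention. */
class LegacyReaderSketch extends ReaderSketch {
  LegacyReaderSketch() {
    this.compressor = Codec.SNAPPY;
  }
}

/** V3 stores record the codec per page, so it is re-resolved on every decode. */
class V3ReaderSketch extends ReaderSketch {
  void decodePage(Codec codecFromPageMeta) {
    this.compressor = codecFromPageMeta; // mirrors the per-page assignment in the diff
    // ... decompress the page and its presence bitmap with this.compressor ...
  }
}
```
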
======================================================================
@@ -28,6 +28,7 @@
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -55,6 +56,8 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
       final int[] eachColumnValueSize, final String filePath) {
     super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
+    // for v1 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }

   /**
@@ -108,7 +111,7 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
     FileReader fileReader = dimensionRawColumnChunk.getFileReader();

     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
-    dataPage = COMPRESSOR.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
+    dataPage = compressor.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
         dimensionRawColumnChunk.getLength());

     // if row id block is present then read the row id chunk and uncompress it
======================================================================
@@ -27,6 +27,7 @@
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -47,6 +48,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
     super(blockletInfo, eachColumnValueSize, filePath);
+    // for v2 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }

   /**
@@ -143,7 +146,7 @@ public DimensionColumnPage decodeColumnPage(
     }

     // first read the data and uncompressed it
-    dataPage = COMPRESSOR
+    dataPage = compressor
         .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
======================================================================
@@ -30,6 +30,7 @@
 import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
@@ -200,6 +201,8 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
     // get the data buffer
     ByteBuffer rawData = rawColumnPage.getRawData();
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = pageMetadata.getChunk_meta().getCompression_codec().name();
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -214,7 +217,8 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
+        pageMetadata.getChunk_meta().getCompression_codec().name());
     return decoder
         .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
   }
@@ -242,7 +246,7 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP
     if (isEncodedWithMeta(pageMetadata)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
           null != rawColumnPage.getLocalDictionary());
-      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
           isEncodedWithAdaptiveMeta(pageMetadata));
     } else {
@@ -273,7 +277,7 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
     int[] rlePage;
     int[] invertedIndexes = new int[0];
     int[] invertedIndexesReverse = new int[0];
-    dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
+    dataPage = compressor.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
     offset += pageMetadata.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
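
In the V3 reader, every DataChunk2 carries its own ChunkCompressionMeta, so the codec is resolved again for each page. A practical consequence is that one scan can cross pages written by different codecs, for example older Snappy segments alongside newer Zstd segments. A self-contained sketch of that per-page dispatch; the registry shape is illustrative, not CarbonData's internal structure:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch: decompressors keyed by the codec name stored in page metadata.
 * With per-page resolution, one table can mix pages written by different
 * codecs, so the reader must dispatch on each page's recorded name.
 */
final class PageCodecRegistrySketch {
  private final Map<String, Function<byte[], byte[]>> decompressors = new HashMap<>();

  void register(String codecName, Function<byte[], byte[]> decompressor) {
    decompressors.put(codecName, decompressor);
  }

  byte[] decompress(String codecNameFromChunkMeta, byte[] compressedPage) {
    Function<byte[], byte[]> d = decompressors.get(codecNameFromChunkMeta);
    if (d == null) {
      throw new IllegalArgumentException(
          "codec " + codecNameFromChunkMeta + " is not supported by this reader");
    }
    return d.apply(compressedPage);
  }
}
```
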
======================================================================
@@ -17,13 +17,15 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;

 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;

 /**
  * Measure block reader abstract class
  */
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
+  protected Compressor compressor;

   protected EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();

======================================================================
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -96,7 +97,8 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
     int blockIndex = measureRawColumnChunk.getColumnIndex();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         (int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
     decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());
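
Legacy measure pages go the other way: ValueEncoderMeta predates the codec field, so the codec cannot be read from metadata and the readers pass the Snappy name to createDecoderLegacy explicitly. A hedged sketch of that fallback decision; the method names are illustrative:

```java
/** Sketch: choosing a codec name for legacy vs. current page metadata. */
final class LegacyCodecFallbackSketch {
  static final String DEFAULT_LEGACY_CODEC = "SNAPPY";

  /**
   * V1/V2 metadata carries no codec field, so the writer's codec is known
   * only by convention: those stores were always written with Snappy.
   */
  static String codecNameFor(boolean metadataHasCodecField, String recordedCodec) {
    return metadataHasCodecField ? recordedCodec : DEFAULT_LEGACY_CODEC;
  }

  public static void main(String[] args) {
    System.out.println(codecNameFor(false, null));   // SNAPPY (legacy page)
    System.out.println(codecNameFor(true, "ZSTD"));  // ZSTD (V3 page)
  }
}
```
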
======================================================================
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -46,6 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
     super(blockletInfo, filePath);
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }

   @Override
@@ -126,7 +128,7 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
     copyPoint += measureColumnChunkLength.get(blockIndex);

     ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence));
+    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence, this.compressor));
     return page;
   }

@@ -137,7 +139,8 @@ protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
     byte[] encodedMeta = encoder_meta.get(0).array();

     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }
======================================================================
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -192,14 +193,16 @@ public ColumnPage decodeColumnPage(
     DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = pageMetadata.chunk_meta.compression_codec.name();
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
     int offset = (int) rawColumnChunk.getOffSet() +
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
     ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }

@@ -210,7 +213,8 @@ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData,
       throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas);
+    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
+        pageMetadata.getChunk_meta().getCompression_codec().name());
     return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }

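
QueryUtil.getNullBitSet now takes the page's compressor because the presence (null-value) bit stream is stored compressed; decompressing it with anything other than the codec that wrote the page would corrupt null handling on non-Snappy pages. A self-contained sketch of the idea, assuming the stream decompresses to the byte form of a java.util.BitSet:

```java
import java.util.BitSet;
import java.util.function.Function;

/** Sketch: the presence bitmap must be decompressed with the page's codec. */
final class NullBitSetSketch {
  static BitSet getNullBitSet(byte[] presentBitStream,
      Function<byte[], byte[]> pageDecompressor) {
    if (presentBitStream == null || presentBitStream.length == 0) {
      return new BitSet();
    }
    // The bitmap travels through the same codec as the page data,
    // so the caller passes the compressor resolved from chunk_meta.
    return BitSet.valueOf(pageDecompressor.apply(presentBitStream));
  }

  public static void main(String[] args) {
    BitSet nulls = new BitSet();
    nulls.set(3);                        // row 3 is null
    byte[] stored = nulls.toByteArray(); // stands in for the compressed stream
    BitSet roundTrip = getNullBitSet(stored, b -> b); // identity "codec" for the demo
    System.out.println(roundTrip.get(3)); // true
  }
}
```
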
======================================================================
@@ -22,6 +22,7 @@

 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -138,6 +139,8 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea
     DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = pageMetadata.chunk_meta.compression_codec.name();
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -147,7 +150,7 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);

     ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }

======================================================================
@@ -33,7 +33,7 @@ public interface Compressor {

   byte[] compressShort(short[] unCompInput);

-  short[] unCompressShort(byte[] compInput, int offset, int lenght);
+  short[] unCompressShort(byte[] compInput, int offset, int length);

   byte[] compressInt(int[] unCompInput);

@@ -55,5 +55,14 @@

   long rawUncompress(byte[] input, byte[] output) throws IOException;

-  int maxCompressedLength(int inputSize);
+  long maxCompressedLength(long inputSize);
+
+  /**
+   * Whether this compressor supports zero-copy during compression.
+   * Zero-copy means that the compressor can accept a memory address (pointer)
+   * and return its result at a memory address (pointer).
+   * Currently not all Java implementations of the compressors support this feature.
+   * @return true if zero-copy is supported, false otherwise
+   */
+  boolean supportUnsafe();
 }
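
Two interface changes ride along in Compressor itself: supportUnsafe() is added so callers can ask whether a codec can work directly on off-heap addresses, and maxCompressedLength widens from int to long because a worst-case bound on a large input can exceed Integer.MAX_VALUE. A small demonstration of the overflow, using Snappy's documented worst-case bound of 32 + n + n/6 (the formula is the assumption here; check the bound for your codec):

```java
/** Demonstrates why maxCompressedLength(int) overflows for large inputs. */
public class MaxCompressedLengthDemo {

  // Snappy's worst-case output bound: 32 + n + n/6 (see snappy's MaxCompressedLength).
  static long snappyMaxCompressedLength(long inputSize) {
    return 32 + inputSize + inputSize / 6;
  }

  public static void main(String[] args) {
    long input = Integer.MAX_VALUE;                // a ~2 GiB input
    long bound = snappyMaxCompressedLength(input);
    System.out.println(bound);                     // 2505397620, does not fit in an int
    System.out.println((int) bound);               // negative: the old int API breaks here
  }
}
```
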
