Store compressor name in metadata instead of enum
Store the compressor name in the metadata as a string instead of an enum;
this makes the set of supported compressors extensible.
xuchuanyin committed Sep 11, 2018
1 parent 308319e commit d21fd86
Showing 11 changed files with 60 additions and 44 deletions.
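
The motivation, sketched: a Thrift enum freezes the codec list in the file format definition, while a string name can key an open registry, so a new codec needs no format change. A minimal illustrative sketch of that idea in Java (the Compressor interface and CompressorRegistry below are hypothetical stand-ins, not the actual CompressorFactory API changed in this commit):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical name-keyed registry: adding a codec is a registration call,
// not an edit to the serialized file format.
interface Compressor {
  byte[] compressByte(byte[] input);
  byte[] unCompressByte(byte[] input);
}

final class CompressorRegistry {
  private static final Map<String, Compressor> REGISTRY = new ConcurrentHashMap<>();

  static void register(String name, Compressor compressor) {
    REGISTRY.put(name.toLowerCase(), compressor);
  }

  static Compressor get(String name) {
    Compressor compressor = REGISTRY.get(name.toLowerCase());
    if (compressor == null) {
      throw new UnsupportedOperationException(name + " compressor is not supported");
    }
    return compressor;
  }
}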
@@ -34,6 +34,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
import org.apache.carbondata.format.LocalDictionaryChunk;

@@ -145,8 +146,9 @@ public CarbonDictionary getLocalDictionary() {
    if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary
        && null == localDictionary) {
      try {
-        String compressorName =
-            getDataChunkV3().data_chunk_list.get(0).chunk_meta.getCompression_codec().name();
+        String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+            getDataChunkV3().data_chunk_list.get(0).chunk_meta);
+
        Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
        localDictionary = getDictionary(getDataChunkV3().local_dictionary, compressor);
      } catch (IOException | MemoryException e) {
@@ -38,6 +38,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
@@ -201,7 +202,8 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
    // get the data buffer
    ByteBuffer rawData = rawColumnPage.getRawData();
    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
-    String compressorName = pageMetadata.getChunk_meta().getCompression_codec().name();
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
    // calculating the start point of data
    // as buffer can contain multiple column data, start point will be datachunkoffset +
@@ -217,8 +219,10 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
      throws IOException, MemoryException {
    List<Encoding> encodings = pageMetadata.getEncoders();
    List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
-        pageMetadata.getChunk_meta().getCompression_codec().name());
+        compressorName);
    return decoder
        .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
  }
@@ -29,6 +29,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
@@ -193,7 +194,8 @@ public ColumnPage decodeColumnPage(
    DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
    // data chunk of page
    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
-    String compressorName = pageMetadata.chunk_meta.compression_codec.name();
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
    // calculating the start point of data
    // as buffer can contain multiple column data, start point will be datachunkoffset +
@@ -213,8 +215,10 @@ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData,
      throws MemoryException, IOException {
    List<Encoding> encodings = pageMetadata.getEncoders();
    List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
-        pageMetadata.getChunk_meta().getCompression_codec().name());
+        compressorName);
    return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
  }

@@ -27,6 +27,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
@@ -139,7 +140,8 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea
    DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
    // data chunk of page
    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
-    String compressorName = pageMetadata.chunk_meta.compression_codec.name();
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
    // calculating the start point of data
    // as buffer can contain multiple column data, start point will be datachunkoffset +
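
All three V3 reader hunks above apply the same two-step change; condensed here into one hypothetical helper (resolvePageCompressor is an illustrative name, the types are the Thrift-generated ones used in the diffs):

// Resolve the page compressor from the chunk metadata instead of the codec enum.
private Compressor resolvePageCompressor(DataChunk3 dataChunk3, int pageNumber) {
  DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
  String compressorName =
      CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
  return CompressorFactory.getInstance().getCompressor(compressorName);
}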
@@ -23,32 +23,25 @@

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.format.CompressionCodec;

public class CompressorFactory {
  private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();

  private final Map<String, SupportedCompressor> compressors = new HashMap<>();

  public enum SupportedCompressor {
-    SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class),
-    ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class);
+    SNAPPY("snappy", SnappyCompressor.class),
+    ZSTD("zstd", ZstdCompressor.class);

-    private CompressionCodec codec;
    private String name;
    private Class<Compressor> compressorClass;
    private transient Compressor compressor;

-    SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) {
-      this.codec = codec;
+    SupportedCompressor(String name, Class compressorCls) {
      this.name = name;
      this.compressorClass = compressorCls;
    }

-    public CompressionCodec getCodec() {
-      return codec;
-    }
-
    public String getName() {
      return name;
    }
@@ -103,13 +96,4 @@ public Compressor getCompressor(String name) {
name + " compressor is not supported, currently we only support "
+ Arrays.toString(SupportedCompressor.values()));
}

public CompressionCodec getCompressionCodec(String name) {
if (compressors.containsKey(name.toLowerCase())) {
return compressors.get(name.toLowerCase()).getCodec();
}
throw new UnsupportedOperationException(
name + " compressor is not supported, currently we only support "
+ Arrays.toString(SupportedCompressor.values()));
}
}
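
For callers, lookup by name is unchanged; a small usage sketch against the factory above (the names come from the SupportedCompressor enum shown in this hunk, anything else throws):

// Resolve compressors by the names that are now stored in metadata.
Compressor snappy = CompressorFactory.getInstance().getCompressor("snappy");
Compressor zstd = CompressorFactory.getInstance().getCompressor("zstd");
// An unknown name fails fast with UnsupportedOperationException:
// CompressorFactory.getInstance().getCompressor("lz4");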
@@ -38,6 +38,7 @@
import org.apache.carbondata.format.BlockletMinMaxIndex;
import org.apache.carbondata.format.ChunkCompressionMeta;
import org.apache.carbondata.format.ColumnSchema;
+import org.apache.carbondata.format.CompressionCodec;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.FileFooter3;
@@ -249,18 +250,35 @@ public static BlockletIndex getBlockletIndex(EncodedBlocklet encodedBlocklet,
}

  /**
-   * Right now it is set to default values. We may use this in future
+   * Set the compressor.
+   * Before 1.5.0 we stored an enum 'compression_codec';
+   * since 1.5.0 we store the string 'compressor_name' instead.
   */
  public static ChunkCompressionMeta getChunkCompressorMeta(String compressorName) {
    ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();

-    chunkCompressionMeta.setCompression_codec(
-        CompressorFactory.getInstance().getCompressionCodec(compressorName));
+    // We no longer use this field and rely on compressor_name instead, but in the
+    // thrift definition it is required, so we cannot set it to null; otherwise
+    // deserialization would fail at runtime (a required field cannot be null).
+    chunkCompressionMeta.setCompression_codec(CompressionCodec.DEPRECATED);
+    chunkCompressionMeta.setCompressor_name(compressorName);
    chunkCompressionMeta.setTotal_compressed_size(0);
    chunkCompressionMeta.setTotal_uncompressed_size(0);
    return chunkCompressionMeta;
  }

+  /**
+   * Get the compressor name from the chunk metadata.
+   * Before 1.5.0 only snappy was supported and there was no compressor_name field;
+   * since 1.5.0 the compressor name is read directly from the compressor_name field.
+   */
+  public static String getCompressorNameFromChunkMeta(ChunkCompressionMeta chunkCompressionMeta) {
+    if (chunkCompressionMeta.isSetCompressor_name()) {
+      return chunkCompressionMeta.getCompressor_name();
+    } else {
+      // this is for legacy stores written before 1.5.0
+      return CompressorFactory.SupportedCompressor.SNAPPY.getName();
+    }
+  }
/**
* Below method will be used to get the index header
*
7 changes: 5 additions & 2 deletions format/src/main/thrift/carbondata.thrift
@@ -69,7 +69,8 @@ enum SortState{
*/
enum CompressionCodec{
  SNAPPY = 0;
-  ZSTD = 1;
+  //** We no longer use this CompressionCodec since 1.5.0, but some structures still require the field, so we cannot get rid of it. A DEPRECATED entry is added instead to alert anyone who wants to use it **//
+  DEPRECATED = 1;
}

/**
@@ -83,6 +84,8 @@ struct ChunkCompressionMeta{
  2: required i64 total_uncompressed_size;
  /** Total byte size of all compressed pages in this column chunk (including the headers) **/
  3: required i64 total_compressed_size;
+  /** Compressor name for the chunk, introduced in 1.5.0 to make compression of the final store more extensible. Readers check compressor_name first; if it is not set, the file comes from a legacy store and snappy is assumed **/
+  4: optional string compressor_name;
}

/**
@@ -213,7 +216,7 @@ struct FileHeader{
  4: optional i64 time_stamp; // Timestamp to compare column schema against master schema
  5: optional bool is_splitable; // Whether file is splitable or not
  6: optional binary sync_marker; // 16 bytes sync marker
-  7: optional CompressionCodec compressionCodec; // compressor used to compress blocklet data
+  7: optional string compressor_name;
}

/**
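On the write side both Thrift fields are populated, so an old reader that treats compression_codec as required can still deserialize the struct; a sketch mirroring getChunkCompressorMeta above, using the Thrift-generated Java setters ("zstd" is just an example value):

// Pin the required enum to DEPRECATED; the real codec travels in compressor_name.
ChunkCompressionMeta meta = new ChunkCompressionMeta();
meta.setCompression_codec(CompressionCodec.DEPRECATED);
meta.setCompressor_name("zstd");
meta.setTotal_compressed_size(0);
meta.setTotal_uncompressed_size(0);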
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.sdk.file._

@@ -2589,8 +2589,8 @@ object testUtil{
      data: Array[String]): Boolean = {
    val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
    if (null != local_dictionary) {
-      val compressorName = rawColumnPage.getDataChunkV3.getData_chunk_list.get(0)
-        .getChunk_meta.getCompression_codec.name()
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
      val encodings = local_dictionary.getDictionary_meta.encoders
      val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
      val encodingFactory = DefaultEncodingFactory.getInstance
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
-import org.apache.carbondata.core.util.{CarbonProperties, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, DataFileFooterConverterV3}

class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterAll {

@@ -277,8 +277,8 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
      data: Array[String]): Boolean = {
    val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
    if (null != local_dictionary) {
-      val compressorName = rawColumnPage.getDataChunkV3.getData_chunk_list.get(0)
-        .getChunk_meta.getCompression_codec.name()
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
      val encodings = local_dictionary.getDictionary_meta.encoders
      val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
      val encodingFactory = DefaultEncodingFactory.getInstance
@@ -265,8 +265,8 @@ private byte[] getSyncMarker(String filePath) throws IOException {
    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
    FileHeader header = headerReader.readHeader();
    // legacy store does not have this member
-    if (header.isSetCompressionCodec()) {
-      compressorName = header.getCompressionCodec().name();
+    if (header.isSetCompressor_name()) {
+      compressorName = header.getCompressor_name();
    } else {
      compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
    }
@@ -170,8 +170,8 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
    // get the compressor from the file header. In a legacy store,
    // the compressor name is not set and the snappy compressor is used
    FileHeader header = new CarbonHeaderReader(filePath).readHeader();
-    if (header.isSetCompressionCodec()) {
-      compressorName = header.getCompressionCodec().name();
+    if (header.isSetCompressor_name()) {
+      compressorName = header.getCompressor_name();
    } else {
      compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
    }
@@ -315,8 +315,7 @@ private void writeFileHeader() throws IOException {
    fileHeader.setIs_footer_present(false);
    fileHeader.setIs_splitable(true);
    fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
-    fileHeader.setCompressionCodec(
-        CompressorFactory.getInstance().getCompressionCodec(compressorName));
+    fileHeader.setCompressor_name(compressorName);
    outputStream.write(CarbonUtil.getByteArray(fileHeader));
  }

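Both streaming hunks share the same read-side fallback; condensed here into one hypothetical helper (illustrative name, types as in the diffs above):

// Resolve the compressor for a stream file: new files carry compressor_name in
// the header; legacy files never set it and were always written with snappy.
private static String resolveStreamCompressorName(String filePath) throws IOException {
  FileHeader header = new CarbonHeaderReader(filePath).readHeader();
  return header.isSetCompressor_name()
      ? header.getCompressor_name()
      : CompressorFactory.SupportedCompressor.SNAPPY.getName();
}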
