Skip to content

Commit

Permalink
Merge 3259d25 into 94a4f83
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchuanyin committed Oct 31, 2018
2 parents 94a4f83 + 3259d25 commit bb1e877
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 39 deletions.
Expand Up @@ -57,7 +57,7 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
// for v1 store, the compressor is snappy
this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
}

/**
Expand Down
Expand Up @@ -49,7 +49,7 @@ public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo
final int[] eachColumnValueSize, final String filePath) {
super(blockletInfo, eachColumnValueSize, filePath);
// for v2 store, the compressor is snappy
this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
}

/**
Expand Down
Expand Up @@ -98,7 +98,7 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
DataChunk dataChunk = measureColumnChunks.get(blockIndex);
ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
CompressorFactory.SupportedCompressor.SNAPPY.getName());
CompressorFactory.NativeSupportedCompressor.SNAPPY.getName());
ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
(int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());
Expand Down
Expand Up @@ -47,7 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
final String filePath) {
super(blockletInfo, filePath);
this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
}

@Override
Expand Down Expand Up @@ -140,7 +140,7 @@ protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,

ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta);
ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
CompressorFactory.SupportedCompressor.SNAPPY.getName());
CompressorFactory.NativeSupportedCompressor.SNAPPY.getName());
byte[] rawData = measureRawColumnChunk.getRawData().array();
return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
}
Expand Down
Expand Up @@ -17,27 +17,32 @@

package org.apache.carbondata.core.datastore.compression;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

public class CompressorFactory {
private static final Logger LOGGER = LogServiceFactory.getLogService(
CompressorFactory.class.getName());
private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();

private final Map<String, SupportedCompressor> compressors = new HashMap<>();
private final Map<String, Compressor> allSupportedCompressors = new HashMap<>();

public enum SupportedCompressor {
public enum NativeSupportedCompressor {
SNAPPY("snappy", SnappyCompressor.class),
ZSTD("zstd", ZstdCompressor.class);

private String name;
private Class<Compressor> compressorClass;
private transient Compressor compressor;

SupportedCompressor(String name, Class compressorCls) {
NativeSupportedCompressor(String name, Class compressorCls) {
this.name = name;
this.compressorClass = compressorCls;
}
Expand All @@ -54,23 +59,64 @@ public Compressor getCompressor() {
try {
this.compressor = compressorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Exception occurs while getting compressor for " + name);
throw new RuntimeException("Exception occurs while getting compressor for " + name, e);
}
}
return this.compressor;
}
}

private CompressorFactory() {
for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) {
compressors.put(supportedCompressor.getName(), supportedCompressor);
for (NativeSupportedCompressor nativeSupportedCompressor : NativeSupportedCompressor.values()) {
allSupportedCompressors.put(nativeSupportedCompressor.getName(),
nativeSupportedCompressor.getCompressor());
}
}

/**
 * Gives access to the factory instance held in the static {@code COMPRESSOR_FACTORY} field.
 *
 * @return the shared {@code CompressorFactory}
 */
public static CompressorFactory getInstance() {
  return CompressorFactory.COMPRESSOR_FACTORY;
}

/**
 * Registers a compressor using reflection and returns it.
 * If a compressor is already registered under the given class name, that instance is
 * returned without loading anything. Otherwise the class is loaded, instantiated through
 * its no-arg constructor, validated, stored in {@code allSupportedCompressors} and returned.
 *
 * @param compressorClassName full class name of the compressor
 * @return the compressor registered under {@code compressorClassName}
 * @throws RuntimeException if the class cannot be loaded or instantiated, if it is not a
 *         {@code Compressor}, or if its {@code getName()} is not the full class name
 */
private Compressor registerColumnCompressor(String compressorClassName) {
  if (allSupportedCompressors.containsKey(compressorClassName)) {
    return allSupportedCompressors.get(compressorClassName);
  }

  try {
    Class<?> clazz = Class.forName(compressorClassName);
    Object instance = clazz.newInstance();
    if (!(instance instanceof Compressor)) {
      throw new RuntimeException(
          String.format("Compressor '%s' should be a subclass of '%s'",
              compressorClassName, Compressor.class.getCanonicalName()));
    }

    Compressor candidate = (Compressor) instance;
    String reportedName = candidate.getName();
    // Compare from the known non-null side so that a compressor returning a null name
    // is rejected with the descriptive message below rather than a bare NPE.
    if (!compressorClassName.equals(reportedName)) {
      throw new RuntimeException(String.format("For not carbondata native supported compressor,"
          + " the result of method getName() should be the full class name. Expected '%s',"
          + " found '%s'", compressorClassName, reportedName));
    }

    allSupportedCompressors.put(compressorClassName, candidate);
    LOGGER.info(
        String.format("sucessfully register compressor %s to carbondata", compressorClassName));
    return candidate;
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    LOGGER.error(String.format("Failed to register compressor '%s'", compressorClassName), e);
    throw new RuntimeException(
        String.format("Failed to load compressor '%s', currently carbondata supports %s",
            compressorClassName, StringUtils.join(allSupportedCompressors.keySet(), ", ")), e);
  }
}
/**
* get the default compressor.
* This method can only be called in data load procedure to compress column page.
Expand All @@ -80,20 +126,27 @@ public static CompressorFactory getInstance() {
public Compressor getCompressor() {
String compressorType = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
if (!compressors.containsKey(compressorType)) {
throw new UnsupportedOperationException(
"Invalid compressor type provided! Currently we only support "
+ Arrays.toString(SupportedCompressor.values()));
}
return getCompressor(compressorType);
}

public Compressor getCompressor(String name) {
if (compressors.containsKey(name.toLowerCase())) {
return compressors.get(name.toLowerCase()).getCompressor();
String internalCompressorName = getInternalCompressorName(name);
if (null == internalCompressorName) {
// maybe this is a new compressor, we will try to register it
return registerColumnCompressor(name);
} else {
return allSupportedCompressors.get(internalCompressorName);
}
}

// If the compressor name is specified in a table property, carbondata converts the
// property value to lowercase, so here we ignore the case to find the real registered name.
private String getInternalCompressorName(String name) {
for (String key : allSupportedCompressors.keySet()) {
if (key.equalsIgnoreCase(name)) {
return key;
}
}
throw new UnsupportedOperationException(
name + " compressor is not supported, currently we only support "
+ Arrays.toString(SupportedCompressor.values()));
return null;
}
}
Expand Up @@ -359,7 +359,8 @@ public static byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) thr
}
byte[] byteArray = stream.toByteArray();
// Compress to reduce the size of schema
return CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray);
return CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().compressByte(
byteArray);
}

/**
Expand All @@ -370,7 +371,7 @@ public static byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) thr
*/
public static List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
// uncompress it.
schemaArray = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(
schemaArray = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().unCompressByte(
schemaArray);
ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
DataInput schemaInput = new DataInputStream(schemaStream);
Expand Down
Expand Up @@ -337,7 +337,7 @@ public static String getCompressorNameFromChunkMeta(ChunkCompressionMeta chunkCo
return chunkCompressionMeta.getCompressor_name();
} else {
// this is for legacy store before 1.5.0
return CompressorFactory.SupportedCompressor.SNAPPY.getName();
return CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
}
}
/**
Expand Down

0 comments on commit bb1e877

Please sign in to comment.