Support customized column compressor
xuchuanyin committed Sep 12, 2018
1 parent cdae48c commit fcb9abd
Showing 3 changed files with 241 additions and 18 deletions.
@@ -17,17 +17,22 @@

package org.apache.carbondata.core.datastore.compression;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.commons.lang3.StringUtils;

public class CompressorFactory {
private static final LogService LOGGER = LogServiceFactory.getLogService(
CompressorFactory.class.getName());
private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();

- private final Map<String, SupportedCompressor> compressors = new HashMap<>();
+ private final Map<String, Compressor> supportedCompressors = new HashMap<>();

public enum SupportedCompressor {
SNAPPY("snappy", SnappyCompressor.class),
@@ -63,14 +68,54 @@ public Compressor getCompressor() {

private CompressorFactory() {
for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) {
- compressors.put(supportedCompressor.getName(), supportedCompressor);
+ supportedCompressors.put(supportedCompressor.getName(),
+ supportedCompressor.getCompressor());
}
}

public static CompressorFactory getInstance() {
return COMPRESSOR_FACTORY;
}

/**
* Register a compressor using reflection.
* If the class name or the short name of the compressor has already been registered,
* it returns false; if the reflection fails, it throws a RuntimeException; if the
* compressor is registered successfully, it returns true.
*
* @param compressorClassName full class name of the compressor
* @return true if registered successfully, false otherwise
*/
public boolean registerColumnCompressor(String compressorClassName) {
if (supportedCompressors.containsKey(compressorClassName)) {
LOGGER.warn(String.format("Compressor %s is already existed", compressorClassName));
return false;
}

Class<?> clazz;
try {
clazz = Class.forName(compressorClassName);
Object instance = clazz.newInstance();
// we register both the class name and the short name, so that users can use either of them
if (instance instanceof Compressor) {
if (supportedCompressors.containsKey(((Compressor) instance).getName())) {
LOGGER.warn(String.format("Compressor %s is already existed with short name %s",
compressorClassName, ((Compressor) instance).getName()));
return false;
}
supportedCompressors.put(compressorClassName, (Compressor) instance);
supportedCompressors.put(((Compressor) instance).getName(), (Compressor) instance);
return true;
} else {
throw new RuntimeException(
String.format("Compressor %s should be a subclass of %s",
compressorClassName, Compressor.class.getCanonicalName()));
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
LOGGER.error(e, "Failed to add column compressor " + compressorClassName);
throw new RuntimeException(e);
}
}
/**
* get the default compressor.
* This method can only be called in the data load procedure to compress column pages.
@@ -80,20 +125,28 @@ public static CompressorFactory getInstance() {
public Compressor getCompressor() {
String compressorType = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
- if (!compressors.containsKey(compressorType)) {
- throw new UnsupportedOperationException(
- "Invalid compressor type provided! Currently we only support "
- + Arrays.toString(SupportedCompressor.values()));
- }
+ return getCompressor(compressorType);
}

public Compressor getCompressor(String name) {
- if (compressors.containsKey(name.toLowerCase())) {
- return compressors.get(name.toLowerCase()).getCompressor();
+ String internalCompressorName = getInternalCompressorName(name);
+ if (null != internalCompressorName) {
+ return supportedCompressors.get(internalCompressorName);
+ }

throw new UnsupportedOperationException(
name + " compressor is not supported, currently we only support "
- + Arrays.toString(SupportedCompressor.values()));
+ + StringUtils.join(supportedCompressors.keySet(), ", "));
}

// if the compressor name is specified in a table property, carbondata will convert the
// property value to lowercase, so here we ignore the case.
private String getInternalCompressorName(String name) {
for (String key : supportedCompressors.keySet()) {
if (key.equalsIgnoreCase(name)) {
return key;
}
}
return null;
}
}
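
For context, here is a minimal sketch of how a caller might exercise the registerColumnCompressor/getCompressor API above (Scala; com.example.MyCompressor is a hypothetical Compressor implementation, not part of this commit):

// Hedged sketch: register a custom compressor, then resolve it again.
// "com.example.MyCompressor" is a hypothetical Compressor implementation
// that must be on the classpath; it is not part of this commit.
val factory = CompressorFactory.getInstance()
if (factory.registerColumnCompressor("com.example.MyCompressor")) {
  // Both the full class name and the short name now resolve to the same
  // compressor, and the lookup ignores case.
  val byClassName = factory.getCompressor("com.example.MyCompressor")
  val byShortName = factory.getCompressor(byClassName.getName)
}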
@@ -18,6 +18,7 @@
package org.apache.carbondata.integration.spark.testsuite.dataload

import java.io.File
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.concurrent.{ExecutorService, Executors, Future}
import java.util.Calendar
@@ -31,9 +32,10 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.{Compressor, CompressorFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
- import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+ import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser

@@ -42,6 +44,112 @@ case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintFi
dateField: String, charField: String, floatField: Float, stringDictField: String,
stringSortField: String, stringLocalDictField: String, longStringField: String)

/**
* This compressor does not actually compress or decompress anything.
* It is used for test cases that specify a customized compressor.
*/
class CustomizeCompressor extends Compressor {
override def getName: String = "Customize"

override def compressByte(unCompInput: Array[Byte]): Array[Byte] = unCompInput

override def compressByte(unCompInput: Array[Byte], byteSize: Int): Array[Byte] = unCompInput

override def unCompressByte(compInput: Array[Byte]): Array[Byte] = compInput

override def unCompressByte(compInput: Array[Byte], offset: Int, length: Int): Array[Byte] = compInput

override def compressShort(unCompInput: Array[Short]): Array[Byte] = {
val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT)
buffer.asShortBuffer().put(unCompInput)
compressByte(buffer.array())
}

override def unCompressShort(compInput: Array[Byte], offset: Int, length: Int): Array[Short] = {
val buffer = ByteBuffer.wrap(compInput).asShortBuffer()
val res = new Array[Short](compInput.length / ByteUtil.SIZEOF_SHORT)
buffer.get(res)
res
}

override def compressInt(unCompInput: Array[Int]): Array[Byte] = {
val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT)
buffer.asIntBuffer().put(unCompInput)
compressByte(buffer.array())
}

override def unCompressInt(compInput: Array[Byte], offset: Int, length: Int): Array[Int] = {
val buffer = ByteBuffer.wrap(compInput).asIntBuffer()
val res = new Array[Int](compInput.length / ByteUtil.SIZEOF_INT)
buffer.get(res)
res
}

override def compressLong(unCompInput: Array[Long]): Array[Byte] = {
val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG)
buffer.asLongBuffer().put(unCompInput)
compressByte(buffer.array())
}

override def unCompressLong(compInput: Array[Byte], offset: Int, length: Int): Array[Long] = {
val buffer = ByteBuffer.wrap(compInput).asLongBuffer()
val res = new Array[Long](compInput.length / ByteUtil.SIZEOF_LONG)
buffer.get(res)
res
}

override def compressFloat(unCompInput: Array[Float]): Array[Byte] = {
val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT)
buffer.asFloatBuffer().put(unCompInput)
compressByte(buffer.array())
}

override def unCompressFloat(compInput: Array[Byte], offset: Int, length: Int): Array[Float] = {
val buffer = ByteBuffer.wrap(compInput).asFloatBuffer()
val res = new Array[Float](compInput.length / ByteUtil.SIZEOF_FLOAT)
buffer.get(res)
res
}

override def compressDouble(unCompInput: Array[Double]): Array[Byte] = {
val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE)
buffer.asDoubleBuffer().put(unCompInput)
compressByte(buffer.array())
}

override def unCompressDouble(compInput: Array[Byte], offset: Int, length: Int): Array[Double] = {
val buffer = ByteBuffer.wrap(compInput).asDoubleBuffer()
val res = new Array[Double](compInput.length / ByteUtil.SIZEOF_DOUBLE)
buffer.get(res)
res
}

override def rawCompress(inputAddress: Long, inputSize: Int, outputAddress: Long): Long = {
throw new RuntimeException("Not implemented rawCompress for customized compressor yet")
}

override def rawUncompress(input: Array[Byte], output: Array[Byte]): Long = {
System.arraycopy(input, 0, output, 0, input.length)
input.length
}

override def maxCompressedLength(inputSize: Long): Long = {
inputSize
}

/**
* Whether this compressor supports zero-copy during compression.
* Zero-copy means that the compressor supports receiving a memory address (pointer)
* and returning the result in a memory address (pointer).
* Currently not all Java versions of the compressors support this feature.
*
* @return true if it supports zero-copy, otherwise false
*/
override def supportUnsafe(): Boolean = {
false
}
}
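
As a quick sanity check on the pass-through behavior, every compress/unCompress pair in CustomizeCompressor should round-trip its input. A hedged sketch, not part of this commit:

// Hedged sketch: the pass-through compressor should round-trip typed arrays.
val c = new CustomizeCompressor
val longs = Array(1L, 2L, 3L)
val bytes = c.compressLong(longs) // 3 longs * 8 bytes = 24 raw bytes
assert(c.unCompressLong(bytes, 0, bytes.length).sameElements(longs))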

class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
private val tableName = "load_test_with_compressor"
private var executorService: ExecutorService = _
@@ -161,7 +269,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "ZSTD")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}
@@ -213,7 +321,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
val exception = intercept[UnsupportedOperationException] {
loadData()
}
assert(exception.getMessage.contains("Invalid compressor type provided"))
assert(exception.getMessage.contains("fake compressor is not supported"))
}

test("test compaction with unsupported compressor") {
@@ -225,7 +333,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
val exception = intercept[UnsupportedOperationException] {
sql(s"ALTER TABLE $tableName COMPACT 'major'")
}
assert(exception.getMessage.contains("Invalid compressor type provided"))
assert(exception.getMessage.contains("fake compressor is not supported"))
}

private def generateAllDataTypeDF(lineNum: Int) = {
@@ -309,7 +417,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
assert("zstd".equalsIgnoreCase(tableColumnCompressor))
assertResult("zstd")(tableColumnCompressor)
}

test("test creating table with unsupported compressor") {
@@ -323,6 +431,70 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
}

test("test register compressor") {
// register successfully
var compressorName = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
CompressorFactory.getInstance().registerColumnCompressor(compressorName)

// cannot register duplicated customized compressor
var rtn = CompressorFactory.getInstance().registerColumnCompressor(compressorName)
assertResult(false)(rtn)

// cannot register compressor with duplicated short name
compressorName = "org.apache.carbondata.core.datastore.compression.ZstdCompressor"
rtn = CompressorFactory.getInstance().registerColumnCompressor(compressorName)
assertResult(false)(rtn)

// cannot register compressor with reflection error
compressorName = "some.unknow.fakecompressor"
val exception = intercept[RuntimeException] {
CompressorFactory.getInstance().registerColumnCompressor(compressorName)
}
assert(exception.getMessage.contains(s"java.lang.ClassNotFoundException: $compressorName"))
}

test("test load data with customize compressor") {
// register customized compressor
val compressorName = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
CompressorFactory.getInstance().registerColumnCompressor(compressorName)
// set customized compressor as default
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))

// set short name of customized compressor as default
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor(compressorName).getName)
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test create table with customize compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// register customized compressor
val compressorName = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
CompressorFactory.getInstance().registerColumnCompressor(compressorName)
// create table with customize compressor
createTable(columnCompressor = compressorName)
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
assertResult(compressorName.toLowerCase())(tableColumnCompressor)

sql(s"DROP TABLE IF EXISTS $tableName")
// use the short name of the registered compressor
val shortCompressorName = "Customize"
// create table with customize compressor
createTable(columnCompressor = shortCompressorName)
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
val carbonTable2 = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
val tableColumnCompressor2 = carbonTable2.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
assertResult(shortCompressorName.toLowerCase())(tableColumnCompressor2)
}
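
For reference, the short-name path exercised above corresponds to a DDL along these lines (a hedged sketch; the exact CREATE TABLE syntax and the 'carbon.column.compressor' property key are assumptions based on this test suite and may vary by CarbonData version):

// Hedged sketch: create a table that stores data with the registered
// compressor's short name; 'carbon.column.compressor' is assumed to be the
// value of CarbonCommonConstants.COMPRESSOR used by the createTable helper.
sql(
  """
     | CREATE TABLE compressor_demo (id INT, name STRING)
     | STORED BY 'carbondata'
     | TBLPROPERTIES('carbon.column.compressor'='Customize')
   """.stripMargin)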

private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
saveMode: SaveMode = SaveMode.Overwrite): Unit = {
val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@@ -19,7 +19,6 @@

import java.io.IOException;
import java.math.BigDecimal;
- import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.HashMap;
@@ -65,7 +64,6 @@
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.carbondata.streaming.CarbonStreamInputFormat;
- import org.apache.carbondata.streaming.CarbonStreamUtils;
import org.apache.carbondata.streaming.StreamBlockletReader;

import org.apache.hadoop.conf.Configuration;