Commit

Review comments handled
shardul-cr7 committed Dec 10, 2018
1 parent 65c643b commit fbc7e9d
Showing 5 changed files with 117 additions and 133 deletions.
@@ -123,4 +123,5 @@ public long rawCompress(long inputAddress, int inputSize, long outputAddress) th
return false;
}

@Override public boolean supportUnsafe() { return false; }
}
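The added supportUnsafe() override advertises that this codec cannot operate on off-heap (unsafe) memory, so callers must route data through the on-heap byte[] methods. A minimal sketch of how a caller might branch on that flag, using a hypothetical, simplified stand-in for CarbonData's Compressor interface (only the relevant methods are shown; the real interface declares many more):

import java.io.IOException;

// Hypothetical, simplified stand-in for CarbonData's Compressor interface.
interface CompressorSketch {
  byte[] compressByte(byte[] unCompInput);

  long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException;

  boolean supportUnsafe();
}

class CompressorCaller {
  // Callers consult supportUnsafe() before taking the off-heap path; codecs
  // like gzip return false, so they are always fed on-heap byte arrays.
  static byte[] compress(CompressorSketch compressor, byte[] input) throws IOException {
    if (compressor.supportUnsafe()) {
      // Off-heap path (omitted in this sketch): rawCompress operates on
      // native memory addresses rather than on-heap arrays.
      throw new UnsupportedOperationException("off-heap path omitted in this sketch");
    }
    return compressor.compressByte(input);
  }
}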
@@ -20,73 +20,76 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.DoubleBuffer;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;

import org.apache.carbondata.core.util.ByteUtil;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class GzipCompressor implements Compressor {

public GzipCompressor() {
}
/**
* Codec class for performing gzip compression.
*/
public class GzipCompressor extends AbstractCompressor {

@Override public String getName() {
return "gzip";
}

/*
* Method called for compressing the data and
* return a byte array
/**
* Compresses the given byte array into the gzip format.
*
* @param data byte array to be compressed
* @return compressed byte array
*/
private byte[] compressData(byte[] data) {

ByteArrayOutputStream bt = new ByteArrayOutputStream();
int initialSize = (data.length / 2) == 0 ? data.length : data.length / 2;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(initialSize);
try {
GzipCompressorOutputStream gzos = new GzipCompressorOutputStream(bt);
GzipCompressorOutputStream gzipCompressorOutputStream =
new GzipCompressorOutputStream(byteArrayOutputStream);
try {
gzos.write(data);
/*
* Writes the bytes from the given byte array to gzipCompressorOutputStream,
* which compresses them into the underlying byteArrayOutputStream.
*/
gzipCompressorOutputStream.write(data);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Error during Compression writing step ", e);
} finally {
gzos.close();
gzipCompressorOutputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Error during Compression step ", e);
}

return bt.toByteArray();
return byteArrayOutputStream.toByteArray();
}

/*
* Method called for decompressing the data and
* return a byte array
/**
* Decompresses the given gzip-compressed byte array.
*
* @param data byte array containing the compressed data
* @param offset starting offset of the compressed data within the array
* @param length length in bytes of the compressed data
* @return decompressed byte array
*/
private byte[] decompressData(byte[] data) {

ByteArrayInputStream bt = new ByteArrayInputStream(data);
ByteArrayOutputStream bot = new ByteArrayOutputStream();

private byte[] decompressData(byte[] data, int offset, int length) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data, offset, length);
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try {
GzipCompressorInputStream gzis = new GzipCompressorInputStream(bt);
byte[] buffer = new byte[1024];
GzipCompressorInputStream gzipCompressorInputStream =
new GzipCompressorInputStream(byteArrayInputStream);
// estimate the decompressed size as twice the compressed size, guarding against int overflow
int initialSize = data.length <= Integer.MAX_VALUE / 2 ? data.length * 2 : data.length;
byte[] buffer = new byte[initialSize];
int len;

while ((len = gzis.read(buffer)) != -1) {
bot.write(buffer, 0, len);
/*
* Reads decompressed bytes from the input stream into the buffer and
* appends each filled chunk to byteOutputStream.
*/
while ((len = gzipCompressorInputStream.read(buffer)) != -1) {
byteOutputStream.write(buffer, 0, len);
}

} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Error during Decompression step ", e);
}

return bot.toByteArray();
return byteOutputStream.toByteArray();
}

@Override public byte[] compressByte(byte[] unCompInput) {
@@ -98,104 +101,34 @@ private byte[] decompressData(byte[] data) {
}

@Override public byte[] unCompressByte(byte[] compInput) {
return decompressData(compInput);
return decompressData(compInput, 0, compInput.length);
}

@Override public byte[] unCompressByte(byte[] compInput, int offset, int length) {
byte[] data = new byte[length];
System.arraycopy(compInput, offset, data, 0, length);
return decompressData(data);
return decompressData(compInput, offset, length);
}

@Override public byte[] compressShort(short[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT);
unCompBuffer.asShortBuffer().put(unCompInput);
return compressData(unCompBuffer.array());
}

@Override public short[] unCompressShort(byte[] compInput, int offset, int length) {
byte[] unCompArray = unCompressByte(compInput, offset, length);
ShortBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asShortBuffer();
short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
unCompBuffer.get(shorts);
return shorts;
}

@Override public byte[] compressInt(int[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT);
unCompBuffer.asIntBuffer().put(unCompInput);
return compressData(unCompBuffer.array());
}

@Override public int[] unCompressInt(byte[] compInput, int offset, int length) {
byte[] unCompArray = unCompressByte(compInput, offset, length);
IntBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asIntBuffer();
int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
unCompBuffer.get(ints);
return ints;
}

@Override public byte[] compressLong(long[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG);
unCompBuffer.asLongBuffer().put(unCompInput);
return compressData(unCompBuffer.array());
}

@Override public long[] unCompressLong(byte[] compInput, int offset, int length) {
byte[] unCompArray = unCompressByte(compInput, offset, length);
LongBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asLongBuffer();
long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
unCompBuffer.get(longs);
return longs;
}

@Override public byte[] compressFloat(float[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT);
unCompBuffer.asFloatBuffer().put(unCompInput);
return compressData(unCompBuffer.array());
}

@Override public float[] unCompressFloat(byte[] compInput, int offset, int length) {
byte[] unCompArray = unCompressByte(compInput, offset, length);
FloatBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asFloatBuffer();
float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
unCompBuffer.get(floats);
return floats;
}

@Override public byte[] compressDouble(double[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
unCompBuffer.asDoubleBuffer().put(unCompInput);
return compressData(unCompBuffer.array());
}

@Override public double[] unCompressDouble(byte[] compInput, int offset, int length) {
byte[] unCompArray = unCompressByte(compInput, offset, length);
DoubleBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asDoubleBuffer();
double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
unCompBuffer.get(doubles);
return doubles;
}

@Override public long rawCompress(long inputAddress, int inputSize, long outputAddress)
throws IOException {
throw new RuntimeException("Not implemented rawUncompress for gzip yet");
}

@Override public long rawUncompress(byte[] input, byte[] output) throws IOException {
//gzip api doesnt have rawCompress yet.
return 0;
@Override public long rawUncompress(byte[] input, byte[] output) {
// gzip api doesn't have rawUncompress yet.
throw new RuntimeException("Not implemented rawUncompress for gzip yet");
}

@Override public long maxCompressedLength(long inputSize) {
// Check if input size is lower than the max possible size
if (inputSize < Integer.MAX_VALUE) {
return inputSize;
} else {
throw new RuntimeException("compress input oversize for gzip");
}
}

@Override public boolean supportUnsafe() {
return false;
@Override public int unCompressedLength(byte[] data, int offset, int length) {
// gzip api doesn't have unCompressedLength yet.
throw new RuntimeException("Unsupported operation Exception");
}

@Override public int rawUncompress(byte[] data, int offset, int length, byte[] output) {
// gzip api doesn't have rawUncompress yet.
throw new RuntimeException("Not implemented rawUncompress for gzip yet");
}
}
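For reference, the two private methods above are thin wrappers around commons-compress gzip streams. A self-contained round-trip sketch using the same GzipCompressorOutputStream and GzipCompressorInputStream classes (the class name GzipRoundTrip and the sample text are illustrative, not from the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class GzipRoundTrip {
  public static void main(String[] args) throws IOException {
    byte[] original = "hello gzip compressor".getBytes(StandardCharsets.UTF_8);

    // Compress: push the input through a gzip stream into an in-memory buffer.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GzipCompressorOutputStream gzipOut = new GzipCompressorOutputStream(compressed)) {
      gzipOut.write(original);
    }

    // Decompress: stream the gzip bytes back out in chunks.
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    try (GzipCompressorInputStream gzipIn = new GzipCompressorInputStream(
        new ByteArrayInputStream(compressed.toByteArray()))) {
      byte[] buffer = new byte[4096];
      int len;
      while ((len = gzipIn.read(buffer)) != -1) {
        restored.write(buffer, 0, len);
      }
    }

    // Prints the original text, confirming the round trip.
    System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8));
  }
}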
@@ -70,11 +70,6 @@ public long maxCompressedLength(long inputSize) {
* currently java version of zstd does not support this feature.
* It may support it in upcoming release 1.3.5-3, then we can optimize this accordingly.
*/
@Override
public boolean supportUnsafe() {
return false;
}

@Override public int unCompressedLength(byte[] data, int offset, int length) {
throw new RuntimeException("Unsupported operation Exception");
}
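This hunk removes supportUnsafe() from ZstdCompressor, and the GzipCompressor hunk above drops its per-primitive-type ByteBuffer plumbing; both presumably now inherit that logic from AbstractCompressor. A minimal sketch of how such a base class can derive the int[] overloads from the byte[] ones, mirroring the removed code (the real AbstractCompressor in CarbonData may be organized differently):

import java.nio.ByteBuffer;
import java.nio.IntBuffer;

// Sketch of a shared base class deriving int[] support from the byte[] codec.
// Method names follow the diff; the real AbstractCompressor may differ.
public abstract class AbstractCompressorSketch {

  // Codec-specific byte-level compression, e.g. gzip or zstd.
  public abstract byte[] compressByte(byte[] unCompInput);

  // Codec-specific byte-level decompression.
  public abstract byte[] unCompressByte(byte[] compInput, int offset, int length);

  public byte[] compressInt(int[] unCompInput) {
    // Pack the ints into a byte[] and reuse the byte-level codec.
    ByteBuffer buffer = ByteBuffer.allocate(unCompInput.length * Integer.BYTES);
    buffer.asIntBuffer().put(unCompInput);
    return compressByte(buffer.array());
  }

  public int[] unCompressInt(byte[] compInput, int offset, int length) {
    // Decompress to bytes, then reinterpret them as ints.
    byte[] raw = unCompressByte(compInput, offset, length);
    IntBuffer intBuffer = ByteBuffer.wrap(raw).asIntBuffer();
    int[] ints = new int[raw.length / Integer.BYTES];
    intBuffer.get(ints);
    return ints;
  }
}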
@@ -292,7 +292,6 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
@@ -310,6 +309,42 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current snappy compressor on legacy store with gzip") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current gzip compressor on legacy store with zstd") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current zstd compressor on legacy store with gzip") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test compaction with different compressor for each load") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
@@ -436,7 +471,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
}

test("test creating table with specified compressor") {
test("test creating table with specified zstd compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
Expand All @@ -453,7 +488,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
// create table with zstd as compressor
// create table with gzip as compressor
createTable(columnCompressor = "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
@@ -137,6 +137,26 @@ class TestLoadWithSortTempCompressed extends QueryTest
originOffHeapStatus)
}

test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}

test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}

private def testComplexTable(): Unit = {
// note: following tests are copied from `TestComplexTypeQuery`
sql(
