Skip to content

Commit

Permalink
Merge 935f8bf into d9f1a81
Browse files Browse the repository at this point in the history
  • Loading branch information
shardul-cr7 committed Dec 10, 2018
2 parents d9f1a81 + 935f8bf commit ed8e985
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 21 deletions.
Expand Up @@ -123,4 +123,5 @@ public long rawCompress(long inputAddress, int inputSize, long outputAddress) th
return false;
}

@Override public boolean supportUnsafe() { return false; }
}
Expand Up @@ -36,7 +36,8 @@ public class CompressorFactory {

public enum NativeSupportedCompressor {
SNAPPY("snappy", SnappyCompressor.class),
ZSTD("zstd", ZstdCompressor.class);
ZSTD("zstd", ZstdCompressor.class),
GZIP("gzip", GzipCompressor.class);

private String name;
private Class<Compressor> compressorClass;
Expand Down
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datastore.compression;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

/**
* Codec Class for performing Gzip Compression
*/
public class GzipCompressor extends AbstractCompressor {

@Override public String getName() {
return "gzip";
}

/**
* This method takes the Byte Array data and Compresses in gzip format
*
* @param data Data Byte Array passed for compression
* @return Compressed Byte Array
*/
private byte[] compressData(byte[] data) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try {
GzipCompressorOutputStream gzipCompressorOutputStream =
new GzipCompressorOutputStream(byteArrayOutputStream);
try {
/**
* Below api will write bytes from specified byte array to the gzipCompressorOutputStream
* The output stream will compress the given byte array.
*/
gzipCompressorOutputStream.write(data);
} catch (IOException e) {
throw new RuntimeException("Error during Compression writing step ", e);
} finally {
gzipCompressorOutputStream.close();
}
} catch (IOException e) {
throw new RuntimeException("Error during Compression step ", e);
}
return byteArrayOutputStream.toByteArray();
}

/**
* This method takes the Byte Array data and Decompresses in gzip format
*
* @param data Data Byte Array for Compression
* @param offset Start value of Data Byte Array
* @param length Size of Byte Array
* @return
*/
private byte[] decompressData(byte[] data, int offset, int length) {
ByteArrayInputStream byteArrayOutputStream = new ByteArrayInputStream(data, offset, length);
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try {
GzipCompressorInputStream gzipCompressorInputStream =
new GzipCompressorInputStream(byteArrayOutputStream);
byte[] buffer = new byte[1024];
int len;
/**
* Reads the next byte of the data from the input stream and stores them into buffer
* Data is then read from the buffer and put into byteOutputStream from a offset.
*/
while ((len = gzipCompressorInputStream.read(buffer)) != -1) {
byteOutputStream.write(buffer, 0, len);
}
} catch (IOException e) {
throw new RuntimeException("Error during Decompression step ",e);
}
return byteOutputStream.toByteArray();
}

@Override public byte[] compressByte(byte[] unCompInput) {
return compressData(unCompInput);
}

@Override public byte[] compressByte(byte[] unCompInput, int byteSize) {
return compressData(unCompInput);
}

@Override public byte[] unCompressByte(byte[] compInput) {
return decompressData(compInput, 0, compInput.length);
}

@Override public byte[] unCompressByte(byte[] compInput, int offset, int length) {
return decompressData(compInput, offset, length);
}

@Override public long rawUncompress(byte[] input, byte[] output) {
//gzip api doesnt have rawUncompress yet.
throw new RuntimeException("Not implemented rawUcompress for gzip yet");
}

@Override public long maxCompressedLength(long inputSize) {
// Check if input size is lower than the max possible size
if (inputSize < Integer.MAX_VALUE) {
return inputSize;
} else {
throw new RuntimeException("compress input oversize for gzip");
}
}

@Override public int unCompressedLength(byte[] data, int offset, int length) {
//gzip api doesnt have UncompressedLength
throw new RuntimeException("Unsupported operation Exception");
}

@Override public int rawUncompress(byte[] data, int offset, int length, byte[] output) {
//gzip api doesnt have rawUncompress yet.
throw new RuntimeException("Not implemented rawUcompress for gzip yet");
}
}
Expand Up @@ -70,11 +70,6 @@ public long maxCompressedLength(long inputSize) {
* currently java version of zstd does not support this feature.
* It may support it in upcoming release 1.3.5-3, then we can optimize this accordingly.
*/
@Override
public boolean supportUnsafe() {
return false;
}

@Override public int unCompressedLength(byte[] data, int offset, int length) {
throw new RuntimeException("Unsupported operation Exception");
}
Expand Down
Expand Up @@ -168,6 +168,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
private val tableName = "load_test_with_compressor"
private var executorService: ExecutorService = _
private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"
private val compressors = Array("snappy","zstd","gzip")

override protected def beforeAll(): Unit = {
executorService = Executors.newFixedThreadPool(3)
Expand Down Expand Up @@ -252,50 +253,94 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
""".stripMargin)
}

test("test data loading with snappy compressor and offheap") {
test("test data loading with different compressors and offheap") {
for(comp <- compressors){
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, comp)
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
}
}

test("test data loading with different compressors and onheap") {
for(comp <- compressors){
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, comp)
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
}
}

test("test current zstd compressor on legacy store with snappy") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
}

test("test data loading with zstd compressor and offheap") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current gzip compressor on legacy store with snappy") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test data loading with zstd compressor and onheap") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
test("test current snappy compressor on legacy store with zstd") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
createTable()
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
}

test("test current zstd compressor on legacy store with snappy") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current snappy compressor on legacy store with gzip") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current snappy compressor on legacy store with zstd") {
test("test current gzip compressor on legacy store with zstd") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}

test("test current zstd compressor on legacy store with gzip") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
createTable()
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
}
Expand All @@ -311,7 +356,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
loadData()

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
Expand Down Expand Up @@ -416,11 +461,17 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
}

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
future = compactAsync()
while (!future.isDone) {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "gzip")
}

checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
}

test("test creating table with specified compressor") {
test("test creating table with specified zstd compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
Expand All @@ -433,6 +484,19 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
assertResult("zstd")(tableColumnCompressor)
}

test("test creating table with specified gzip compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
// create table with gzip as compressor
createTable(columnCompressor = "gzip")
loadData()
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
assert("gzip".equalsIgnoreCase(tableColumnCompressor))
}

test("test creating table with unsupported compressor") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
// the system configuration for compressor is snappy
Expand Down
Expand Up @@ -137,6 +137,26 @@ class TestLoadWithSortTempCompressed extends QueryTest
originOffHeapStatus)
}

test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}

test("test data load for simple table with sort temp compressed with gzip" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"gzip")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
originOffHeapStatus)
}

private def testComplexTable(): Unit = {
// note: following tests are copied from `TestComplexTypeQuery`
sql(
Expand Down

0 comments on commit ed8e985

Please sign in to comment.