From 30267a9b22ac7bbfbf66c3fd6311693b860f1db8 Mon Sep 17 00:00:00 2001 From: GreenMonster Date: Tue, 11 Feb 2020 18:08:59 +0800 Subject: [PATCH 1/2] [CARBONDATA-3692] Support NoneCompression during loading data. Why is this PR needed? In some cases, the data need to be uncompressed after loading into Carbondata file. In the current version, the project do not support loading data without compression. What changes were proposed in this PR? Provide a new Compressor as NoneCompressor implement the AbstractCompressor. This compressor can be set by calling CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,"none"); Does this PR introduce any user interface change? Yes Is any new testcase added? Yes --- .../compression/CompressorFactory.java | 1 + .../datastore/compression/NoneCompressor.java | 51 +++++++++++ .../TestLoadDataWithCompression.scala | 88 ++++++++++++++++++- 3 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java index e695bdab974..c1433551961 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java @@ -35,6 +35,7 @@ public class CompressorFactory { private final Map allSupportedCompressors = new HashMap<>(); public enum NativeSupportedCompressor { + NONE("none",NoneCompressor.class), SNAPPY("snappy", SnappyCompressor.class), ZSTD("zstd", ZstdCompressor.class), GZIP("gzip", GzipCompressor.class); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java new file mode 100644 index 00000000000..496d695c035 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java @@ -0,0 +1,51 @@ +package org.apache.carbondata.core.datastore.compression; + +import java.io.IOException; + +public class NoneCompressor extends AbstractCompressor { + + @Override + public String getName() { + return "none"; + } + + @Override + public byte[] compressByte(byte[] unCompInput) { + return unCompInput; + } + + @Override + public byte[] compressByte(byte[] unCompInput, int byteSize) { + return unCompInput; + } + + @Override + public byte[] unCompressByte(byte[] compInput) { + return compInput; + } + + @Override + public byte[] unCompressByte(byte[] compInput, int offset, int length) { + return compInput; + } + + @Override + public long rawUncompress(byte[] input, byte[] output) throws IOException { + throw new RuntimeException("Not implemented rawCompress for noneCompressor yet"); + } + + @Override + public long maxCompressedLength(long inputSize) { + return inputSize; + } + + @Override + public int unCompressedLength(byte[] data, int offset, int length) { + throw new RuntimeException("Unsupported operation Exception"); + } + + @Override + public int rawUncompress(byte[] data, int offset, int length, byte[] output) { + throw new RuntimeException("Not implemented rawCompress for noneCompressor yet"); + } +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala index 8236635de6d..56a26e00a28 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala @@ -168,7 +168,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with private val tableName = "load_test_with_compressor" private var executorService: ExecutorService = _ private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression" - private val compressors = Array("snappy","zstd","gzip") + private val compressors = Array("snappy","zstd","gzip","none") override protected def beforeAll(): Unit = { executorService = Executors.newFixedThreadPool(3) @@ -272,6 +272,79 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with } } + test("test current none compressor on legacy store with snappy") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current none compressor on legacy store with zstd") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current none compressor on legacy store with gzip") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current snappy compressor on legacy store with none") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current gzip compressor on legacy store with none") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current zstd compressor on legacy store with none") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current zstd compressor on legacy store with snappy") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") @@ -470,6 +543,19 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"))) } + test("test creating table with specified none compressor") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + // the system configuration for compressor is snappy + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + // create table with none as compressor + createTable(columnCompressor = "none") + loadData() + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8))) + val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession) + val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR) + assertResult("none")(tableColumnCompressor) + } + test("test creating table with specified zstd compressor") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") // the system configuration for compressor is snappy From 6bd80f96e94061d591f9deaaddcd31468c5d0c0a Mon Sep 17 00:00:00 2001 From: GreenMonster Date: Sun, 16 Feb 2020 00:23:44 +0800 Subject: [PATCH 2/2] [CARBONDATA-3692] Support NoneCompression during loading data. Why is this PR needed? In some cases, the data need to be uncompressed after loading into Carbondata file. In the current version, the project do not support loading data without compression. What changes were proposed in this PR? Provide a new Compressor as NoneCompressor implement the AbstractCompressor. This compressor can be set by calling CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,"none"); Does this PR introduce any user interface change? Yes Is any new testcase added? Yes --- .../compression/CompressorFactory.java | 2 +- .../datastore/compression/NoneCompressor.java | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java index c1433551961..6932b34953b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java @@ -35,7 +35,7 @@ public class CompressorFactory { private final Map allSupportedCompressors = new HashMap<>(); public enum NativeSupportedCompressor { - NONE("none",NoneCompressor.class), + NONE("none", NoneCompressor.class), SNAPPY("snappy", SnappyCompressor.class), ZSTD("zstd", ZstdCompressor.class), GZIP("gzip", GzipCompressor.class); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java index 496d695c035..b45a09fb8b1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/NoneCompressor.java @@ -1,7 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.carbondata.core.datastore.compression; import java.io.IOException; +/** + * This compressor actually will not compress or decompress anything. + * It is used for speed up the data loading by skip the compression. + */ + public class NoneCompressor extends AbstractCompressor { @Override