Skip to content

Commit

Permalink
[CARBONDATA-2553] support ZSTD compression for sort temp file
Browse files Browse the repository at this point in the history
This closes #2350
  • Loading branch information
kevinjmh authored and jackylk committed Jun 18, 2018
1 parent 5593d16 commit ece0672
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.3.2-2</version>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_SORT_TEMP_COMPRESSOR = "carbon.sort.temp.compressor";

/**
* The optional values are 'SNAPPY','GZIP','BZIP2','LZ4'.
* The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD'.
* By default, empty means that Carbondata will not compress the sort temp files.
*/
public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
Expand Down Expand Up @@ -290,6 +292,8 @@ public List<CarbonFile> listFiles(Boolean recurssive) {
inputStream = new SnappyInputStream(new FileInputStream(path));
} else if ("LZ4".equalsIgnoreCase(compressor)) {
inputStream = new LZ4BlockInputStream(new FileInputStream(path));
} else if ("ZSTD".equalsIgnoreCase(compressor)) {
inputStream = new ZstdInputStream(new FileInputStream(path));
} else {
throw new IOException("Unsupported compressor: " + compressor);
}
Expand Down Expand Up @@ -368,6 +372,10 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
outputStream = new SnappyOutputStream(new FileOutputStream(path));
} else if ("LZ4".equalsIgnoreCase(compressor)) {
outputStream = new LZ4BlockOutputStream(new FileOutputStream(path));
} else if ("ZSTD".equalsIgnoreCase(compressor)) {
// compression level 1 is cost-effective for sort temp file
// which is not used for storage
outputStream = new ZstdOutputStream(new FileOutputStream(path), 1);
} else {
throw new IOException("Unsupported compressor: " + compressor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1290,11 +1290,11 @@ public String getSortTempCompressor() {
String compressor = getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT).toUpperCase();
if (compressor.isEmpty() || "SNAPPY".equals(compressor) || "GZIP".equals(compressor)
|| "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
|| "BZIP2".equals(compressor) || "LZ4".equals(compressor) || "ZSTD".equals(compressor)) {
return compressor;
} else {
LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
.concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and")
.concat(" configuration value is invalid. Only snappy, gzip, bip2, lz4, zstd and")
.concat(" empty are allowed. It will not compress the sort temp files by default"));
return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
}
Expand Down
2 changes: 1 addition & 1 deletion docs/useful-tips-on-carbondata.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
| carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data loading | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
| carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use YARN local directories for multi-table load disk load balance | If this is set to true, CarbonData will use YARN local directories for multi-table load disk load balance, which will improve the data load performance. |
| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
| carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
| carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
| carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB~1GB. |
| carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable node minimum input data size allocation strategy for data loading. | When loading, carbondata will use node minimum input data size allocation strategy for task distribution. It will make sure each node loads the minimum amount of data -- It's useful if the size of your input data files is very small, say 1MB~256MB, to avoid generating a large number of small files. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ class TestLoadWithSortTempCompressed extends QueryTest


override protected def beforeAll(): Unit = {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
}

override def afterAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
Expand Down Expand Up @@ -84,6 +83,8 @@ class TestLoadWithSortTempCompressed extends QueryTest

test("test data load for simple table with sort temp compressed with snappy" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
Expand All @@ -92,6 +93,28 @@ class TestLoadWithSortTempCompressed extends QueryTest

// Runs testSimpleTable() with the sort temp compressor set to SNAPPY and off-heap sort disabled.
test("test data load for simple table with sort temp compressed with snappy" +
" and off-heap sort disabled") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
    "SNAPPY")
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
  try {
    testSimpleTable()
  } finally {
    // Put the off-heap setting back even when the load or its checks throw, so a failure
    // here cannot leak this test's configuration into the tests that run afterwards.
    // NOTE(review): originOffHeapStatus is presumably the value captured before the suite
    // started -- confirm against its definition.
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
      originOffHeapStatus)
  }
}

// Runs testSimpleTable() with the sort temp compressor set to ZSTD and off-heap sort enabled.
test("test data load for simple table with sort temp compressed with zstd" +
" and off-heap sort enabled") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
    "ZSTD")
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
  try {
    testSimpleTable()
  } finally {
    // Put the off-heap setting back even when the load or its checks throw, so a failure
    // here cannot leak this test's configuration into the tests that run afterwards.
    // NOTE(review): originOffHeapStatus is presumably the value captured before the suite
    // started -- confirm against its definition.
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
      originOffHeapStatus)
  }
}

test("test data load for simple table with sort temp compressed with zstd" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testSimpleTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
Expand Down Expand Up @@ -138,6 +161,8 @@ class TestLoadWithSortTempCompressed extends QueryTest

test("test data load for complex table with sort temp compressed with snappy" +
" and off-heap sort enabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"SNAPPY")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
Expand All @@ -146,6 +171,28 @@ class TestLoadWithSortTempCompressed extends QueryTest

// Runs testComplexTable() with the sort temp compressor set to SNAPPY and off-heap sort disabled.
test("test data load for complex table with sort temp compressed with snappy" +
" and off-heap sort disabled") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
    "SNAPPY")
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
  try {
    testComplexTable()
  } finally {
    // Put the off-heap setting back even when the load or its checks throw, so a failure
    // here cannot leak this test's configuration into the tests that run afterwards.
    // NOTE(review): originOffHeapStatus is presumably the value captured before the suite
    // started -- confirm against its definition.
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
      originOffHeapStatus)
  }
}

// Runs testComplexTable() with the sort temp compressor set to ZSTD and off-heap sort enabled.
test("test data load for complex table with sort temp compressed with zstd" +
" and off-heap sort enabled") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
    "ZSTD")
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
  try {
    testComplexTable()
  } finally {
    // Put the off-heap setting back even when the load or its checks throw, so a failure
    // here cannot leak this test's configuration into the tests that run afterwards.
    // NOTE(review): originOffHeapStatus is presumably the value captured before the suite
    // started -- confirm against its definition.
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
      originOffHeapStatus)
  }
}

test("test data load for complex table with sort temp compressed with zstd" +
" and off-heap sort disabled") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
"ZSTD")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
testComplexTable()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
Expand Down

0 comments on commit ece0672

Please sign in to comment.