[CARBONDATA-3005]Support Gzip as column compressor #2847
Conversation
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9235/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/974/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1187/
}

@Override public long rawUncompress(byte[] input, byte[] output) throws IOException {
  // gzip api doesn't have rawCompress yet.
if it is so, just throw exception, otherwise JVM may crash if you pass the illegal address/length

Done.
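The fix the reviewer asked for can be sketched as follows (a minimal illustration, not the PR's exact code; the class name here is hypothetical): since the gzip API offers no raw, headerless mode, the method fails fast rather than risking undefined behavior on an illegal address/length.

```java
// Hypothetical sketch of failing fast when a raw (headerless) gzip
// uncompress is requested, since the gzip API has no such mode.
public class GzipCompressorSketch {
  // Signature mirrors the diff above; behavior is the reviewer's suggestion.
  public long rawUncompress(byte[] input, byte[] output) {
    throw new UnsupportedOperationException(
        "gzip does not support raw uncompress");
  }
}
```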
e.printStackTrace();
}
return bt.toByteArray();
why is bt still open?
ByteArrayOutputStream.close() does nothing. Its implementation in Java is like this:
public void close() throws IOException {
}
I can close it, but I'd have to copy the stream to a byte array and return that byte array, which can be a costly operation.
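The point made above can be verified with a short demo (illustrative only): closing a ByteArrayOutputStream is a no-op in the JDK, so the buffer stays readable afterwards and no extra copy is forced by leaving it "open".

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Demonstrates that ByteArrayOutputStream.close() is a no-op:
// toByteArray() still works after close().
public class BaosCloseDemo {
  public static byte[] demo() {
    ByteArrayOutputStream bt = new ByteArrayOutputStream();
    byte[] payload = {1, 2, 3};
    bt.write(payload, 0, payload.length);
    try {
      bt.close();  // does nothing, per the JDK implementation quoted above
    } catch (IOException e) {
      throw new AssertionError(e);  // cannot happen for ByteArrayOutputStream
    }
    return bt.toByteArray();  // still valid after close()
  }
}
```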
try {
  gzos.write(data);
} catch (IOException e) {
  e.printStackTrace();
please optimize the logging!
OK, will do that!
gzos.close();
}
} catch (IOException e) {
  e.printStackTrace();
please optimize the logging!
Done.
}
} catch (IOException e) {
  e.printStackTrace();
please optimize the logging!
Done.
e.printStackTrace();
}
return bot.toByteArray();
bot is not closed
Similar to ByteArrayOutputStream.close() reason mentioned above.
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/994/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1207/
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9260/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1633/
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9893/
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1844/
ByteArrayInputStream byteArrayOutputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
remove empty line
done!
}

/*
 * Method called for compressing the data and
change the comment to a standard doc comment
done!
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1634/
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1845/
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9894/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1641/
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9901/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1852/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1645/
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9905/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1856/
 */
public class GzipCompressor extends AbstractCompressor {

  public GzipCompressor() {
why is this empty constructor required?
Removed.
@@ -35,8 +35,8 @@
  private final Map<String, Compressor> allSupportedCompressors = new HashMap<>();

  public enum NativeSupportedCompressor {
    SNAPPY("snappy", SnappyCompressor.class),
    ZSTD("zstd", ZstdCompressor.class);
    SNAPPY("snappy", SnappyCompressor.class), ZSTD("zstd", ZstdCompressor.class), GZIP("gzip",
Move each compressor to new line
Done.
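The layout the reviewer asked for looks like this (a simplified sketch; the compressor-class second argument from the diff is dropped so the example stays self-contained, and the constant names mirror the PR):

```java
// One enum constant per line, per the review comment above.
public enum NativeSupportedCompressor {
  SNAPPY("snappy"),
  ZSTD("zstd"),
  GZIP("gzip");

  private final String name;

  NativeSupportedCompressor(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }
}
```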
  }
}

@Override public boolean supportUnsafe() {
Please move this default implementation to AbstractCompressor and override it only in SnappyCompressor; remove this implementation from the other classes.
Done.
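A hedged sketch of the suggested refactor (class bodies stripped down to the one method under discussion): the default supportUnsafe() lives in the abstract base, and only SnappyCompressor overrides it.

```java
// Default lives in the base class; only Snappy opts in to the unsafe path.
abstract class AbstractCompressor {
  public boolean supportUnsafe() {
    return false;  // default for gzip/zstd and other stream-based compressors
  }
}

class SnappyCompressor extends AbstractCompressor {
  @Override
  public boolean supportUnsafe() {
    return true;  // Snappy has a native unsafe (off-heap) code path
  }
}

class GzipCompressor extends AbstractCompressor {
  // inherits supportUnsafe() == false; no override needed
}
```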
@@ -168,6 +168,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
  private val tableName = "load_test_with_compressor"
  private var executorService: ExecutorService = _
  private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"
  private val compressors = Array("snappy","zstd","gzip")
Please don't remove any test case; add a new test case for Zstd.
No test cases were removed. Just the name of the test case "test with snappy and offheap" was changed to "test different compressors and offheap".
}

@Override public long maxCompressedLength(long inputSize) {
  if (inputSize < Integer.MAX_VALUE) {
Please add some comments for this piece of code
Done.
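One way the commented code could read (illustrative only; the exact bound is an assumption, not the PR's final code): gzip has no utility analogous to Snappy's maxCompressedLength, so the input size is used when it fits in the int range that array-based APIs can address.

```java
// Illustrative sketch of maxCompressedLength with the requested comment.
public class GzipMaxLenSketch {
  public long maxCompressedLength(long inputSize) {
    // Gzip exposes no upper-bound helper (unlike Snappy's maxCompressedLength),
    // and byte[]-based APIs can only address int-range buffers, so fall back
    // to the input size itself when it is int-addressable.
    if (inputSize < Integer.MAX_VALUE) {
      return inputSize;
    } else {
      throw new RuntimeException("compress input oversize for gzip");
    }
  }
}
```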
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1674/
 */
gzipCompressorOutputStream.write(data);
} catch (IOException e) {
  throw new RuntimeException("Error during Compression step " + e.getMessage());
Don't skip the actual exception. Add the original exception as the cause to the RuntimeException.
ok added the actual exception.
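The change amounts to using the two-argument RuntimeException constructor so the original stack trace survives, instead of flattening the IOException into a message string (wrapped in a helper here for illustration):

```java
import java.io.IOException;

// Pass the caught IOException as the cause, preserving its stack trace,
// rather than appending only e.getMessage() to the new exception's message.
public class CauseChainingDemo {
  public static RuntimeException wrap(IOException e) {
    return new RuntimeException("Error during Compression step", e);
  }
}
```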
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1887/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1678/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1890/
Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9938/
try {
  GzipCompressorInputStream gzipCompressorInputStream =
      new GzipCompressorInputStream(byteArrayOutputStream);
  byte[] buffer = new byte[1024];
Instead of a fixed 1024, can you observe what block size (in bytes) gzip operates on and use that value?
Yeah, I fixed it with double the data length, so it minimizes the reads. I have added that.
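The sizing change described above can be sketched like this (java.util.zip stands in for commons-compress's GzipCompressorInputStream so the example is self-contained; the 2x factor follows the author's comment):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Size the read buffer relative to the compressed input (double its length)
// instead of a fixed 1024 bytes, reducing the number of read calls.
public class GzipUncompressSketch {
  public static byte[] uncompress(byte[] compressed) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPInputStream in =
        new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      byte[] buffer = new byte[compressed.length * 2];  // sized by input
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
    }
    return out.toByteArray();
  }

  // Helper for the example: produce gzip bytes to feed uncompress().
  public static byte[] compress(byte[] data) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    }
    return out.toByteArray();
  }
}
```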
 * @return Compressed Byte Array
 */
private byte[] compressData(byte[] data) {
  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayOutputStream initializes with 32 bytes and copies the data to a new byte[] on expansion. Can you use a better initial size to limit the number of copies during expansion? Snappy has a utility (maxCompressedLength) to calculate this; check whether any gzip library has a similar method. If not, we can use a value based on a test with the maximum possible compression ratio.
Based on the observations, I have initialized the byteArrayOutputStream with a size of half the byte buffer, so it reduces the number of resizings of the stream.
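That initial-size choice can be sketched as follows (illustrative; the `+ 1` guard for tiny inputs is my addition, and java.util.zip stands in for commons-compress's GzipCompressorOutputStream):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Start the output stream at half the input length rather than the 32-byte
// default, cutting the number of grow-and-copy cycles during compression.
public class GzipCompressSketch {
  public static byte[] compressData(byte[] data) throws IOException {
    // "+ 1" is a hypothetical guard so zero-length input still gets a buffer.
    ByteArrayOutputStream out = new ByteArrayOutputStream(data.length / 2 + 1);
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    }
    return out.toByteArray();
  }
}
```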
test("test data loading with snappy compressor and offheap") {
test("test data loading with different compressors and offheap") {
  for (comp <- compressors) {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
Should we have a UT for enable.unsafe.in.query.processing true and false?
By default for gzip/zstd, it's false. So UT for this scenario is not required.
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1686/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1687/
Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9947/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1898/
LGTM
1 similar comment
LGTM
This PR is to add a new compressor "Gzip" and enhance the compressing capabilities offered by CarbonData. User can now use gzip as the compressor for loading the data. Gzip can be set at System Properties level or also for particular table. This closes #2847
Gzip compressed file size is less than that of snappy but takes more time.
Data generated by tpch-dbgen (lineitem)

Load Performance Comparisons (Compression)
- Test Case 1: File Size 3.9G, Records ~30M
- Test Case 2: File Size 7.8G, Records ~60M

Query Performance (Decompression)
- Test Case 1
- Test Case 2
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
Any interfaces changed?
Any backward compatibility impacted?
Document update required?
Testing done
Added some test cases.
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.