Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pure-java Air-Compressor instead of JNI based libraries #5390

Merged
merged 9 commits into from
Jun 4, 2020

Conversation

merlimat
Copy link
Contributor

Motivation

Right now we're using JNI based libraries to perform data compression. These libraries are do have an overhead in terms of size (7Mb out of 20Mb of Pulsar-Client lib) and are incurring the JNI overhead which is typically measurable when compressing many small payloads.

We can replaces compression libraries for LZ4, ZStd and Snappy with AirCompressor (https://github.com/airlift/aircompressor), which is a pure Java compression library used by Presto.

Microbenchmarks

Microbenchmark code is available at https://github.com/merlimat/compression-benchmark
The results are on-par with the JNI version in most cases.

Results:

Benchmark                      (provider)  (size)   Mode  Cnt     Score   Error   Units
CompressDecompress.compress           Zip     10b  thrpt        214.639          ops/ms
CompressDecompress.compress           Zip     1kb  thrpt         40.054          ops/ms
CompressDecompress.compress           Zip    10kb  thrpt          4.898          ops/ms
CompressDecompress.compress           Zip    64kb  thrpt          1.416          ops/ms
CompressDecompress.compress       JNI-LZ4     10b  thrpt       2307.323          ops/ms
CompressDecompress.compress       JNI-LZ4     1kb  thrpt        496.654          ops/ms
CompressDecompress.compress       JNI-LZ4    10kb  thrpt         81.646          ops/ms
CompressDecompress.compress       JNI-LZ4    64kb  thrpt         20.605          ops/ms
CompressDecompress.compress      JNI-ZStd     10b  thrpt       1635.891          ops/ms
CompressDecompress.compress      JNI-ZStd     1kb  thrpt         98.864          ops/ms
CompressDecompress.compress      JNI-ZStd    10kb  thrpt         21.072          ops/ms
CompressDecompress.compress      JNI-ZStd    64kb  thrpt          6.316          ops/ms
CompressDecompress.compress    JNI-Snappy     10b  thrpt       2198.521          ops/ms
CompressDecompress.compress    JNI-Snappy     1kb  thrpt        264.103          ops/ms
CompressDecompress.compress    JNI-Snappy    10kb  thrpt         27.095          ops/ms
CompressDecompress.compress    JNI-Snappy    64kb  thrpt          7.961          ops/ms
CompressDecompress.compress     AC-Snappy     10b  thrpt       7542.674          ops/ms
CompressDecompress.compress     AC-Snappy     1kb  thrpt        374.631          ops/ms
CompressDecompress.compress     AC-Snappy    10kb  thrpt         54.042          ops/ms
CompressDecompress.compress     AC-Snappy    64kb  thrpt         16.932          ops/ms
CompressDecompress.compress        AC-LZ4     10b  thrpt       9360.128          ops/ms
CompressDecompress.compress        AC-LZ4     1kb  thrpt        374.507          ops/ms
CompressDecompress.compress        AC-LZ4    10kb  thrpt         50.168          ops/ms
CompressDecompress.compress        AC-LZ4    64kb  thrpt         12.202          ops/ms
CompressDecompress.compress       AC-ZStd     10b  thrpt    3   641.855          ops/ms
CompressDecompress.compress       AC-ZStd     1kb  thrpt    3    40.996          ops/ms
CompressDecompress.compress       AC-ZStd    10kb  thrpt    3     9.626          ops/ms
CompressDecompress.compress       AC-ZStd    64kb  thrpt    3     2.979          ops/ms
CompressDecompress.decompress         Zip     10b  thrpt       1856.131          ops/ms
CompressDecompress.decompress         Zip     1kb  thrpt        164.930          ops/ms
CompressDecompress.decompress         Zip    10kb  thrpt         35.547          ops/ms
CompressDecompress.decompress         Zip    64kb  thrpt          7.846          ops/ms
CompressDecompress.decompress     JNI-LZ4     10b  thrpt       4127.293          ops/ms
CompressDecompress.decompress     JNI-LZ4     1kb  thrpt       2003.209          ops/ms
CompressDecompress.decompress     JNI-LZ4    10kb  thrpt        326.834          ops/ms
CompressDecompress.decompress     JNI-LZ4    64kb  thrpt         66.849          ops/ms
CompressDecompress.decompress    JNI-ZStd     10b  thrpt       1003.361          ops/ms
CompressDecompress.decompress    JNI-ZStd     1kb  thrpt        156.012          ops/ms
CompressDecompress.decompress    JNI-ZStd    10kb  thrpt         48.902          ops/ms
CompressDecompress.decompress    JNI-ZStd    64kb  thrpt         19.477          ops/ms
CompressDecompress.decompress  JNI-Snappy     10b  thrpt       3564.673          ops/ms
CompressDecompress.decompress  JNI-Snappy     1kb  thrpt        884.649          ops/ms
CompressDecompress.decompress  JNI-Snappy    10kb  thrpt        123.879          ops/ms
CompressDecompress.decompress  JNI-Snappy    64kb  thrpt         24.231          ops/ms
CompressDecompress.decompress   AC-Snappy     10b  thrpt       8733.916          ops/ms
CompressDecompress.decompress   AC-Snappy     1kb  thrpt        830.710          ops/ms
CompressDecompress.decompress   AC-Snappy    10kb  thrpt        118.719          ops/ms
CompressDecompress.decompress   AC-Snappy    64kb  thrpt         26.784          ops/ms
CompressDecompress.decompress      AC-LZ4     10b  thrpt       9422.680          ops/ms
CompressDecompress.decompress      AC-LZ4     1kb  thrpt       1746.522          ops/ms
CompressDecompress.decompress      AC-LZ4    10kb  thrpt        228.481          ops/ms
CompressDecompress.decompress      AC-LZ4    64kb  thrpt         56.751          ops/ms
CompressDecompress.decompress     AC-ZStd     10b  thrpt       6789.080          ops/ms
CompressDecompress.decompress     AC-ZStd     1kb  thrpt        194.514          ops/ms
CompressDecompress.decompress     AC-ZStd    10kb  thrpt         42.434          ops/ms
CompressDecompress.decompress     AC-ZStd    64kb  thrpt         15.292          ops/ms

https://docs.google.com/spreadsheets/d/18ntnyxiQY3VedYeywoum9JXV97f-G9RL7xD-T6th7tA/edit#gid=153785868

Compression

image

image

image

image

Decompression

image

image

image

image

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Oct 15, 2019
@merlimat merlimat added this to the 2.5.0 milestone Oct 15, 2019
@merlimat merlimat self-assigned this Oct 15, 2019
@aahmed-se
Copy link
Contributor

run cpp tests

@aahmed-se
Copy link
Contributor

run java8 tests

@aahmed-se
Copy link
Contributor

run integration tests

@aahmed-se
Copy link
Contributor

run java8 tests

@rdhabalia
Copy link
Contributor

@merlimat before merging this PR,
can you please do a compatibility test, and test it with different version of producer and consumer.
Recently we have seen failure when consumer tries to decode LZ4 compressed message. and we want to avoid any unknown compatibility issue.

net.jpountz.lz4.LZ4Exception: Malformed input at 367
	at net.jpountz.lz4.LZ4JavaUnsafeFastDecompressor.decompress(LZ4JavaUnsafeFastDecompressor.java:172)
	at org.apache.pulsar.common.compression.CompressionCodecLZ4.decode(CompressionCodecLZ4.java:74)
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalPeekNthMessage(PersistentTopicsBase.java:1118)
	at org.apache.pulsar.broker.admin.v1.PersistentTopics.peekNthMessage(PersistentTopics.java:435)

@merlimat
Copy link
Contributor Author

@rdhabalia There's a test to double-check that the format is the same as the current compression codecs: https://github.com/apache/pulsar/pull/5390/files#diff-5d40386eaa90a0ce27694830c4fa940cR41

@rdhabalia
Copy link
Contributor

I see.. instead only keeping static output in test, can we also keep pervious dependencies for a release and add real tests with different payload size. we can remove it next release once we have proof of successful tests for a release.?

@aahmed-se
Copy link
Contributor

run java8 tests
run integration tests

@aahmed-se
Copy link
Contributor

@merlimat there is genuine issue here.

Error Message
Could not initialize class io.airlift.slice.Slices
Stacktrace
java.lang.NoClassDefFoundError: Could not initialize class io.airlift.slice.Slices
	at io.airlift.compress.zstd.ZstdFrameCompressor.writeChecksum(ZstdFrameCompressor.java:133)
	at io.airlift.compress.zstd.ZstdFrameCompressor.compress(ZstdFrameCompressor.java:156)
	at io.airlift.compress.zstd.ZstdCompressor.compress(ZstdCompressor.java:93)
	at org.apache.pulsar.common.compression.CompressionCodecZstd.encode(CompressionCodecZstd.java:77)
	at org.apache.pulsar.common.compression.CompressorCodecTest.testCompressDecompress(CompressorCodecTest.java:64)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
	at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@merlimat
Copy link
Contributor Author

@rdhabalia Added tests to prove compress/decompress compatibility with current JNI implementations.

@codelipenghui
Copy link
Contributor

@merlimat Could you please resolve the conflicts? So that we can onboard this in 2.6.0 release.

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

2 similar comments
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit b22b323 into apache:master Jun 4, 2020
zeo1995 pushed a commit to zeo1995/pulsar that referenced this pull request Jun 5, 2020
…te-update

* 'website-update' of github.com:zeo1995/pulsar: (432 commits)
  Fixed ordering issue in KeyShared dispatcher when adding consumer (apache#7106)
  Fix Duplicated messages are sent to dead letter topic apache#6960 (apache#7021)
  [Issue 2793][Doc]--Update the TLS hostname verification for CPP and Python clients (apache#7162)
  [Doc]--set netty mex frame size (apache#7174)
  [Doc] Update for the maximum message size (apache#7171)
  Fixed KeyShared consumers getting stuck on delivery (apache#7105)
  [apache#6003][pulsar-functions] Possibility to add builtin Functions (apache#6895)
  [Issue 6921][pulsar-broker-common] Replaced "Paths.get(...).getParent()", because it's system dependent and uses '\' as path separator on Windows (apache#6992)
  Improve broker unit test CI (apache#7173)
  Fix typo in exception message (apache#7027)
  Support KeyValue Schema Use Null Key And Null Value (apache#7139)
  [Doc]--Update documents for support consumer priority level in failover mode (apache#7136)
  Add schema config to cpp and cgo docs. (apache#7137)
  [Doc]--Update for the maximum message size (apache#7160)
  [C++] Expose ZSTD and Snappy compression to C API (apache#7014)
  [pulsar-proxy] add proxyLogLevel into config file (apache#6948)
  Add multi-hosts example for bookkeeperMetadataServiceUri (apache#6998)
  support for termination of partitioned topic (apache#6126)
  Use pure-java Air-Compressor instead of JNI based libraries (apache#5390)
  [Issues 5709]remove the namespace checking (apache#5716)
  ...

# Conflicts:
#	site2/website/scripts/split-swagger-by-version.js
cdbartholomew pushed a commit to kafkaesque-io/pulsar that referenced this pull request Jul 24, 2020
)

* Use pure-java Air-Compressor instead of JNI based libraries

* Fixed license files

* Fixed non-needed exclusion

* Added compat tests with JNI implementations

* Ensure direct buffer is used in the test

* Ensure direct bytebuf for both compression and decompression test

Co-authored-by: penghui <penghui@apache.org>
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
)

* Use pure-java Air-Compressor instead of JNI based libraries

* Fixed license files

* Fixed non-needed exclusion

* Added compat tests with JNI implementations

* Ensure direct buffer is used in the test

* Ensure direct bytebuf for both compression and decompression test

Co-authored-by: penghui <penghui@apache.org>
cdbartholomew added a commit to kafkaesque-io/pulsar that referenced this pull request Sep 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants