
[FLINK-15171] [serialization] fix performance regression caused by too many buffer allocations on string serialization #10529

Merged

merged 1 commit into apache:release-1.10 from string-serializer-alloc on Dec 19, 2019

Conversation

shuttie
Contributor

@shuttie shuttie commented Dec 11, 2019

What is the purpose of the change

FLINK-14346 introduced a faster implementation of string [de]serialization. But while running the flink-benchmarks suite, a performance regression was found for almost all serialization tests: a significant ~10% drop in total job throughput.

Flame graph before FLINK-14346 was applied:
[flame graph: flink-gc-dec05]

Flame graph after FLINK-14346 was applied:
[flame graph: flink-gc-dec11]

From these almost identical graphs we can see that the GC spends much more time cleaning up the heap with FLINK-14346 applied.

Running the new and old code under an allocation profiler confirmed the theory of a higher allocation rate:

Top allocations, with FLINK-14346:

       bytes  percent  samples  top
  ----------  -------  -------  ---
  8222540128   32.45%    40779  byte[]
  7509810768   29.64%    37258  char[]
  4320201040   17.05%    21491  java.lang.String
  1667513984    6.58%     8247  org.apache.flink.api.java.tuple.Tuple2
   749432744    2.96%     3711  org.apache.flink.api.java.tuple.Tuple8
   589192264    2.33%     2897  java.lang.String[]
   497193120    1.96%     2458  org.apache.flink.streaming.runtime.streamrecord.StreamRecord
   478790376    1.89%     2372  org.apache.flink.api.java.tuple.Tuple2[]
   404943784    1.60%     2007  java.lang.ThreadLocal$ThreadLocalMap
   156780240    0.62%      564  java.nio.DirectByteBuffer

Top allocations, no FLINK-14346:

       bytes  percent  samples  top
  ----------  -------  -------  ---
  7591122240   29.43%     3271  char[]
  5360582240   20.78%     2243  java.lang.ThreadLocal$ThreadLocalMap
  5147640184   19.96%     2231  java.lang.String
  1758207472    6.82%      765  org.apache.flink.api.java.tuple.Tuple2
  1717572128    6.66%      758  java.util.concurrent.locks.AbstractQueuedSynchronizer$Node
   891013696    3.45%      380  org.apache.flink.api.java.tuple.Tuple8
   598698832    2.32%      266  java.lang.String[]
   440182240    1.71%      202  org.apache.flink.streaming.runtime.streamrecord.StreamRecord
   364959680    1.41%      141  org.apache.flink.api.java.tuple.Tuple2[]

So almost a third of all allocations were for these intermediate array buffers.

All the benchmarks posted in the original PR were done on a Ryzen 7 2700 (8 physical cores), while the CPU used for flink-benchmarks is an i7 7700 (4 physical cores). Also note that almost all the flink-benchmarks use parallelism=4, so:

  • the new code generated ~30% more garbage;
  • as performance was originally measured on an 8-core CPU with only 4 worker threads, the GC threads were scheduled on the idle cores and didn't interfere with the benchmark;
  • on the i7 7700, flink-benchmarks used 4 threads on a 4-core CPU plus the active GC threads, which heavily interfered with the main benchmark;
  • this is the reason why lowering the parallelism for the stringHeavyBenchmark improved the throughput: it left more room for the GC threads to run.

With this PR we did the following:

  • added a static ThreadLocal<byte[]> buffer for short strings of fewer than 1024 characters (see the sketch below);
  • when the string is short enough, we do not allocate a buffer but reuse the static one, eliminating the allocation completely;
  • the trade-off is that we always preallocate a small buffer for each worker thread, even if there is almost no string serialization at all;
  • for long strings we do a regular allocation, as before.
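
For illustration, a minimal sketch of the fast path described above (SHORT_STRING_MAX_LENGTH and shortStringBuffer match the names in the diff; the bufferFor helper and the exact buffer size are my assumptions):

private static final int SHORT_STRING_MAX_LENGTH = 1024;

// one buffer per worker thread, preallocated once and reused;
// assuming up to 3 bytes per char, this is roughly the +3kb per thread mentioned below
private static final ThreadLocal<byte[]> shortStringBuffer =
    ThreadLocal.withInitial(() -> new byte[SHORT_STRING_MAX_LENGTH * 3]);

static byte[] bufferFor(int len) {
    if (len < SHORT_STRING_MAX_LENGTH) {
        // short string: reuse the thread-local buffer, no allocation at all
        return shortStringBuffer.get();
    } else {
        // long string: regular allocation, as before
        return new byte[len];
    }
}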

Brief change log

  • Add a ThreadLocal byte buffer on the write and read paths for short strings, instead of allocating one on each invocation.

Verifying this change

This change is already covered by existing tests, such as StringSerializationTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: ( yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 234e7ef (Wed Dec 11 11:10:25 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@shuttie
Contributor Author

shuttie commented Dec 11, 2019

Raw serialization microbenchmarks:

Before this PR:

[info] Benchmark                                    (length)  (stringType)  Mode  Cnt   Score   Error  Units
[info] StringSerializerBenchmark.serializeImproved         1         ascii  avgt   10  29.746 ± 1.522  ns/op
[info] StringSerializerBenchmark.serializeImproved         2         ascii  avgt   10  31.293 ± 1.636  ns/op
[info] StringSerializerBenchmark.serializeImproved         4         ascii  avgt   10  34.226 ± 1.311  ns/op
[info] StringSerializerBenchmark.serializeImproved         8         ascii  avgt   10  38.368 ± 1.015  ns/op
[info] StringSerializerBenchmark.serializeImproved        16         ascii  avgt   10  44.410 ± 0.674  ns/op
[info] StringSerializerBenchmark.serializeImproved        32         ascii  avgt   10  56.724 ± 1.874  ns/op

[info] Benchmark                                        (length)  (stringType)  Mode  Cnt   Score    Error  Units
[info] StringDeserializerBenchmark.deserializeImproved         1         ascii  avgt   10  50.539 ±  1.434  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         2         ascii  avgt   10  50.799 ±  1.676  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         4         ascii  avgt   10  50.866 ±  1.856  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         8         ascii  avgt   10  53.086 ±  2.656  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        16         ascii  avgt   10  54.627 ±  0.142  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        32         ascii  avgt   10  70.481 ± 11.272  ns/op

After this PR:

[info] Benchmark                                    (length)  (stringType)  Mode  Cnt   Score   Error  Units
[info] StringSerializerBenchmark.serializeImproved         1         ascii  avgt   10  25.427 ± 0.236  ns/op
[info] StringSerializerBenchmark.serializeImproved         2         ascii  avgt   10  25.435 ± 0.640  ns/op
[info] StringSerializerBenchmark.serializeImproved         4         ascii  avgt   10  31.293 ± 0.523  ns/op
[info] StringSerializerBenchmark.serializeImproved         8         ascii  avgt   10  28.790 ± 0.183  ns/op
[info] StringSerializerBenchmark.serializeImproved        16         ascii  avgt   10  32.189 ± 1.750  ns/op
[info] StringSerializerBenchmark.serializeImproved        32         ascii  avgt   10  42.216 ± 1.703  ns/op

[info] Benchmark                                        (length)  (stringType)  Mode  Cnt   Score   Error  Units
[info] StringDeserializerBenchmark.deserializeImproved         1         ascii  avgt   10  50.706 ± 2.197  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         2         ascii  avgt   10  50.994 ± 1.868  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         4         ascii  avgt   10  51.580 ± 1.658  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         8         ascii  avgt   10  53.936 ± 2.266  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        16         ascii  avgt   10  62.628 ± 3.479  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        32         ascii  avgt   10  62.392 ± 0.559  ns/op

@flinkbot
Collaborator

flinkbot commented Dec 11, 2019

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@shuttie
Contributor Author

shuttie commented Dec 11, 2019

And the problematic flink-benchmarks:

1 thread  with-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  628.377 ± 14.407  ops/ms
1 thread    no-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  628.152 ± 11.835  ops/ms

8 thread  with-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  451.422 ± 5.737  ops/ms
8 thread    no-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  450.485 ± 6.710  ops/ms

16 thread with-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  260.823 ± 4.057  ops/ms
16 thread   no-fix SerializationFrameworkMiniBenchmarks.serializerTuple  thrpt   50  190.611 ± 2.663  ops/ms

So if you stress the GC hard enough, it will start interfering with the benchmark code.

@shuttie
Contributor Author

shuttie commented Dec 11, 2019

A good follow-up question is: how large should the SHORT_STRING_MAX_LENGTH constant be? Currently it's 1024; my guess is that a per-thread overhead of +3kb is indistinguishable from noise, but gives a nice performance boost.

It could trivially be bumped to something like 64kb without any drawbacks, but from my personal experience with Apache Flink, giant strings are quite rare in real-world projects. And for 1kb+ strings, the original PR already improves the situation.

Contributor

@pnowojski pnowojski left a comment


Thanks for the update, but I have some doubts/questions.

I see where the extra byte[] allocations were coming from, but I do not understand where the char[] allocations came from:

      bytes  percent  samples  top
  ----------  -------  -------  ---
  8222540128   32.45%    40779  byte[]
  7509810768   29.64%    37258  char[]

Secondly, we have executed this fix on the benchmarking machine, but it doesn't seem to have fixed the issue:
https://imgur.com/oeInxQm

For example, serializerTuple had ~680 before the regression, and 590-618 after it. This PR's result is ~605, which is in the “after the regression” range 😕
http://codespeed.dak8s.net:8000/timeline/?ben=serializerTuple&env=2

The same can be said about serializerRow, which is also below the pre-regression levels (630-640).

Comment on lines 788 to 780
if (len < SHORT_STRING_MAX_LENGTH) {
    // skip allocating a separate buffer and reuse the thread-local one,
    // as allocating a buffer for small strings can produce too much GC pressure
    buf = shortStringBuffer.get();
    in.readFully(buf, 0, len);
} else {
    buf = new byte[len];
    in.readFully(buf);
}
Contributor

@pnowojski pnowojski Dec 14, 2019


One definite issue with this approach is that we have no benchmark coverage for strings > 1KB. As far as I can see, SerializationFrameworkMiniBenchmarks#serializerHeavyString uses just 1KB strings? Maybe it just barely qualifies for the non-ThreadLocal branch, but if we wanted to go in this direction, we would need to either decrease the size of the buffer (100?) or increase the strings in serializerHeavyString (SHORT_STRING_MAX_LENGTH * 1.5?).

However, do we even need such an if (len < SHORT_STRING_MAX_LENGTH) branch (both here and below)? Couldn't this have a single per-thread fixed-size buffer and, if the string is larger than the buffer's max size, read/write the string from/to the fixed-size buffer in steps? Like:

for step in 0..string.size()/buf.length:
  for i in 0..buf.length:
     buf[i] = string[step * buf.length + i] 
  // process buffer

or probably more precisely:

for step in 0..string.size()/buf.length: 
    in.readFully(buf, 0, buf.length) 
    // process buffer
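
For concreteness, a hedged Java rendering of the second variant (readInSteps is a hypothetical helper; the "process buffer" step is stood in for by collecting the bytes into a ByteArrayOutputStream):

static void readInSteps(java.io.DataInput in, byte[] buf, int len,
                        java.io.ByteArrayOutputStream sink) throws java.io.IOException {
    int remaining = len;
    while (remaining > 0) {
        int chunk = Math.min(remaining, buf.length);
        in.readFully(buf, 0, chunk); // refill the fixed-size reusable buffer
        sink.write(buf, 0, chunk);   // "process buffer": here, just collect the bytes
        remaining -= chunk;
    }
}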

@shuttie
Contributor Author

shuttie commented Dec 16, 2019

@pnowojski thanks for the feedback. Current status:

  • reading the string via a thread-local fixed-size buffer is a really cool idea; I will implement it today, it should be quite trivial.
  • most of the char[] allocations come from the readString method: we allocate a temp buffer to decode characters into, and then the String constructor makes a deep copy of that buffer. There is actually no need to allocate this buffer on every invocation; we can do the same trick with buffer reuse (see the sketch below).
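
A minimal sketch of that trick (charBuffer and decodeAscii are hypothetical names, and the toy one-byte-per-char decoding stands in for Flink's variable-length format):

private static final ThreadLocal<char[]> charBuffer =
    ThreadLocal.withInitial(() -> new char[1024]);

// assumes count <= 1024; decode into the reusable char[] and let the
// String constructor do the single unavoidable copy
static String decodeAscii(byte[] bytes, int count) {
    char[] chars = charBuffer.get();          // reused, not allocated per call
    for (int i = 0; i < count; i++) {
        chars[i] = (char) (bytes[i] & 0xFF);  // toy decoding
    }
    return new String(chars, 0, count);       // String copies chars[0..count)
}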

The most concerning thing for me is being unable to reproduce the performance regression on flink-benchmarks. I will try to run them on similar 4-core EC2 instances like c4; hopefully that will help me understand what's going on. My main suspect is GC pressure, and both of the planned changes should reduce it quite significantly.

@shuttie shuttie changed the base branch from master to release-1.10 December 16, 2019 10:29
@pnowojski
Contributor

@shuttie, yes, I think GC pressure is the prime suspect here. For example, the brief blip in performance on the timeline around http://codespeed.dak8s.net:8000/timeline/?ben=serializerTuple&env=2 was caused by some memory-related configuration changes (part of another investigation). I reverted those changes after a couple of runs, so as not to skew benchmark results for this regression.

Once you have provided an update, please ping me or @rkhachatryan so we can re-trigger the benchmark run :)

@pnowojski
Contributor

One more thing

most of the char[] allocations come from the readString method: we are allocating a temp buffer to decode characters to, and then the String constructor makes a deep copy of this buffer. Actually there is no need to allocate this buffer on every invocation, we can do the same trick with buffer reuse.

Yes, I've seen it; as far as I can tell, you haven't increased the number of char[] allocations. But from the data you presented, it looks like you did? Or were the before/after allocation numbers taken from two samples of different length/duration? If that's the case, I struggle to understand why there is still a performance regression for small strings with this PR.

@shuttie shuttie force-pushed the string-serializer-alloc branch 2 times, most recently from a80a9ad to 754decc Compare December 17, 2019 12:54
@shuttie
Contributor Author

shuttie commented Dec 17, 2019

@pnowojski @rkhachatryan It looks like I was able to figure out what's going on with this performance regression. To reproduce the issue I had to write yet another benchmark mimicking the one in SerializationFrameworkMiniBenchmarks.serializerTuple, but without the job setup overhead (which may take up almost 50% of the benchmark runtime). This benchmark just roundtrips a Tuple8 to a byte array and back, using different serializer implementations (I will make a PR to flink-benchmarks later); a sketch of the harness follows the results below.

Benchmark        Mode  Cnt    Score   Error   Units
roundtripTuple  thrpt   50  623.194 ± 4.601  ops/ms   pre-FLINK-14346
roundtripTuple  thrpt   50  590.663 ± 3.570  ops/ms   FLINK-14346
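
For reference, a minimal JMH-style sketch of such a roundtrip harness (my own simplification using a Tuple2 and hypothetical class/field names; not the actual flink-benchmarks code):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.openjdk.jmh.annotations.*;

@State(Scope.Thread)
public class RoundtripSketch {
    TypeSerializer<Tuple2<Integer, String>> serializer;
    Tuple2<Integer, String> value;
    DataOutputSerializer out;

    @Setup
    public void setup() {
        serializer = TypeInformation
            .of(new TypeHint<Tuple2<Integer, String>>() {})
            .createSerializer(new ExecutionConfig());
        value = Tuple2.of(42, "hello world");
        out = new DataOutputSerializer(64);
    }

    @Benchmark
    public Tuple2<Integer, String> roundtripTuple() throws java.io.IOException {
        out.clear();                          // reuse the output buffer
        serializer.serialize(value, out);     // serialize to bytes
        DataInputDeserializer in =
            new DataInputDeserializer(out.getSharedBuffer(), 0, out.length());
        return serializer.deserialize(in);    // and back
    }
}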

Compared to the original pre-1.10 implementation, the new version does much more work:

  1. compute the exact buffer size
  2. allocate the buffer itself
  3. encode chars into the buffer (and not into the stream directly)
  4. flush the buffer to the stream

As @pnowojski suggested, there is no need to compute the exact buffer size in step 1: we can allocate a small buffer and flush it whenever it's exhausted. Also, as this small buffer has a fixed size, a simple idea was to allocate it once in a ThreadLocal and reuse it later (see the sketch after the results):

Benchmark        Mode  Cnt    Score   Error   Units
roundtripTuple  thrpt   50  623.194 ± 4.601  ops/ms   pre-FLINK-14346
roundtripTuple  thrpt   50  590.663 ± 3.570  ops/ms   FLINK-14346
roundtripTuple  thrpt   50  613.878 ± 2.498  ops/ms   FLINK-15171 with ThreadLocal
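
For reference, a hedged sketch of this "small buffer, flush when exhausted" write path (writeBuffered is a hypothetical helper using plain UTF-8 encoding for BMP chars; Flink's actual string format differs):

static void writeBuffered(String str, java.io.DataOutput out) throws java.io.IOException {
    byte[] buf = new byte[16];               // small fixed-size buffer
    int pos = 0;
    for (int i = 0; i < str.length(); i++) {
        char c = str.charAt(i);
        if (pos > buf.length - 3) {          // worst case below is 3 bytes per char
            out.write(buf, 0, pos);          // flush the full buffer to the stream
            pos = 0;
        }
        if (c < 0x80) {
            buf[pos++] = (byte) c;
        } else if (c < 0x800) {
            buf[pos++] = (byte) (0xC0 | (c >> 6));
            buf[pos++] = (byte) (0x80 | (c & 0x3F));
        } else {
            buf[pos++] = (byte) (0xE0 | (c >> 12));
            buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
            buf[pos++] = (byte) (0x80 | (c & 0x3F));
        }
    }
    out.write(buf, 0, pos);                  // flush the tail
}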

The results were a bit better, as we no longer scanned the string twice and significantly reduced the number of allocations. But the results were still not as good as before. Surprisingly, the ThreadLocal manipulations took most of the time:

[profiler screenshot]

Then we ran a test without the ThreadLocal, but with a fixed-size allocated buffer of size 8 and 16:

Benchmark        Mode  Cnt    Score   Error   Units
roundtripTuple  thrpt   50  623.194 ± 4.601  ops/ms   pre-FLINK-14346
roundtripTuple  thrpt   50  590.663 ± 3.570  ops/ms   FLINK-14346
roundtripTuple  thrpt   50  613.878 ± 2.498  ops/ms   FLINK-15171 with ThreadLocal
roundtripTuple  thrpt   50  622.679 ± 4.347  ops/ms   FLINK-15171 buffer[8]
roundtripTuple  thrpt   50  631.729 ± 4.937  ops/ms   FLINK-15171 buffer[16]

This improved the situation quite dramatically. We also ran a set of benchmarks to observe the impact of buffer size on read/write performance for strings of different lengths: usually, the larger the buffer, the better the performance.

[info] Benchmark              (bufferSize)  (length)  (stringType)  Mode  Cnt     Score    Error  Units
[info] deserializeImproved               8         1         ascii  avgt   10    50.854 ±  1.187  ns/op
[info] deserializeImproved               8         4         ascii  avgt   10    51.080 ±  1.235  ns/op
[info] deserializeImproved               8        16         ascii  avgt   10    67.324 ±  2.230  ns/op
[info] deserializeImproved               8       256         ascii  avgt   10   568.002 ± 16.895  ns/op
[info] deserializeImproved              16         1         ascii  avgt   10    51.080 ±  1.846  ns/op
[info] deserializeImproved              16         4         ascii  avgt   10    51.044 ±  1.862  ns/op
[info] deserializeImproved              16        16         ascii  avgt   10    55.298 ±  0.419  ns/op
[info] deserializeImproved              16       256         ascii  avgt   10   400.586 ±  1.440  ns/op
[info] deserializeImproved              32         1         ascii  avgt   10    51.196 ±  0.443  ns/op
[info] deserializeImproved              32         4         ascii  avgt   10    51.587 ±  0.881  ns/op
[info] deserializeImproved              32        16         ascii  avgt   10    56.025 ±  0.159  ns/op
[info] deserializeImproved              32       256         ascii  avgt   10   335.878 ±  1.231  ns/op
[info] serializeImproved                 8         1         ascii  avgt   50    30.823 ±  1.189  ns/op
[info] serializeImproved                 8         4         ascii  avgt   50    31.606 ±  0.479  ns/op
[info] serializeImproved                 8        16         ascii  avgt   50    87.594 ±  0.700  ns/op
[info] serializeImproved                 8       256         ascii  avgt   50   861.818 ±  3.382  ns/op
[info] serializeImproved                16         1         ascii  avgt   50    30.295 ±  0.429  ns/op
[info] serializeImproved                16         4         ascii  avgt   50    32.123 ±  0.406  ns/op
[info] serializeImproved                16        16         ascii  avgt   50    70.481 ±  0.830  ns/op
[info] serializeImproved                16       256         ascii  avgt   50   522.778 ±  3.020  ns/op
[info] serializeImproved                32         1         ascii  avgt   50    30.973 ±  0.284  ns/op
[info] serializeImproved                32         4         ascii  avgt   50    32.353 ±  0.313  ns/op
[info] serializeImproved                32        16         ascii  avgt   50    38.090 ±  0.383  ns/op
[info] serializeImproved                32       256         ascii  avgt   50   418.664 ±  4.335  ns/op

But allocating a large buffer for a short string seems a bit wasteful, so we tried a flexible buffer-sizing implementation (like min(32, max(8, 1 + strlen))), and surprisingly the results degraded quite significantly:

Benchmark        Mode  Cnt    Score   Error   Units
roundtripTuple  thrpt   50  623.194 ± 4.601  ops/ms   pre-FLINK-14346
roundtripTuple  thrpt   50  590.663 ± 3.570  ops/ms   FLINK-14346
roundtripTuple  thrpt   50  613.878 ± 2.498  ops/ms   FLINK-15171 with ThreadLocal
roundtripTuple  thrpt   50  622.679 ± 4.347  ops/ms   FLINK-15171 buffer[8]
roundtripTuple  thrpt   50  631.729 ± 4.937  ops/ms   FLINK-15171 buffer[16]
roundtripTuple  thrpt   50  547.097 ± 2.687  ops/ms   FLINK-15171 buffer[dynamic]

If you check the perfasm profiler output comparing the buffer[16] and buffer[dynamic] variants, you will notice that when a small byte array is allocated with a size known at JIT compile time, the JVM can do scalar replacement: skip the heap allocation and keep those 16 bytes right on the stack.

When the buffer size is dynamic, the allocation always goes to the heap, with a significant performance penalty.
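
A toy illustration of the difference (my example, not the PR code; whether scalar replacement actually fires also depends on inlining decisions):

static int sumFixed() {
    byte[] buf = new byte[16];               // size is a JIT-time constant:
    for (int i = 0; i < buf.length; i++) {   // escape analysis can prove the array
        buf[i] = (byte) i;                   // never leaves the method and
    }                                        // scalar-replace it (no heap allocation)
    int sum = 0;
    for (byte b : buf) {
        sum += b;
    }
    return sum;
}

static int sumDynamic(int len) {
    // size depends on runtime input, so this allocation goes to the heap
    byte[] buf = new byte[Math.min(32, Math.max(8, len + 1))];
    for (int i = 0; i < buf.length; i++) {
        buf[i] = (byte) i;
    }
    int sum = 0;
    for (byte b : buf) {
        sum += b;
    }
    return sum;
}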

As for the increased number of char[] allocations, it looks to be related to the benchmarking process itself: if you increase throughput, you increase the number of Strings produced by the serializer, which in turn increases the number of char[] allocated.

So, to sum up the current status of this PR:

  1. The current version of the regression fix was meant to be ready for review, but it's not yet ready: I need to fix the issue with HybridMemorySegment, see the next comment.
  2. The flink-benchmarks PR with the narrower reproducer used here for the performance regression will be created tomorrow.
  3. I would also like to make a PR to flink-benchmarks to extract all the Flink job setup code in SerializationFrameworkMiniBenchmarks out of the main benchmark code, so the results will be much more reproducible and representative. Currently the job setup code is highly sensitive to system RAM throughput; that's why I was not able to see this regression, as my RAM is ~15% faster than the one on the benchmark machine.

@shuttie
Contributor Author

shuttie commented Dec 17, 2019

Looks like I was over-optimistic: when running SerializationFrameworkMiniBenchmarks with this PR, the regression is still there (though not as dramatic as before). Running the benchmark code with async-profiler and -XX:+DebugNonSafepoints enabled, I was finally able to find the difference between my benchmark and SerializationFrameworkMiniBenchmarks: they have different underlying buffer implementations with really different performance characteristics.

  • roundtripTuple uses a thin wrapper around a heap-only byte[].
  • SerializationFrameworkMiniBenchmarks uses HybridMemorySegment, which is off-heap by default.

I will try to reproduce the issue yet another time, but with different underlying buffer implementations.

@pnowojski
Contributor

pnowojski commented Dec 17, 2019

Thanks for the update @shuttie. I will be waiting for your next update.

SerializationFrameworkMiniBenchmarks uses HybridMemorySegment, which is off-heap by default.

Indeed that could be causing some differences.

Regarding the char[] allocations, I was basing my comment on this data:

with FLINK-14346:

       bytes  percent  samples  top
  ----------  -------  -------  ---
  8222540128   32.45%    40779  byte[]
  7509810768   29.64%    37258  char[]
  4320201040   17.05%    21491  java.lang.String

without FLINK-14346:

       bytes  percent  samples  top
  ----------  -------  -------  ---
  7591122240   29.43%     3271  char[]
  5360582240   20.78%     2243  java.lang.ThreadLocal$ThreadLocalMap
  5147640184   19.96%     2231  java.lang.String

As I wrote in the previous message, if you compare the relative char[] vs String allocations, it seems more or less fine. Previously I was looking just at the sample counts, 3271 vs 37258, which didn't look fine.

Regarding the ThreadLocalMap overhead: we can even see it in the number of allocations. Maybe that's what increases the GC pressure?

@shuttie
Contributor Author

shuttie commented Dec 18, 2019

@pnowojski @rkhachatryan Indeed, creating a more proper microbenchmark that uses exactly the same input/output buffer implementations as SerializationFrameworkMiniBenchmarks gave much clearer results, highlighting exactly the same issue:

[info] Benchmark            (length)  (stringType)  Mode  Cnt     Score     Error  Units
[info] deserializeDefault          1         ascii  avgt    5    28.089 ±   0.375  ns/op
[info] deserializeDefault          4         ascii  avgt    5    32.567 ±  13.468  ns/op
[info] deserializeDefault         16         ascii  avgt    5    44.452 ±   0.887  ns/op
[info] deserializeDefault        128         ascii  avgt    5   213.988 ±  39.035  ns/op
[info] deserializeDefault        256         ascii  avgt    5   460.678 ±  83.841  ns/op
[info] deserializeDefault        512         ascii  avgt    5   902.212 ± 154.272  ns/op
[info] deserializeDefault       1024         ascii  avgt    5  1978.651 ± 319.524  ns/op
[info] deserializeImproved         1         ascii  avgt    5    36.451 ±   1.774  ns/op
[info] deserializeImproved         4         ascii  avgt    5    37.754 ±   0.794  ns/op
[info] deserializeImproved        16         ascii  avgt    5    56.253 ±   1.153  ns/op
[info] deserializeImproved       128         ascii  avgt    5   281.926 ±  99.284  ns/op
[info] deserializeImproved       256         ascii  avgt    5   558.531 ± 178.158  ns/op
[info] deserializeImproved       512         ascii  avgt    5  1036.776 ± 244.957  ns/op
[info] deserializeImproved      1024         ascii  avgt    5  3160.582 ± 165.638  ns/op

[info] serializeDefault            1         ascii  avgt    5     6.600 ±   0.258  ns/op
[info] serializeDefault            4         ascii  avgt    5    10.736 ±   0.384  ns/op
[info] serializeDefault           16         ascii  avgt    5    22.068 ±   0.172  ns/op
[info] serializeDefault          128         ascii  avgt    5   434.391 ±  28.898  ns/op
[info] serializeDefault          256         ascii  avgt    5   853.974 ±  98.963  ns/op
[info] serializeDefault          512         ascii  avgt    5  1704.031 ± 127.931  ns/op
[info] serializeDefault         1024         ascii  avgt    5  3320.371 ± 181.818  ns/op
[info] serializeImproved           1         ascii  avgt    5     9.369 ±   0.056  ns/op
[info] serializeImproved           4         ascii  avgt    5    12.780 ±   0.234  ns/op
[info] serializeImproved          16         ascii  avgt    5    24.274 ±   1.385  ns/op
[info] serializeImproved         128         ascii  avgt    5   290.595 ±  22.052  ns/op
[info] serializeImproved         256         ascii  avgt    5   587.063 ±  18.113  ns/op
[info] serializeImproved         512         ascii  avgt    5  1115.309 ±  17.240  ns/op
[info] serializeImproved        1024         ascii  avgt    5  2317.564 ±  26.211  ns/op

Surprisingly, running the serialization code over DataOutputStreamWrapper(ByteArrayOutputStream) was not the best idea: due to megamorphic per-byte calls such as ByteArrayOutputStream.write(int), the original Flink implementation looked terribly slow.

Buffering helped to avoid these heavy per-byte read/write calls, but running the same code over MemorySegment (which is monomorphic) improved the situation dramatically. So Flink's original implementation was actually quite performant; we just benchmarked it wrong.
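
To make the call-site point concrete, a toy illustration (my example, not benchmark code):

static void writePerByte(java.io.DataOutput out, byte[] data) throws java.io.IOException {
    for (byte b : data) {
        // a virtual call per byte: if the JIT has only ever seen one receiver
        // type here (e.g. a MemorySegment-backed view), the call site stays
        // monomorphic and can be inlined into a tight loop; once several
        // receiver types have been observed, it turns megamorphic and every
        // byte pays for a dynamic dispatch
        out.write(b);
    }
}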

After staring for a long time at the perfasm listings of the FLINK-14346 code, we found that:

  • on deserialization: buffer allocation and data copying seemed to take most of the time.
  • on serialization: the FLINK-14346 code had better loop unrolling in the asm listing (both the outer and inner loops are unrolled), but buffer allocation and data copying were still taking too much time.

So it seems the idea of adding buffering was not that good to begin with. But some of the ideas in the original FLINK-14346 implementation were still worth keeping, so we:

  • ported the unrolled implementations of string-length writing and char writing;
  • for deserialization, used a ThreadLocal<char[]> intermediate buffer instead of allocating one on each invocation.

In the end, the final results show a nice +30% improvement in both write and read throughput in the microbenchmark (though not as large as originally claimed, unfortunately):

[info] Benchmark            (length)  (stringType)  Mode  Cnt     Score    Error  Units
[info] deserializeDefault          1         ascii  avgt   30    23.903 ±  0.266  ns/op
[info] deserializeDefault          4         ascii  avgt   30    26.371 ±  0.248  ns/op
[info] deserializeDefault         16         ascii  avgt   30    40.711 ±  1.187  ns/op
[info] deserializeDefault        128         ascii  avgt   30   289.613 ± 21.176  ns/op
[info] deserializeDefault        256         ascii  avgt   30   633.237 ± 47.604  ns/op
[info] deserializeDefault        512         ascii  avgt   30   820.571 ±  7.825  ns/op
[info] deserializeDefault       1024         ascii  avgt   30  1761.036 ± 25.948  ns/op
[info] deserializeImproved         1         ascii  avgt   30    18.546 ±  0.183  ns/op
[info] deserializeImproved         4         ascii  avgt   30    20.753 ±  0.517  ns/op
[info] deserializeImproved        16         ascii  avgt   30    31.796 ±  0.147  ns/op
[info] deserializeImproved       128         ascii  avgt   30   148.159 ±  2.655  ns/op
[info] deserializeImproved       256         ascii  avgt   30   286.721 ±  3.492  ns/op
[info] deserializeImproved       512         ascii  avgt   30   674.932 ±  2.495  ns/op
[info] deserializeImproved      1024         ascii  avgt   30  1361.801 ±  8.740  ns/op
[info] serializeDefault            1         ascii  avgt   30     7.113 ±  0.341  ns/op
[info] serializeDefault            4         ascii  avgt   30    15.779 ±  0.195  ns/op
[info] serializeDefault           16         ascii  avgt   30    60.260 ±  1.022  ns/op
[info] serializeDefault          128         ascii  avgt   30   364.671 ±  1.541  ns/op
[info] serializeDefault          256         ascii  avgt   30   732.862 ±  9.764  ns/op
[info] serializeDefault          512         ascii  avgt   30  1455.048 ± 19.815  ns/op
[info] serializeDefault         1024         ascii  avgt   30  2921.182 ± 37.154  ns/op
[info] serializeImproved           1         ascii  avgt   30     5.469 ±  0.059  ns/op
[info] serializeImproved           4         ascii  avgt   30    11.976 ±  0.720  ns/op
[info] serializeImproved          16         ascii  avgt   30    37.645 ±  0.540  ns/op
[info] serializeImproved         128         ascii  avgt   30   286.634 ±  1.193  ns/op
[info] serializeImproved         256         ascii  avgt   30   592.564 ± 37.882  ns/op
[info] serializeImproved         512         ascii  avgt   30  1227.392 ± 55.484  ns/op
[info] serializeImproved        1024         ascii  avgt   30  2608.061 ± 29.902  ns/op

On SerializationFrameworkMiniBenchmarks, which is much closer to the real world, the situation also looks quite nice, with a +5% improvement in throughput (at least compared to the previous -7% regression):

before FLINK-14346:

Benchmark                                                    Mode  Cnt    Score    Error   Units
SerializationFrameworkMiniBenchmarks.serializerAvro         thrpt   30  392.587 ± 12.009  ops/ms
SerializationFrameworkMiniBenchmarks.serializerHeavyString  thrpt   30   82.036 ±  0.420  ops/ms
SerializationFrameworkMiniBenchmarks.serializerKryo         thrpt   30  160.399 ± 10.793  ops/ms
SerializationFrameworkMiniBenchmarks.serializerPojo         thrpt   30  459.539 ±  5.958  ops/ms
SerializationFrameworkMiniBenchmarks.serializerRow          thrpt   30  595.623 ± 10.421  ops/ms
SerializationFrameworkMiniBenchmarks.serializerTuple        thrpt   30  661.703 ±  7.895  ops/ms

with FLINK-15171:

Benchmark                                                    Mode  Cnt    Score    Error   Units
SerializationFrameworkMiniBenchmarks.serializerAvro         thrpt   30  379.987 ± 13.619  ops/ms
SerializationFrameworkMiniBenchmarks.serializerHeavyString  thrpt   30   87.521 ±  1.275  ops/ms
SerializationFrameworkMiniBenchmarks.serializerKryo         thrpt   30  160.332 ±  9.577  ops/ms
SerializationFrameworkMiniBenchmarks.serializerPojo         thrpt   30  465.664 ±  6.814  ops/ms
SerializationFrameworkMiniBenchmarks.serializerRow          thrpt   30  622.130 ± 19.682  ops/ms
SerializationFrameworkMiniBenchmarks.serializerTuple        thrpt   30  679.704 ± 14.360  ops/ms

I'm currently preparing a PR to flink-benchmarks with the latest tests.

Contributor

@pnowojski pnowojski left a comment


Thanks for the fix. I've executed a benchmark request on it, and indeed it looks like the performance regression is gone. From a single run it's hard to say whether there is a couple-of-percent change in either direction (improvement or regression); that would have to be checked via a longer trend.

LGTM, merging.

@pnowojski pnowojski merged commit 0486ab0 into apache:release-1.10 Dec 19, 2019