This repository has been archived by the owner on Aug 3, 2020. It is now read-only.

[FLINK-15171] string serialization benchmark to use proper i/o buffer implementation as on SerializationFrameworkMiniBenchmarks #44

Open
wants to merge 1 commit into master

Conversation

@shuttie (Contributor) commented Dec 18, 2019

This is a follow-up to the upstream FLINK-15171 PR.

Originally, PojoSerializationBenchmark and StringSerializationBenchmark used plain JDK byte[]-backed input/output streams and views to measure serialization performance. But this resulted in a severe performance mismatch with the end-to-end tests in SerializationFrameworkMiniBenchmarks: by default, Flink uses its own off-heap implementation of memory operations, which has completely different performance characteristics.

For example, on StringSerializationBenchmark the FLINK-14346 implementation of string serialization heavily outperformed the original one, being up to 15 times faster, yet on SerializationFrameworkMiniBenchmarks there was a performance regression instead.

In this PR we make the following improvements:

  • we introduce an OffheapInputWrapper which uses exactly the same implementations of memory read operations as default Flink with HybridMemorySegment;
  • we switch to DataOutputSerializer to match the one Flink uses by default for memory write operations (see the sketch below);
  • both StringSerializationBenchmark and PojoSerializationBenchmark now use proper read/write primitives, matching the behavior of SerializationFrameworkMiniBenchmarks.

These improvements allowed us to reliably reproduce the mysterious performance regression discussed in FLINK-15171.
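For context, a minimal sketch of the write path the benchmarks now exercise through DataOutputSerializer; the StringSerializer choice, payload, and harness below are illustrative and not the actual benchmark code:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class StringWriteSketch {
	public static void main(String[] args) throws Exception {
		// DataOutputSerializer is the byte[]-backed output view Flink itself uses
		// for memory write operations, so writes go through the same code path
		// as SerializationFrameworkMiniBenchmarks.
		DataOutputSerializer out = new DataOutputSerializer(64);

		StringSerializer.INSTANCE.serialize("payload-0123456789", out);
		byte[] serialized = out.getCopyOfBuffer();

		// pruneBuffer() resets the write position and shrinks an over-grown buffer;
		// the benchmarks call it at the start of each invocation batch.
		out.pruneBuffer();
		System.out.println("serialized " + serialized.length + " bytes");
	}
}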

…Benchmark use proper i/o buffer implementation as on SerializationFrameworkMiniBenchmarks
@@ -86,42 +86,69 @@ public static void main(String[] args)
}

@Benchmark
public byte[] writePojo() throws IOException {
return write(pojoSerializer, pojo);
@OperationsPerInvocation(INVOCATIONS)
nit: could this be moved to the top of the class?

@OperationsPerInvocation(INVOCATIONS)
public int writePojo() throws IOException {
stream.pruneBuffer();
for (int i = 0; i < INVOCATIONS; i++) {
Could you motivate why we need multiple invocations in a single benchmark? Is it because of the potential costs of stream.pruneBuffer(); or pojoBuffer.reset();? Are they measurable?

As it is, I would be a little concerned about what magic the JIT can do after inlining pojoSerializer.serialize(pojo, stream); and unrolling the loop. That might be desirable (vectorisation), but since we are invoking it over and over again with the same parameters, some other magic could yield false results.
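For reference, the shape being discussed looks roughly like the following JMH sketch; StringSerializer stands in for the benchmark's POJO serializer, and everything except the pruneBuffer() call, the loop, and the annotations is illustrative:

import java.io.IOException;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class LoopedWriteSketch {
	static final int INVOCATIONS = 1000;

	DataOutputSerializer stream;
	String value;

	@Setup
	public void setup() {
		stream = new DataOutputSerializer(1024);
		value = "payload-0123456789";
	}

	@Benchmark
	@OperationsPerInvocation(INVOCATIONS)
	public int write() throws IOException {
		stream.pruneBuffer();
		// JMH divides the measured time by INVOCATIONS, amortizing the cost of
		// pruneBuffer(); the concern above is that the JIT may unroll or
		// specialize this loop because its arguments never change.
		for (int i = 0; i < INVOCATIONS; i++) {
			StringSerializer.INSTANCE.serialize(value, stream);
		}
		return stream.length();
	}
}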

Comment on lines +18 to +26
public OffheapInputWrapper(byte[] initialPayload) throws Exception {
	reader = new SpillingAdaptiveSpanningRecordDeserializer<>(new String[0]);
	MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(initialPayload.length, this);
	segment.put(0, initialPayload);
	buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, initialPayload.length);
	Field nonSpanningWrapper = reader.getClass().getDeclaredField("nonSpanningWrapper");
	nonSpanningWrapper.setAccessible(true);
	dataInput = (DataInputView) nonSpanningWrapper.get(reader);
}

There are a couple of issues here.

First, this relies on a non-public Flink API (SpillingAdaptiveSpanningRecordDeserializer), which means it can change at any point in time and break the benchmark builds.

Secondly, this uses reflection to access a private field of an @Internal class, which makes this benchmark even more fragile (and could cause compilation issues like this recent one).

Is there a way to avoid using internal classes? If there is really no way, then it could be fixed similarly to how #40 refactors the FLINK-15199 benchmark (take a look at the org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils class).

In this particular case, the OffheapInputWrapper class should be defined and tested in the flink repository, with a comment/annotation noting that it is used by the micro benchmarks, and the code should be adjusted so that reflection is no longer needed.
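If the internals can be avoided entirely, one reflection-free alternative is to read through a DataInputDeserializer wrapping the off-heap segment's ByteBuffer. This is only a sketch and not what the PR does, so it may not exercise exactly the same read primitives as the NonSpanningWrapper path:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class OffHeapReadSketch {
	public static void main(String[] args) throws Exception {
		// Produce a serialized payload, as the benchmark setup would.
		DataOutputSerializer out = new DataOutputSerializer(64);
		StringSerializer.INSTANCE.serialize("payload-0123456789", out);
		byte[] payload = out.getCopyOfBuffer();

		// Copy the payload into an unpooled off-heap segment...
		MemorySegment segment =
				MemorySegmentFactory.allocateUnpooledOffHeapMemory(payload.length, null);
		segment.put(0, payload);

		// ...and read it back through a DataInputView backed by the segment's
		// ByteBuffer, without reflection or network-stack internals.
		DataInputDeserializer in = new DataInputDeserializer(segment.wrap(0, payload.length));
		System.out.println(StringSerializer.INSTANCE.deserialize(in));
	}
}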

@Myasuka (Contributor) commented Apr 30, 2020

@shuttie this repo will be migrated to https://github.com/apache/flink-benchmarks, please create this PR under that repo once FLINK-17281 is completed.
