Skip to content

IGNITE-28836 DirectMessageWriter: reduce per-field overhead and per-message allocations on the message serialization hot path#13296

Open
anton-vinogradov wants to merge 1 commit into
apache:masterfrom
anton-vinogradov:ignite-28836
Open

IGNITE-28836 DirectMessageWriter: reduce per-field overhead and per-message allocations on the message serialization hot path#13296
anton-vinogradov wants to merge 1 commit into
apache:masterfrom
anton-vinogradov:ignite-28836

Conversation

@anton-vinogradov

@anton-vinogradov anton-vinogradov commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

IGNITE-28836

https://issues.apache.org/jira/browse/IGNITE-28836

DirectMessageWriter is on the critical path of every outgoing message. This PR removes two inefficiencies:

  1. Per-field stream resolution. Each writeXxx re-resolved state.item().stream (array load + bounds check + field load) on every primitive write. The current stream is now cached in curStream and refreshed only when the current item changes (setBuffer / beforeNestedWrite / afterNestedWrite).
  2. Per-field allocations in the compressed path. writeCompressedMessage() allocated a fresh ByteBuffer.allocateDirect(10KB) per compressed field (with a doubling re-allocation chain for payloads above 10KB) plus a brand-new DirectMessageWriter. CompressedMessage consumes the scratch buffer right in its constructor (deflates into its own byte[]), so the buffer never escapes writeCompressedMessage() and none of that churn is necessary. The writer now keeps one reusable heap scratch buffer (retained at the largest size seen) and a thread-confined tmpWriter (reset() per use), and grows the buffer without an intermediate byte[] copy.

No wire-format change, no public API change.

Benchmark

JmhDirectMessageWriterBenchmark (source attached in a comment below): compressedMapFields writes an exchange-style message with two compressed map fields, sized below (30 entries) and above (500 entries) the initial 10KB scratch; primitiveFields writes a ~35-field primitive message. JDK 17, -prof gc, master vs patched:

Benchmark master patched Delta
compressedMapFields(30), throughput 17 550 ops/s 20 037 ops/s +14%
compressedMapFields(30), heap alloc 26.7 KB/op + 20 KB/op direct 24.3 KB/op, zero direct −9% heap, no Cleaner churn
compressedMapFields(30), GC time 192 ms 9 ms ×21 less
compressedMapFields(500), throughput 2 382 ops/s 2 475 ops/s +4% (within error)
compressedMapFields(500), heap alloc 365 KB/op + ~300 KB/op direct 322 KB/op, zero direct −12% heap, no Cleaner churn
compressedMapFields(500), GC time 56 ms 11 ms ×5 less
primitiveFields, throughput 34.9M ops/s 35.3M ops/s within error

Note on the allocation columns: master's direct-buffer churn is invisible to gc.alloc.rate.norm, but it shows up as GC time — Cleaner processing makes master's collections an order of magnitude more expensive at the same collection counts.

Testing

  • DirectMarshallingMessagesTest — nested containers written through 16-byte chunks (multi-pass resume across buffer swaps, exercises the curStream refresh points).
  • CompressedMessageTest — >40KB compressed payload (exercises the scratch-buffer growth path), byte-for-byte writer→reader round-trip.

🤖 Generated with Claude Code

@anton-vinogradov

Copy link
Copy Markdown
Contributor Author

Benchmark harness used for the numbers above, provided for reproducibility (not part of this PR). Drop it under modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/direct/ and run its main().

JmhDirectMessageWriterBenchmark.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.benchmarks.jmh.direct;

import java.nio.ByteBuffer;
import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.options.TimeValue;

/**
 * Benchmarks for {@link DirectMessageWriter} performance changes:
 * <ul>
 *     <li>{@link #hotPathPrimitiveFields} measures the per-field write overhead (the {@code curStream} caching that
 *     avoids re-resolving {@code state.item().stream} on every primitive write). Run it on the baseline build and on
 *     the patched build and compare throughput.</li>
 *     <li>{@link #compressedScratchOld} vs {@link #compressedScratchNew} model the per-field scratch allocation done by
 *     {@code writeCompressedMessage}: a fresh direct buffer + a brand-new writer (old) versus a reused writer + heap
 *     buffer (new). Run together with {@code -prof gc} to see the allocation-rate / direct-memory difference.</li>
 * </ul>
 */
@State(Scope.Thread)
public class JmhDirectMessageWriterBenchmark extends JmhAbstractBenchmark {
    /** Network buffer the hot-path benchmark serializes into (reused, like the real comms write loop). */
    private static final int NET_BUF = 64 * 1024;

    /** Scratch buffer capacity, matches {@code DirectMessageWriter.TMP_BUF_CAPACITY}. */
    private static final int TMP_BUF_CAPACITY = 10 * 1024;

    /** Writer under test for the hot path. */
    private DirectMessageWriter writer;

    /** Reused network buffer. */
    private ByteBuffer buf;

    /** Reusable writer modeling the patched compressed path. */
    private DirectMessageWriter reusable;

    /** */
    @Setup
    public void setup() {
        // The message factory is only used by writeMessage(); primitive writes and scratch allocation never touch it.
        writer = new DirectMessageWriter(null);
        buf = ByteBuffer.allocate(NET_BUF);
        reusable = new DirectMessageWriter(null);
    }

    /**
     * Hot path: serialize a batch of mixed primitive fields into a reused buffer, mirroring how a generated serializer
     * calls the writer field by field. Stresses the {@code state.item().stream} resolution done on every call.
     *
     * @return Accumulated {@code lastFinished} flag (consumed to prevent dead-code elimination).
     */
    @Benchmark
    public boolean hotPathPrimitiveFields() {
        ByteBuffer b = buf;

        b.clear();

        writer.setBuffer(b);

        boolean ok = true;

        // 256 "messages" of 7 mixed fields = 1792 writer calls, ~7 KB, fits the 64 KB buffer.
        for (int i = 0; i < 256; i++) {
            ok &= writer.writeByte((byte)i);
            ok &= writer.writeShort((short)i);
            ok &= writer.writeInt(i);
            ok &= writer.writeLong(i);
            ok &= writer.writeBoolean((i & 1) == 0);
            ok &= writer.writeFloat(i);
            ok &= writer.writeDouble(i);
        }

        return ok;
    }

    /**
     * Old compressed-path scratch allocation: fresh direct buffer + brand-new writer per compressed field.
     *
     * @return Allocated writer (returned to prevent dead-code elimination).
     */
    @Benchmark
    public Object compressedScratchOld() {
        ByteBuffer tmp = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY);

        DirectMessageWriter w = new DirectMessageWriter(null);

        w.setBuffer(tmp);

        return w;
    }

    /**
     * New compressed-path scratch allocation: reused writer + heap buffer per compressed field.
     *
     * @return Reused writer (returned to prevent dead-code elimination).
     */
    @Benchmark
    public Object compressedScratchNew() {
        ByteBuffer tmp = ByteBuffer.allocate(TMP_BUF_CAPACITY);

        DirectMessageWriter w = reusable;

        w.reset();
        w.setBuffer(tmp);

        return w;
    }

    /**
     * @param args Arguments.
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        JmhIdeBenchmarkRunner.create()
            .forks(1)
            .threads(1)
            .warmupIterations(6)
            .warmupTime(TimeValue.milliseconds(500))
            .measurementIterations(10)
            .measurementTime(TimeValue.milliseconds(500))
            .benchmarks(JmhDirectMessageWriterBenchmark.class.getSimpleName())
            .jvmArguments("-Xms2g", "-Xmx2g")
            .profilers(GCProfiler.class)
            .run();
    }
}

ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY);
// Scratch buffer is only copied into a heap array by CompressedMessage before deflating, so a heap
// buffer is strictly cheaper than a direct one (no native alloc / Cleaner churn) on this per-field path.
ByteBuffer tmpBuf = ByteBuffer.allocate(TMP_BUF_CAPACITY);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, when we have heap buffer, maybe it worth to use buf.array() in CompressedMessage.compress() instead of array copy?

…essage allocations on the message serialization hot path

Cache the current state item's stream (curStream) instead of re-resolving
state.item().stream on every primitive write. In writeCompressedMessage()
replace the per-field direct ByteBuffer with a reusable heap scratch buffer
(CompressedMessage consumes it in the constructor, so it never escapes the
method), reuse a thread-confined tmpWriter across fields, and grow the
scratch buffer without an intermediate byte[] copy.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@anton-vinogradov

Copy link
Copy Markdown
Contributor Author

JMH benchmark used for the numbers in the PR description (kept out of the patch itself). Drop it into modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/direct/ and run with JDK 17:

java -cp ... org.openjdk.jmh.Main JmhDirectMessageWriterBenchmark -prof gc
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.benchmarks.jmh.direct;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.CoreMessagesProvider;
import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.marshaller.Marshallers.jdk;
import static org.openjdk.jmh.annotations.Mode.Throughput;
import static org.openjdk.jmh.annotations.Scope.Thread;

/** Benchmarks the {@link DirectMessageWriter} hot path: per-field primitive writes and compressed map fields. */
@State(Thread)
@BenchmarkMode(Throughput)
@Warmup(iterations = 5, time = 3, timeUnit = SECONDS)
@Measurement(iterations = 5, time = 3, timeUnit = SECONDS)
@Fork(1)
public class JmhDirectMessageWriterBenchmark {
    /** */
    private DirectMessageWriter writer;

    /** */
    private ByteBuffer buf;

    /** */
    private UUID uuid;

    /** */
    public static void main(String[] args) throws Exception {
        JmhIdeBenchmarkRunner.create()
            .benchmarks(JmhDirectMessageWriterBenchmark.class.getName())
            .run();
    }

    /** */
    @Setup
    public void setup() {
        writer = new DirectMessageWriter(msgFactory());

        buf = ByteBuffer.allocate(4 * 1024 * 1024);

        uuid = UUID.randomUUID();
    }

    /** Mimics a generated serializer writing a message of mixed primitive fields. */
    @Benchmark
    public boolean primitiveFields() {
        buf.clear();

        writer.setBuffer(buf);

        boolean res = writer.writeHeader((short)42);

        for (int i = 0; i < 8; i++) {
            res &= writer.writeInt(i);
            res &= writer.writeLong(i);
            res &= writer.writeByte((byte)i);
            res &= writer.writeBoolean(true);
        }

        res &= writer.writeUuid(uuid);

        writer.reset();

        return res;
    }

    /** Exchange-style message with two compressed map fields. */
    @Benchmark
    public boolean compressedMapFields(CompressedPayload payload) {
        buf.clear();

        writer.setBuffer(buf);

        boolean res = writer.writeMessage(payload.msg, true);

        writer.reset();

        return res;
    }

    /** */
    private static MessageFactory msgFactory() {
        return new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{
            new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
    }

    /** Payload sized below ({@code 30}) and above ({@code 500}) the initial 10K scratch buffer. */
    @State(Thread)
    public static class CompressedPayload {
        /** */
        @Param({"30", "500"})
        private int entries;

        /** */
        private GridDhtPartitionsFullMessage msg;

        /** */
        @Setup
        public void setup() {
            Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new HashMap<>();
            Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();

            for (int i = 0; i < entries; i++) {
                UUID nodeId = UUID.randomUUID();

                partHistSuppliers.put(nodeId, Map.of(new GroupPartitionIdPair(i, i + 1), i + 2L));
                partsToReload.put(nodeId, Map.of(i, Set.of(i + 1)));
            }

            msg = new GridDhtPartitionsFullMessage(null, null, new AffinityTopologyVersion(0), partHistSuppliers, partsToReload);
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants