Skip to content

Commit

Permalink
Reduce memory allocations of calls to ByteBuffer.duplicate() made in o…
Browse files Browse the repository at this point in the history
…rg.apache.cassandra.transport.CBUtil#writeValue

patch by Natnael Adere; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-18212
  • Loading branch information
Natnael Adere authored and dcapwell committed Mar 6, 2023
1 parent b7e1e44 commit 35f8da6
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.2
* Reduce memory allocations of calls to ByteBuffer.duplicate() made in org.apache.cassandra.transport.CBUtil#writeValue (CASSANDRA-18212)
* CEP-17: SSTable API (CASSANDRA-17056)
* Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set (CASSANDRA-18292)
* Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561)
Expand Down
45 changes: 43 additions & 2 deletions src/java/org/apache/cassandra/transport/CBUtil.java
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
Expand All @@ -46,6 +45,7 @@
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.memory.MemoryUtil;

/**
* ByteBuf utility methods.
Expand All @@ -69,6 +69,15 @@ protected CharsetDecoder initialValue()
}
};

/**
 * Per-thread scratch buffer, lazily created through {@code MemoryUtil.getHollowDirectByteBuffer()}.
 * {@code addBytes} hands it to {@code MemoryUtil.duplicateDirectByteBuffer} as the target when the
 * source is a direct buffer, rather than calling {@code ByteBuffer.duplicate()} on every write.
 */
private static final FastThreadLocal<ByteBuffer> localDirectBuffer = new FastThreadLocal<ByteBuffer>()
{
    @Override
    protected ByteBuffer initialValue()
    {
        return MemoryUtil.getHollowDirectByteBuffer();
    }
};

private final static FastThreadLocal<CharBuffer> TL_CHAR_BUFFER = new FastThreadLocal<>();

private CBUtil() {}
Expand Down Expand Up @@ -478,7 +487,35 @@ public static void writeValue(ByteBuffer bytes, ByteBuf cb)
cb.writeInt(remaining);

if (remaining > 0)
cb.writeBytes(bytes.duplicate());
addBytes(bytes, cb);
}

/**
 * Appends the remaining bytes of {@code src} to {@code dest}.
 *
 * Three kinds of source are handled:
 * array-backed buffers are copied straight from the backing array, direct buffers are copied
 * through the thread-local scratch buffer, and anything else goes through a regular
 * {@code duplicate()}. The source is never passed to {@code dest.writeBytes} directly.
 *
 * @param src  buffer whose remaining bytes are copied; nothing is written when it is empty
 * @param dest netty buffer that receives the bytes
 */
public static void addBytes(ByteBuffer src, ByteBuf dest)
{
    final int length = src.remaining();
    if (length == 0)
        return;

    if (src.hasArray())
    {
        // Heap buffers are copied using a raw array instead of shared heap buffer and MemoryUtil.unsafe to avoid a CMS bug, which causes the JVM to crash with the following:
        // # Problematic frame:
        // # V [libjvm.dylib+0x63e858] void ParScanClosure::do_oop_work<unsigned int>(unsigned int*, bool, bool)+0x94
        // More details can be found here: https://bugs.openjdk.org/browse/JDK-8222798
        final int start = src.arrayOffset() + src.position();
        dest.writeBytes(src.array(), start, length);
        return;
    }

    if (src.isDirect())
    {
        // NOTE(review): presumably duplicateDirectByteBuffer makes the scratch buffer a view of src
        // without allocating - confirm against MemoryUtil.
        final ByteBuffer scratch = getLocalDirectBuffer();
        MemoryUtil.duplicateDirectByteBuffer(src, scratch);
        dest.writeBytes(scratch);
        return;
    }

    // Neither array-accessible nor direct (hasArray() is false for read-only heap buffers):
    // fall back to a duplicate.
    dest.writeBytes(src.duplicate());
}

public static int sizeOfValue(byte[] bytes)
Expand Down Expand Up @@ -614,4 +651,8 @@ private static byte[] readRawBytes(ByteBuf cb, int length)
return bytes;
}

/**
 * Returns the calling thread's value of {@code localDirectBuffer}.
 */
private static ByteBuffer getLocalDirectBuffer()
{
    final ByteBuffer threadLocalBuffer = localDirectBuffer.get();
    return threadLocalBuffer;
}
}
58 changes: 58 additions & 0 deletions test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.transport;

import org.junit.Test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Generators;
import org.assertj.core.api.Assertions;

import static org.quicktheories.QuickTheory.qt;


/**
 * Property test for {@code CBUtil.addBytes}: for buffers of every kind produced by
 * {@code Generators.bytesAnyType} it checks that the source buffer is not consumed and that the
 * destination receives exactly the source's remaining bytes.
 */
public class WriteBytesTest
{
    @Test
    public void test()
    {
        int maxBytes = 10_000;
        // one destination buffer shared by all iterations; reset at the start of each
        ByteBuf buf = Unpooled.buffer(maxBytes);
        qt().forAll(Generators.bytesAnyType(0, maxBytes)).checkAssert(bb -> {
            try
            {
                buf.clear();

                int size = bb.remaining();
                int pos = bb.position();

                CBUtil.addBytes(bb, buf);

                // test for consumption
                Assertions.assertThat(bb.remaining()).isEqualTo(size);
                Assertions.assertThat(bb.position()).isEqualTo(pos);

                Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
                for (int i = 0; i < size; i++)
                    Assertions.assertThat(buf.getByte(buf.readerIndex() + i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() + i));
            }
            finally
            {
                // clean up the generated buffer even when an assertion above fails, so that failing
                // (and subsequently shrunk) cases do not skip the cleanup
                FileUtils.clean(bb);
            }
        });
    }

}
34 changes: 32 additions & 2 deletions test/unit/org/apache/cassandra/utils/Generators.java
Expand Up @@ -299,6 +299,16 @@ private static char[] createDNSDomainPartDomain()
}

/**
 * Generator of heap buffers only: delegates to the three-argument overload with the buffer kind
 * pinned to {@code BBCases.HEAP}.
 *
 * @param min presumably the smallest buffer size to generate; negative values are rejected by the overload
 * @param max presumably the largest buffer size to generate - confirm against the overload
 */
public static Gen<ByteBuffer> bytes(int min, int max)
{
    Gen<BBCases> heapOnly = SourceDSL.arbitrary().constant(BBCases.HEAP);
    return bytes(min, max, heapOnly);
}

/**
 * Like {@code bytes(min, max)}, but the kind of each generated buffer is drawn from all
 * {@code BBCases} values (heap, read-only heap, direct, read-only direct).
 *
 * @param min presumably the smallest buffer size to generate; negative values are rejected by the overload
 * @param max presumably the largest buffer size to generate - confirm against the overload
 */
public static Gen<ByteBuffer> bytesAnyType(int min, int max)
{
    Gen<BBCases> anyKind = SourceDSL.arbitrary().enumValues(BBCases.class);
    return bytes(min, max, anyKind);
}

private static Gen<ByteBuffer> bytes(int min, int max, Gen<BBCases> cases)
{
if (min < 0)
throw new IllegalArgumentException("Asked for negative bytes; given " + min);
Expand All @@ -314,11 +324,31 @@ public static Gen<ByteBuffer> bytes(int min, int max)
// to add more randomness, also shift offset in the array so the same size doesn't yield the same bytes
int offset = (int) rnd.next(Constraint.between(0, MAX_BLOB_LENGTH - size));

return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size);
return handleCases(cases, rnd, offset, size);
};
};

/** The kinds of buffer that {@code handleCases} can build. */
private enum BBCases
{
    HEAP,
    READ_ONLY_HEAP,
    DIRECT,
    READ_ONLY_DIRECT
}

/**
 * Draws a buffer kind from {@code cases} and builds a buffer of that kind holding the {@code size}
 * bytes of {@code LazySharedBlob.SHARED_BYTES} that start at {@code offset}.
 *
 * @param cases  generator deciding which kind of buffer to build
 * @param rnd    randomness source handed to {@code cases}
 * @param offset start index within the shared blob
 * @param size   number of bytes the returned buffer has remaining
 * @return a heap or direct buffer, read-only for the {@code READ_ONLY_*} kinds
 */
private static ByteBuffer handleCases(Gen<BBCases> cases, RandomnessSource rnd, int offset, int size)
{
    final BBCases kind = cases.generate(rnd);
    switch (kind)
    {
        case HEAP:
            return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size);
        case READ_ONLY_HEAP:
            return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size).asReadOnlyBuffer();
        case DIRECT:
            return directBufferFromSharedBlob(offset, size);
        case READ_ONLY_DIRECT:
            return directBufferFromSharedBlob(offset, size).asReadOnlyBuffer();
        default:
            throw new AssertionError("can't wait for jdk 17!");
    }
}

/**
/**
 * Allocates a direct buffer of {@code size} bytes and fills it with the bytes of
 * {@code LazySharedBlob.SHARED_BYTES} starting at {@code offset}; the result is flipped, so it is
 * positioned at 0 with {@code size} bytes remaining.
 */
private static ByteBuffer directBufferFromSharedBlob(int offset, int size)
{
    final ByteBuffer direct = ByteBuffer.allocateDirect(size);
    direct.put(LazySharedBlob.SHARED_BYTES, offset, size).flip();
    return direct;
}

/**
* Implements a valid utf-8 generator.
*
* Implementation note, currently relies on getBytes to strip out non-valid utf-8 chars, so is slow
Expand Down

0 comments on commit 35f8da6

Please sign in to comment.