Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.utils.db;

import javax.annotation.Nonnull;
import java.util.function.IntFunction;

/**
* Codec to serialize/deserialize {@link Boolean}.
Expand All @@ -44,7 +43,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(Boolean object,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
return allocator.apply(1).put(TRUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.function.IntFunction;

/**
* Codec interface to serialize/deserialize objects to/from bytes.
Expand All @@ -34,7 +33,7 @@ public interface Codec<T> {
/**
* Does this {@link Codec} support the {@link CodecBuffer} methods?
* If this method returns true, this class must implement both
* {@link #toCodecBuffer(Object, IntFunction)} and
* {@link #toCodecBuffer(Object, CodecBuffer.Allocator)} and
* {@link #fromCodecBuffer(CodecBuffer)}.
*
* @return ture iff this class supports the {@link CodecBuffer} methods.
Expand All @@ -59,7 +58,7 @@ default int getSerializedSizeUpperBound(T object) {
* @return a buffer storing the serialized bytes.
*/
default CodecBuffer toCodecBuffer(@Nonnull T object,
IntFunction<CodecBuffer> allocator) throws IOException {
CodecBuffer.Allocator allocator) throws IOException {
throw new UnsupportedOperationException();
}

Expand All @@ -71,7 +70,7 @@ default CodecBuffer toCodecBuffer(@Nonnull T object,
*/
default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
throws IOException {
return toCodecBuffer(object, CodecBuffer::allocateDirect);
return toCodecBuffer(object, CodecBuffer.Allocator.getDirect());
}

/**
Expand All @@ -82,7 +81,7 @@ default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
*/
default CodecBuffer toHeapCodecBuffer(@Nonnull T object)
throws IOException {
return toCodecBuffer(object, CodecBuffer::allocateHeap);
return toCodecBuffer(object, CodecBuffer.Allocator.getHeap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdds.utils.db;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.ratis.thirdparty.io.netty.buffer.EmptyByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.MemoizedSupplier;
Expand All @@ -37,7 +39,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

Expand All @@ -53,22 +55,26 @@ public class CodecBuffer implements AutoCloseable {

/** To create {@link CodecBuffer} instances. */
private static class Factory {
private static volatile Function<ByteBuf, CodecBuffer> constructor
private static volatile BiFunction<ByteBuf, Object, CodecBuffer> constructor
= CodecBuffer::new;
static void set(Function<ByteBuf, CodecBuffer> f) {
static void set(BiFunction<ByteBuf, Object, CodecBuffer> f) {
constructor = f;
LOG.info("Successfully set constructor to " + f);
}

static CodecBuffer newCodecBuffer(ByteBuf buf) {
return constructor.apply(buf);
return newCodecBuffer(buf, null);
}

static CodecBuffer newCodecBuffer(ByteBuf buf, Object wrapped) {
return constructor.apply(buf, wrapped);
}
}

/** To detect buffer leak. */
private static class LeakDetector {
static CodecBuffer newCodecBuffer(ByteBuf buf) {
return new CodecBuffer(buf) {
static CodecBuffer newCodecBuffer(ByteBuf buf, Object wrapped) {
return new CodecBuffer(buf, wrapped) {
@Override
protected void finalize() {
detectLeaks();
Expand Down Expand Up @@ -126,6 +132,51 @@ public void increase(int required) {
? POOL.heapBuffer(c, c) // allocate exact size
: POOL.heapBuffer(-c); // allocate a resizable buffer

private static final CodecBuffer EMPTY_BUFFER = new CodecBuffer(
new EmptyByteBuf(POOL), null);

public static CodecBuffer getEmptyBuffer() {
return EMPTY_BUFFER;
}

/** To allocate {@link CodecBuffer} objects. */
public interface Allocator extends IntFunction<CodecBuffer> {
Allocator DIRECT = new Allocator() {
@Override
public CodecBuffer apply(int capacity) {
return allocate(capacity, POOL_DIRECT);
}

@Override
public boolean isDirect() {
return true;
}
};

static Allocator getDirect() {
return DIRECT;
}

Allocator HEAP = new Allocator() {
@Override
public CodecBuffer apply(int capacity) {
return allocate(capacity, POOL_HEAP);
}

@Override
public boolean isDirect() {
return false;
}
};

static Allocator getHeap() {
return HEAP;
}

/** Does this object allocate direct buffers? */
boolean isDirect();
}

private final StackTraceElement[] elements;

/**
Expand Down Expand Up @@ -162,7 +213,13 @@ public static CodecBuffer allocateHeap(int capacity) {

/** Wrap the given array. */
public static CodecBuffer wrap(byte[] array) {
return Factory.newCodecBuffer(Unpooled.wrappedBuffer(array));
return Factory.newCodecBuffer(Unpooled.wrappedBuffer(array), array);
}

/** Wrap the given {@link ByteString}. */
public static CodecBuffer wrap(ByteString bytes) {
return Factory.newCodecBuffer(
Unpooled.wrappedBuffer(bytes.asReadOnlyByteBuffer()), bytes);
}

private static final AtomicInteger LEAK_COUNT = new AtomicInteger();
Expand All @@ -176,14 +233,28 @@ public static void assertNoLeaks() {
}

private final ByteBuf buf;
private final Object wrapped;
private final CompletableFuture<Void> released = new CompletableFuture<>();

private CodecBuffer(ByteBuf buf) {
private CodecBuffer(ByteBuf buf, Object wrapped) {
this.buf = buf;
this.wrapped = wrapped;
this.elements = getStackTrace(LOG);
assertRefCnt(1);
}

public boolean isDirect() {
return buf.isDirect();
}

/**
* @return the wrapped object if this buffer is created by wrapping it;
* otherwise, return null.
*/
public Object getWrapped() {
return wrapped;
}

private void assertRefCnt(int expected) {
Preconditions.assertSame(expected, buf.refCnt(), "refCnt");
}
Expand Down Expand Up @@ -222,7 +293,10 @@ public void close() {
/** Release this buffer and return it back to the pool. */
public void release() {
final boolean set = released.complete(null);
Preconditions.assertTrue(set, () -> "Already released: " + this);
if (!set) {
// Allow a zero capacity buffer to be released multiple times.
Preconditions.assertSame(0, buf.capacity(), "capacity");
}
if (buf.release()) {
assertRefCnt(0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.function.IntFunction;

/**
* A {@link Codec} to serialize/deserialize objects by delegation.
Expand Down Expand Up @@ -82,7 +81,7 @@ public final boolean supportCodecBuffer() {

@Override
public final CodecBuffer toCodecBuffer(@Nonnull T message,
IntFunction<CodecBuffer> allocator) throws IOException {
CodecBuffer.Allocator allocator) throws IOException {
return delegate.toCodecBuffer(backward.apply(message), allocator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

/**
* Codec to serialize/deserialize {@link Integer}.
Expand All @@ -44,7 +43,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull Integer object,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
return allocator.apply(Integer.BYTES).putInt(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

/**
* Codec to serialize/deserialize {@link Long}.
Expand All @@ -41,7 +40,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull Long object,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
return allocator.apply(Long.BYTES).putLong(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;

/**
* Codecs to serialize/deserialize Protobuf v2 messages.
Expand Down Expand Up @@ -70,7 +69,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull M message,
IntFunction<CodecBuffer> allocator) throws IOException {
CodecBuffer.Allocator allocator) throws IOException {
final int size = message.getSerializedSize();
return allocator.apply(size).put(writeTo(message, size));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

/**
Expand Down Expand Up @@ -82,7 +81,7 @@ private ToIntFunction<ByteBuffer> writeTo(M message, int size) {

@Override
public CodecBuffer toCodecBuffer(@Nonnull M message,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
final int size = message.getSerializedSize();
return allocator.apply(size).put(writeTo(message, size));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.utils.db;

import java.nio.ByteBuffer;
import java.util.function.IntFunction;

import javax.annotation.Nonnull;

Expand All @@ -45,7 +44,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull Short object,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
return allocator.apply(Short.BYTES).putShort(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.nio.charset.CodingErrorAction;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntFunction;

/**
* An abstract {@link Codec} to serialize/deserialize {@link String}
Expand Down Expand Up @@ -169,7 +168,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull String object,
IntFunction<CodecBuffer> allocator) throws IOException {
CodecBuffer.Allocator allocator) throws IOException {
// allocate a larger buffer to avoid encoding twice.
final int upperBound = getSerializedSizeUpperBound(object);
final CodecBuffer buffer = allocator.apply(upperBound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.IntFunction;

/**
* Codec to serialize/deserialize {@link UUID}.
Expand All @@ -48,7 +47,7 @@ public boolean supportCodecBuffer() {

@Override
public CodecBuffer toCodecBuffer(@Nonnull UUID id,
IntFunction<CodecBuffer> allocator) {
CodecBuffer.Allocator allocator) {
return allocator.apply(SERIALIZED_SIZE)
.putLong(id.getMostSignificantBits())
.putLong(id.getLeastSignificantBits());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static <T> void runTest(Codec<T> codec, T original,

// serialize to CodecBuffer
final CodecBuffer codecBuffer = codec.toCodecBuffer(
original, CodecBuffer::allocateHeap);
original, CodecBuffer.Allocator.getHeap());
Assertions.assertEquals(array.length, codecBuffer.readableBytes());
final ByteBuffer byteBuffer = codecBuffer.asReadOnlyByteBuffer();
Assertions.assertEquals(array.length, byteBuffer.remaining());
Expand Down
Loading