From 50648b89571ca536009bda5a937da30d524a1e0b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 26 Mar 2017 03:35:33 +0100 Subject: [PATCH 1/3] Use method handles instead of reflection for LZ4 and Snappy output streams --- .../kafka/common/record/CompressionType.java | 94 +++++++------------ 1 file changed, 33 insertions(+), 61 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 093d5b3f6d5f5..1d0090ad7c9b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -22,7 +22,9 @@ import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Constructor; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -66,8 +68,8 @@ public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersio @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) { try { - return (OutputStream) SNAPPY_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer, bufferSize); - } catch (Exception e) { + return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer, bufferSize); + } catch (Throwable e) { throw new KafkaException(e); } } @@ -75,8 +77,8 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) { try { - return (InputStream) SNAPPY_INPUT_STREAM_SUPPLIER.get().newInstance(buffer); - } catch (Exception e) { + return (InputStream) SnappyConstructors.INPUT.invoke(buffer); + } catch (Throwable e) { throw new KafkaException(e); } } @@ -86,9 +88,9 @@ public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersio @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) { try { - return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer, + return (OutputStream) LZ4Constructors.OUTPUT.invoke(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0); - } catch (Exception e) { + } catch (Throwable e) { throw new KafkaException(e); } } @@ -96,9 +98,9 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) { try { - return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer, + return (InputStream) LZ4Constructors.INPUT.invoke(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0); - } catch (Exception e) { + } catch (Throwable e) { throw new KafkaException(e); } } @@ -146,64 +148,34 @@ else if (LZ4.name.equals(name)) throw new IllegalArgumentException("Unknown compression name: " + name); } - // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression - // caching constructors to avoid invoking of Class.forName method for each batch - private static final MemoizingConstructorSupplier SNAPPY_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.xerial.snappy.SnappyOutputStream") - .getConstructor(OutputStream.class, Integer.TYPE); - } - }); + // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms + // that are being used. This is important for platforms that are not supported by the underlying libraries. + // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation + // is done in separate classes (one per compression type). - private static final MemoizingConstructorSupplier LZ4_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream") - .getConstructor(OutputStream.class, Boolean.TYPE); - } - }); + private static class LZ4Constructors { + static final MethodHandle INPUT = findConstructor( + "org.apache.kafka.common.record.KafkaLZ4BlockInputStream", + MethodType.methodType(void.class, InputStream.class, Boolean.TYPE)); - private static final MemoizingConstructorSupplier SNAPPY_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.xerial.snappy.SnappyInputStream") - .getConstructor(InputStream.class); - } - }); - - private static final MemoizingConstructorSupplier LZ4_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream") - .getConstructor(InputStream.class, Boolean.TYPE); - } - }); + static final MethodHandle OUTPUT = findConstructor( + "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream", + MethodType.methodType(void.class, OutputStream.class, Boolean.TYPE)); - private interface ConstructorSupplier { - Constructor get() throws ClassNotFoundException, NoSuchMethodException; } - // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier} - private static class MemoizingConstructorSupplier { - final ConstructorSupplier delegate; - transient volatile boolean initialized; - transient Constructor value; - - public MemoizingConstructorSupplier(ConstructorSupplier delegate) { - this.delegate = delegate; - } + private static class SnappyConstructors { + static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream", + MethodType.methodType(void.class, InputStream.class)); + static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream", + MethodType.methodType(void.class, OutputStream.class, Integer.TYPE)); + } - public Constructor get() throws NoSuchMethodException, ClassNotFoundException { - if (!initialized) { - synchronized (this) { - if (!initialized) { - value = delegate.get(); - initialized = true; - } - } - } - return value; + private static MethodHandle findConstructor(String className, MethodType methodType) { + try { + return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); } } From e776f562f5c97afd6b42fb30a1743f0b1cce9aef Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 27 Mar 2017 09:03:57 +0100 Subject: [PATCH 2/3] Remove unused ByteBufferReceive --- .../common/network/ByteBufferReceive.java | 57 ------------------- 1 file changed, 57 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java deleted file mode 100644 index 5b0af02cb6155..0000000000000 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.network; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ScatteringByteChannel; - -/** - * A receive backed by an array of ByteBuffers - */ -public class ByteBufferReceive implements Receive { - - private final String source; - private final ByteBuffer[] buffers; - private int remaining; - - public ByteBufferReceive(String source, ByteBuffer... buffers) { - super(); - this.source = source; - this.buffers = buffers; - for (ByteBuffer buffer : buffers) - remaining += buffer.remaining(); - } - - @Override - public String source() { - return source; - } - - @Override - public boolean complete() { - return remaining > 0; - } - - @Override - public long readFrom(ScatteringByteChannel channel) throws IOException { - long read = channel.read(buffers); - remaining += read; - return read; - } - -} From 3eeaa189186b21f038e0c9cc3dd8e0cc80da7f96 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 27 Mar 2017 09:05:09 +0100 Subject: [PATCH 3/3] Minor wording improvement --- .../java/org/apache/kafka/common/record/CompressionType.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 1d0090ad7c9b0..a78c5a2ad5053 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -149,7 +149,7 @@ else if (LZ4.name.equals(name)) } // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms - // that are being used. This is important for platforms that are not supported by the underlying libraries. + // that are used. This is important for platforms that are not supported by the underlying libraries. // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation // is done in separate classes (one per compression type).