From 935849f4ce0eed1893869711fe42b84d22e31c84 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Thu, 23 May 2024 11:17:31 +0800 Subject: [PATCH] Optimize the decoding of generic http2 (#14175) * Optimize the decoding of generic http2 * Decode on close * Clean up netty residual memory when stream is closed --- ...oder.java => DefaultStreamingDecoder.java} | 37 ++++++++++++++++--- .../GrpcHttp2ServerTransportListener.java | 14 ------- .../GenericHttp2ServerTransportListener.java | 18 +++++++-- 3 files changed, 47 insertions(+), 22 deletions(-) rename dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/{NoOpStreamingDecoder.java => DefaultStreamingDecoder.java} (59%) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java similarity index 59% rename from dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java rename to dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java index 473e6a2ed0f..939bbefc744 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java @@ -16,13 +16,19 @@ */ package org.apache.dubbo.remoting.http12.message; +import org.apache.dubbo.remoting.http12.CompositeInputStream; import org.apache.dubbo.remoting.http12.exception.DecodeException; +import java.io.IOException; import java.io.InputStream; -public class NoOpStreamingDecoder implements StreamingDecoder { +public class DefaultStreamingDecoder implements StreamingDecoder { - private FragmentListener listener; + private boolean closed; + + protected final CompositeInputStream accumulate = new CompositeInputStream(); + + protected FragmentListener listener; @Override public void request(int numMessages) { @@ -31,17 +37,38 @@ public void request(int numMessages) { @Override public void decode(InputStream inputStream) throws DecodeException { - listener.onFragmentMessage(inputStream); + if (closed) { + // ignored + return; + } + accumulate.addInputStream(inputStream); } @Override public void close() { - this.listener.onClose(); + try { + if (!closed) { + closed = true; + listener.onFragmentMessage(accumulate); + accumulate.close(); + listener.onClose(); + } + } catch (IOException e) { + throw new DecodeException(e); + } } @Override public void onStreamClosed() { - // do nothing + if (closed) { + return; + } + closed = true; + try { + accumulate.close(); + } catch (IOException e) { + throw new DecodeException(e); + } } @Override diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index 0399c643342..9ae78813025 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -26,7 +26,6 @@ import org.apache.dubbo.remoting.http12.exception.UnimplementedException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; -import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; import org.apache.dubbo.remoting.http12.message.MethodMetadata; import org.apache.dubbo.remoting.http12.message.StreamingDecoder; @@ -123,19 +122,6 @@ protected RpcInvocation onBuildRpcInvocationCompletion(RpcInvocation invocation) return invocation; } - @Override - protected void onError(Http2InputMessage message, Throwable throwable) { - try { - message.close(); - } catch (Exception e) { - throwable.addSuppressed(e); - } - onError(throwable); - } - - @Override - protected void onFinally(Http2InputMessage message) {} - @Override protected GrpcStreamingDecoder getStreamingDecoder() { return (GrpcStreamingDecoder) super.getStreamingDecoder(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 5be8bfcf68a..9db29718376 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -28,9 +28,9 @@ import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver; import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder; +import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder; import org.apache.dubbo.remoting.http12.message.ListeningDecoder; import org.apache.dubbo.remoting.http12.message.MethodMetadata; -import org.apache.dubbo.remoting.http12.message.NoOpStreamingDecoder; import org.apache.dubbo.remoting.http12.message.StreamingDecoder; import org.apache.dubbo.remoting.http12.message.codec.JsonCodec; import org.apache.dubbo.rpc.CancellationContext; @@ -77,8 +77,7 @@ public GenericHttp2ServerTransportListener( } protected StreamingDecoder newStreamingDecoder() { - // default no op - return new NoOpStreamingDecoder(); + return new DefaultStreamingDecoder(); } @Override @@ -174,6 +173,19 @@ protected void onError(Throwable throwable) { serverChannelObserver.onError(throwable); } + @Override + protected void onError(Http2InputMessage message, Throwable throwable) { + try { + message.close(); + } catch (Exception e) { + throwable.addSuppressed(e); + } + onError(throwable); + } + + @Override + protected void onFinally(Http2InputMessage message) {} + @Override public void cancelByRemote(long errorCode) { serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));