Skip to content

Commit

Permalink
Optimize the decoding of generic http2 (#14175)
Browse files Browse the repository at this point in the history
* Optimize the decoding of generic http2

* Decode on close

* Clean up netty residual memory when stream is closed
  • Loading branch information
finefuture committed May 23, 2024
1 parent b5bb044 commit 935849f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
*/
package org.apache.dubbo.remoting.http12.message;

import org.apache.dubbo.remoting.http12.CompositeInputStream;
import org.apache.dubbo.remoting.http12.exception.DecodeException;

import java.io.IOException;
import java.io.InputStream;

public class NoOpStreamingDecoder implements StreamingDecoder {
public class DefaultStreamingDecoder implements StreamingDecoder {

private FragmentListener listener;
private boolean closed;

protected final CompositeInputStream accumulate = new CompositeInputStream();

protected FragmentListener listener;

@Override
public void request(int numMessages) {
Expand All @@ -31,17 +37,38 @@ public void request(int numMessages) {

@Override
public void decode(InputStream inputStream) throws DecodeException {
listener.onFragmentMessage(inputStream);
if (closed) {
// ignored
return;
}
accumulate.addInputStream(inputStream);
}

@Override
public void close() {
this.listener.onClose();
try {
if (!closed) {
closed = true;
listener.onFragmentMessage(accumulate);
accumulate.close();
listener.onClose();
}
} catch (IOException e) {
throw new DecodeException(e);
}
}

@Override
public void onStreamClosed() {
// do nothing
if (closed) {
return;
}
closed = true;
try {
accumulate.close();
} catch (IOException e) {
throw new DecodeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
Expand Down Expand Up @@ -123,19 +122,6 @@ protected RpcInvocation onBuildRpcInvocationCompletion(RpcInvocation invocation)
return invocation;
}

@Override
protected void onError(Http2InputMessage message, Throwable throwable) {
try {
message.close();
} catch (Exception e) {
throwable.addSuppressed(e);
}
onError(throwable);
}

@Override
protected void onFinally(Http2InputMessage message) {}

@Override
protected GrpcStreamingDecoder getStreamingDecoder() {
return (GrpcStreamingDecoder) super.getStreamingDecoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.NoOpStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
import org.apache.dubbo.rpc.CancellationContext;
Expand Down Expand Up @@ -77,8 +77,7 @@ public GenericHttp2ServerTransportListener(
}

protected StreamingDecoder newStreamingDecoder() {
// default no op
return new NoOpStreamingDecoder();
return new DefaultStreamingDecoder();
}

@Override
Expand Down Expand Up @@ -174,6 +173,19 @@ protected void onError(Throwable throwable) {
serverChannelObserver.onError(throwable);
}

@Override
protected void onError(Http2InputMessage message, Throwable throwable) {
try {
message.close();
} catch (Exception e) {
throwable.addSuppressed(e);
}
onError(throwable);
}

@Override
protected void onFinally(Http2InputMessage message) {}

@Override
public void cancelByRemote(long errorCode) {
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));
Expand Down

0 comments on commit 935849f

Please sign in to comment.