Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release ByteBuf when handle onData failed #13102

Merged
merged 8 commits into from Oct 10, 2023
Expand Up @@ -60,11 +60,15 @@ public SerializingExecutor(Executor executor) {
*/
@Override
public void execute(Runnable r) {
execute(r, null);
}

public void execute(Runnable r, Runnable cleanIfFailed) {
runQueue.add(r);
schedule(r);
schedule(r, cleanIfFailed);
}

private void schedule(Runnable removable) {
private void schedule(Runnable removable, Runnable cleanIfFailed) {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
Expand All @@ -86,6 +90,9 @@ private void schedule(Runnable removable) {
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
if (cleanIfFailed != null) {
cleanIfFailed.run();
}
atomicBoolean.set(false);
}
}
Expand All @@ -111,7 +118,7 @@ public void run() {
}
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null);
schedule(null, null);
}
}
}
Expand Up @@ -28,7 +28,7 @@
*/
public abstract class AbstractStream implements Stream {

protected Executor executor;
protected SerializingExecutor executor;
protected final FrameworkModel frameworkModel;


Expand Down
Expand Up @@ -449,23 +449,25 @@ public void onHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
executor.execute(() -> doOnData(data, endStream), () -> ReferenceCountUtil.release(data));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But is there anything better here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still using Executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is SerializingExecutor, not a real Executor, should replace method name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that the failed operation can not destroy the serialExecutor interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe i can make CloseableRunnable, if execute failed, clear resource

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe,go ahead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resubmitted, please review.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to reuse jdk's closeable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
It would throw a IOException, and SerializingExecutor need handle it, but SerializingExecutor shouldn't do the job

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.fine

}

private void doOnData(ByteBuf data, boolean endStream) {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
deframer.deframe(data);
});
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
}
deframer.deframe(data);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.dubbo.rpc.protocol.tri.stream;

import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
Expand Down Expand Up @@ -431,7 +432,7 @@ private void processHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> doOnData(data, endStream));
executor.execute(() -> doOnData(data, endStream), () -> ReferenceCountUtil.release(data));
}

private void doOnData(ByteBuf data, boolean endStream) {
Expand Down