Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory leak of ReceivePort when using create_stream #1836

Closed
uditrugman opened this issue Mar 26, 2024 · 7 comments · Fixed by #1857
Closed

memory leak of ReceivePort when using create_stream #1836

uditrugman opened this issue Mar 26, 2024 · 7 comments · Fixed by #1857
Labels
awaiting Waiting for responses, PR, further discussions, upstream release, etc bug Something isn't working

Comments

@uditrugman
Copy link

uditrugman commented Mar 26, 2024

Describe the bug

the current implementation closes the receive port when a "done" is received on the stream, but if the dart side cancelled the subscription before receiving "done", then the receive port will remain open on the dart side. Then, even if closing the stream, the receive port will never be released.

an alternative would be the close outside the for loop, but that would cause another issue.
if a client (dart side) closes the port, there could be a race where rust can still send through the sendPort and that would lead to an error.

see the code below.

  Stream<S> _executeStreamInner<S, E extends Object>(
      StreamTask<S, E>? task) async* {
    final portName =
        ExecuteStreamPortGenerator.create(task!.constMeta.debugName);
    final receivePort = broadcastPort(portName);

    task.callFfi(receivePort.sendPort.nativePort);

    final codec = task.codec;
    task = null;

    await for (final raw in receivePort) {
      try {
        yield codec.decodeObject(raw);
      } on CloseStreamException {

        // BUG: we only close when "done" is received.
        receivePort.close();
        break;
      }
    }
  }

Steps to reproduce

// the below code demonstrates how the leak can happen. all is needed to cancel the subscription and the receive port will remain open

  print("creating");
  final s = createLogStream();
  print("listening");
  
  final subscription = s.listen((event) {
    print("event: ${event.msg}");
  }, onDone: () {
    print("done");
  });

  print("sending 1");
  sendEvent(msg: "msg 1");
  print("sending 2");
  sendEvent(msg: "msg 2");
  print("done");

  await Future<void>.delayed(const Duration(seconds: 1));
  
  // the cancel below causes the for loop to end without receiving the "done" event on the stream
  subscription.cancel();

Logs

there are no logs.

Expected behavior

No response

Generated binding code

No response

OS

No response

Version of flutter_rust_bridge_codegen

No response

Flutter info

No response

Version of clang++

No response

Additional context

No response

@uditrugman uditrugman added the bug Something isn't working label Mar 26, 2024
@fzyzcjy fzyzcjy added the awaiting Waiting for responses, PR, further discussions, upstream release, etc label Mar 26, 2024
@fzyzcjy
Copy link
Owner

fzyzcjy commented Mar 26, 2024

Good observation! What about we close the port using a finally block outside the for (instead of the catch block inside for)?

Looking forward to your PR (especially given that you have already had thorough understanding about the problem)! Alternative, I will fix it in the next batch, probably within a week (but this is a good-first-issue).

@uditrugman
Copy link
Author

i commented on that as well. it's not valid because then the receive port will be closed while the rust side "sendPort" is still open and rust might send events.
there is a race condition here that unless the dart side knows how to "close" the stream, then the two sides are not in sync.

I'd suggest that the generated code will know how to close the stream and when a client cancells subscription, the streams should be closed as well

@uditrugman
Copy link
Author

uditrugman commented Mar 26, 2024

there is another issue with the way listen is implemented. the use of "async*" is not optimal because the actual call to rust's create_stream is asynchronous and that might cause issues. here is another example:

  final subscription = s.listen((event) {
    print("event: ${event.msg}");
  }, onDone: () {
    print("done");
  });

  print("sending 1");
  sendEvent(msg: "msg 1");

let's assume that "sendEvent" calls rust which then sends an event.
the above code will fail because "listen" will call rust only in the next dart "tick".

a better implementation would be to call rust synchronously, during the listen method and that would make the stream ready on both sides

@fzyzcjy
Copy link
Owner

fzyzcjy commented Mar 27, 2024

Get it. I will take a look at it a bit deeper hopefully within 24 hours and reply here.

@fzyzcjy
Copy link
Owner

fzyzcjy commented Mar 27, 2024

Disclaimer: Too tired today, all words below may be very silly :/

it's not valid because then the receive port will be closed while the rust side "sendPort" is still open and rust might send events.

Then if rust side sends events, it just provide errors to the rust function (if it silently swallow it then it is definitely bad!). Maybe this is good behavior?

there is another issue with the way listen is implemented

Good observation! I guess one simple way may be to move the parts that we want to execute synchronously outside of the async* function.

@fzyzcjy
Copy link
Owner

fzyzcjy commented Apr 8, 2024

Related: #1867

Copy link
Contributor

This thread has been automatically locked since there has not been any recent activity after it was closed. If you are still experiencing a similar issue, please open a new issue.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Apr 22, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
awaiting Waiting for responses, PR, further discussions, upstream release, etc bug Something isn't working
Projects
None yet
2 participants