WorkerException: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?' #3

Closed
SaadArdati opened this issue Jan 31, 2022 · 28 comments

Comments

@SaadArdati

On my first attempt at converting a complex system to use isolates/workers, I get the exception in the title.

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'
flutter: #0      openChannel (package:squadron/src/native/channel.dart:103:11)
<asynchronous suspension>
#1      Worker.send (package:squadron/src/worker.dart:107:9)
<asynchronous suspension>
#2      WorkerTask._runFuture (package:squadron/src/worker_task.dart:148:21)
<asynchronous suspension>

I replicated the thumbnail example as far as I can tell.

Here's my code:

solver_service.dart

import 'dart:async';

import 'package:squadron/squadron.dart';

import '../position_manager_thread.dart';

abstract class SolverService {
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson);

  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  );

  static const cmdPerformLayout = 1;
  static const cmdInitRoot = 2;
}

class SolverServiceImpl implements SolverService, WorkerService {
  final PositionManagerThread positionManager =
      PositionManagerThread(notifier: (pos) => print(pos));

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      positionManager.initRoot(rootNodeJson).toJson();

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) async {
    return positionManager
        .performLayout(
          nodesReference,
          createdNodes,
          removedNodes,
          adoptedNodes,
          droppedNodes,
          changedNodes,
        )
        .toJson();
  }

  @override
  late final Map<int, CommandHandler> operations = {
    SolverService.cmdPerformLayout: (WorkerRequest r) => performLayout(
          r.args[0],
          r.args[1],
          r.args[2],
          r.args[3],
          r.args[4],
          r.args[5],
        ),
    SolverService.cmdInitRoot: (WorkerRequest r) => initRoot(
          r.args[0],
        ),
  };
}

solver_worker_activator.dart:

// stub file, the file used at runtime will depend on the target platform
//   * cf "browser\solver_worker_activator.dart" for Web workers (browser platform)
//   * cf "vm\solver_worker_activator.dart" for Isolate workers (vm platform)

// of course, if your application does not target both Web and VM platforms,
// you need not define a stub file and can go directly for your target platform

import 'solver_worker_pool.dart' show SolverWorker;

SolverWorker createWorker() =>
    throw UnsupportedError('Not supported on this platform');

solver_worker_pool.dart:

// this is a helper file to expose Squadron workers and worker pools as a SolverService

import 'dart:async';

import 'package:codelessly_flutter/managers/position_manager/multithreading/solver_service.dart';
import 'package:squadron/squadron.dart';

// this is where the stub file comes into action
//
// of course, if your application does not target both Web and VM platforms,
// you need not define a stub file and can go directly for your target platform
import 'solver_worker_activator.dart'
    if (dart.library.js) 'package:codelessly_flutter/managers/position_manager/multithreading/browser/solver_worker_activator.dart'
    if (dart.library.html) 'package:codelessly_flutter/managers/position_manager/multithreading/browser/solver_worker_activator.dart'
    if (dart.library.io) 'package:codelessly_flutter/managers/position_manager/multithreading/vm/solver_worker_activator.dart';

// Implementation of SolverService as a Squadron worker pool
class SolverWorkerPool extends WorkerPool<SolverWorker>
    implements SolverService {
  SolverWorkerPool(ConcurrencySettings concurrencySettings)
      : super(createWorker, concurrencySettings: concurrencySettings);

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      execute((w) async => w.initRoot(rootNodeJson));

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) =>
      execute((w) async => w.performLayout(
            nodes,
            createdNodes,
            removedNodes,
            adoptedNodes,
            droppedNodes,
            changedNodes,
          ));
}

// Implementation of SolverService as a Squadron worker
class SolverWorker extends Worker implements SolverService {
  SolverWorker(dynamic entryPoint, {String? id, List args = const []})
      : super(entryPoint, id: id, args: args);

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) {
    return send(
      SolverService.cmdInitRoot,
      [
        rootNodeJson,
      ],
    );
  }

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) {
    return send(
      SolverService.cmdPerformLayout,
      [
        nodes,
        createdNodes,
        removedNodes,
        adoptedNodes,
        droppedNodes,
        changedNodes,
      ],
    );
  }
}

solver_vm.dart:

// Isolate implementation (VM platform)

import 'package:squadron/squadron_service.dart';

import '../solver_service.dart';

void start(Map command) => run((startRequest) => SolverServiceImpl(), command);

solver_worker_activator.dart:

// Creation of the Isolate

import '../solver_worker_pool.dart' show SolverWorker;
import 'solver_worker.dart' as isolate;

SolverWorker createWorker() => SolverWorker(isolate.start);

Usage is quite simple:

  // Crashes if maxParallel is set to 1 for some reason. I only want one instance of this worker/isolate at all times through the life cycle of the app
  final SolverWorkerPool pool = SolverWorkerPool(
      const ConcurrencySettings(minWorkers: 1, maxWorkers: 1, maxParallel: 2));

  Future<void> initRoot(RootNode node) async {
    final LayoutResult result =
        LayoutResult.fromJson(await pool.initRoot(node.toJson()));

I'm not really sure what I'm doing wrong. There's no mention or example that brings up SendPort/ReceivePort for Squadron.
I get this error for the performLayout() function as well.

I'm running this as a macOS desktop app, so isolates only. No JavaScript.

@d-markey
Owner

Hello @SwissCheese5,

sorry I missed your post tonight. I'll have a look tomorrow and will keep you posted.

@SaadArdati
Author

Thank you :)

@d-markey
Owner

d-markey commented Feb 1, 2022

Hello,

I've replicated your code to try and reproduce but found no problem.

I had to adapt some parts of the code; in particular, I implemented a dummy PositionManagerThread, so maybe the problem is in there? Also, I had to update solver_worker_activator.dart because the import 'solver_worker.dart' as isolate; looks wrong to me. I replaced it with import 'solver_vm.dart' as isolate;.

FYI here's the sample
squadron_test.zip

You can just dart run it and you should see some output like:

pool with 1 worker, parallelism = 2
[0:00:00.000861]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 922557508}
[0:00:00.001891]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 922557508}
[0:00:00.002294]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 922557508}
[0:00:00.002512]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 922557508}
[0:00:00.002723]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 922557508}
[0:00:00.002931]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 922557508}
[0:00:00.003140]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 922557508}
[0:00:00.003346]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 922557508}
[0:00:00.003553]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 922557508}
[0:00:00.003710]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 922557508}
SolverWorker 421263971: IDLE, load = 0 (max = 2, total = 10, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

pool with 1-3 workers, parallelism = 2
[0:00:00.001073]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 749171621}
[0:00:00.001693]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 749171621}
[0:00:00.002258]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 749171621}
[0:00:00.003514]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 1046298529}
[0:00:00.004199]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 1046298529}
[0:00:00.004845]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 749171621}
[0:00:00.005250]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 541122101}
[0:00:00.005929]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 541122101}
[0:00:00.006295]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 749171621}
[0:00:00.006792]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 1046298529}
SolverWorker 27849473: IDLE, load = 0 (max = 2, total = 3, errors = 0), uptime = 0:00:00.015626, idle = 0:00:00.000000
SolverWorker 659981525: IDLE, load = 0 (max = 2, total = 5, errors = 0), uptime = 0:00:00.015626, idle = 0:00:00.000000
SolverWorker 940802164: IDLE, load = 0 (max = 2, total = 2, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

single worker
[0:00:00.000679]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 872248303}
[0:00:00.001150]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 872248303}
[0:00:00.001485]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 872248303}
[0:00:00.001919]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 872248303}
[0:00:00.002252]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 872248303}
[0:00:00.002589]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 872248303}
[0:00:00.003082]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 872248303}
[0:00:00.003414]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 872248303}
[0:00:00.003695]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 872248303}
[0:00:00.004135]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 872248303}
SolverWorker 592595040: IDLE, load = 0 (max = 10, total = 10, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

@SaadArdati
Author

Thank you, I'll look into this in the morning.

@d-markey
Owner

d-markey commented Feb 1, 2022

BTW if you're going to have only one thread, you'd be better off using a standalone SolverWorker. In this case you should not need the pool, so don't bother implementing it.

@SaadArdati
Author

Ah, just create a simple and fresh instance of SolverServiceImpl()? If not, can you please provide a simple example?

@d-markey
Owner

d-markey commented Feb 1, 2022

It's part of the sample, you will see the last use case uses a worker directly without a pool: final worker = SolverWorker(isolate.start);

If you use an instance of SolverServiceImpl directly, it will run in your main Isolate, but you want it running in a different thread. So you have to wrap it in a Squadron worker.
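To illustrate the difference between calling a service directly and running it on another thread, here is a minimal sketch that uses only `dart:isolate` rather than Squadron. `heavySum` is a hypothetical stand-in for the solver logic, and `Isolate.run` assumes Dart 2.19 or later on a native (VM) target.

```dart
import 'dart:isolate';

// Hypothetical CPU-bound function standing in for the solver logic.
int heavySum(int n) {
  var total = 0;
  for (var i = 1; i <= n; i++) {
    total += i;
  }
  return total;
}

Future<void> main() async {
  // Direct call: runs on the calling isolate and blocks it until done.
  final inline = heavySum(1000);

  // Offloaded call: runs in a short-lived isolate; only the result is sent back.
  final offloaded = await Isolate.run(() => heavySum(1000));

  print(inline == offloaded);
}
```

A Squadron worker plays a similar role to the second call, with the addition that the worker stays alive between tasks and also has a Web Worker counterpart.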

@SaadArdati
Author

Makes sense. I'll try this out soon. By the way I couldn't get the dart js command to work. Are you sure it's still working? If you are, I'll post logs tomorrow.

@d-markey
Owner

d-markey commented Feb 1, 2022

Here's a trimmed down implementation if you're positive you don't need JavaScript and don't need to manage a worker pool.

I'll look into the problem regarding maxParallel = 1, thanks for reporting this one.

squadron_test_no_js.zip

@SaadArdati
Author

That's the issue, I definitely NEED the javascript. That's why I'm trying to use this package :P

@SaadArdati
Author

The need to use this package came from building an app with a very heavy computational process that must run on both desktop and web. So this package seemed perfect as it utilizes isolates on the web. But the dart js command seemed to fail every time. Will post logs later.

@d-markey
Owner

d-markey commented Feb 1, 2022

OK, you wrote "Isolate only, no JavaScript" so I thought you wouldn't need it :-P

One important thing regarding Isolates and Web Workers is that data is copied (serialized/deserialized) when it passes from the main thread to the worker thread and back, and object types usually do not survive serialization. In particular, a Map<T, K> is likely to be received as a Map<dynamic, dynamic>, and Set<T> may not be supported. Note also that this behavior depends on the platform. Your issue may be caused by that kind of limitation; I've never tested on macOS.
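As a rough analogy for this kind of type loss, the sketch below copies data through a JSON round trip. This is an assumption made for illustration only: it is not the mechanism Isolates, Web Workers, or Squadron actually use, and `roundTrip` is a hypothetical helper.

```dart
import 'dart:convert';

// Copies a value via JSON, as a rough analogy for crossing a thread boundary.
dynamic roundTrip(Object? value) => jsonDecode(jsonEncode(value));

void main() {
  final original = <String, dynamic>{
    'id': 'root',
    'children': <String>['a', 'b'],
  };
  final copy = roundTrip(original);

  // The values survive, but the list's element type does not.
  print(original['children'] is List<String>); // true
  print(copy['children'] is List<String>); // false: it is now a List<dynamic>

  // A Set has no JSON representation, so encoding it fails outright.
  try {
    roundTrip(<String>{'a', 'b'});
  } on JsonUnsupportedObjectError {
    print('Set is not supported; convert it with toList() first');
  }
}
```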

For more background details:

You could try to modify the service worker like this to convert Set to List:

class SolverWorker extends Worker implements SolverService {
  SolverWorker(dynamic entryPoint, {String? id, List args = const []})
      : super(entryPoint, id: id, args: args);

  @override
  Future<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) {
    return send(
      SolverService.cmdInitRoot,
      [
        rootNodeJson,
      ],
    );
  }

  @override
  Future<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) {
    return send(
      SolverService.cmdPerformLayout,
      [
        nodes.toList(),
        createdNodes.toList(),
        removedNodes.toList(),
        adoptedNodes.toList(),
        droppedNodes.toList(),
        changedNodes.toList(),
      ],
    );
  }
}

and you will have to do the opposite on the receiving end:

class SolverServiceImpl implements SolverService, WorkerService {
  final PositionManagerThread positionManager =
      PositionManagerThread(notifier: (pos) => print(pos));

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      positionManager.initRoot(rootNodeJson);

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) async {
    return positionManager.performLayout(
      nodesReference,
      createdNodes,
      removedNodes,
      adoptedNodes,
      droppedNodes,
      changedNodes,
    );
  }


  @override
  late final Map<int, CommandHandler> operations = {
    SolverService.cmdPerformLayout: (WorkerRequest r) => performLayout(
          rebuildSet<Map<String, dynamic>>(r.args[0]),
          rebuildSet<String>(r.args[1]),
          rebuildSet<String>(r.args[2]),
          rebuildSet<String>(r.args[3]),
          rebuildSet<String>(r.args[4]),
          rebuildSet<String>(r.args[5]),
        ),
    SolverService.cmdInitRoot: (WorkerRequest r) => initRoot(
          rebuildMap(r.args[0]),
        ),
  };

  static Set<T> rebuildSet<T>(List items) => items.cast<T>().toSet();
  static Map<String, dynamic> rebuildMap(Map dict) => dict.map((key, value) => MapEntry<String, dynamic>(key, value));
  static Set<Map<String, dynamic>> rebuildMapSet(List<Map> items) => items.map((item) => rebuildMap(item)).cast<Map<String, dynamic>>().toSet();
}
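One caveat, offered as an illustration rather than a confirmed problem in this thread: `cast<T>()` checks elements lazily, so if the maps inside the list also arrive as `Map<dynamic, dynamic>`, casting them to `Map<String, dynamic>` would fail when they are read. A possible variant converts each element explicitly. The helper names mirror the ones above, and the payload in `main` is simulated.

```dart
// Rebuilds a Map<String, dynamic> from a loosely typed map.
Map<String, dynamic> rebuildMap(Map dict) =>
    dict.map((key, value) => MapEntry(key as String, value));

// Rebuilds a Set<Map<String, dynamic>> by converting each element explicitly.
Set<Map<String, dynamic>> rebuildMapSet(List items) =>
    items.map((item) => rebuildMap(item as Map)).toSet();

void main() {
  // Simulated payload as it might arrive with its type arguments erased.
  final List<dynamic> received = <dynamic>[
    <dynamic, dynamic>{'id': 'a'},
    <dynamic, dynamic>{'id': 'b'},
  ];

  final nodes = rebuildMapSet(received);
  print(nodes.length);
  print(nodes.map((node) => node['id']).toList());
}
```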

Also check that import 'solver_worker.dart' as isolate; I mentioned, because from the file names you provided, it should really be solver_vm.dart. This is the file implementing the "start()" function for native Isolates, and I don't see any solver_worker.dart file in your original post.

I'll try to build a Flutter sample but it may not come before the week-end!

@SaadArdati
Author

Allow me to provide more context:

I followed the thumbnail example, so import 'solver_worker.dart' as isolate; is just following the pattern.

I have two versions of solver_worker and solver_worker_activator for every platform.

image

The activator should activate on that specific platform thanks to this conditional import:
image

Since I need web to work with this as well, I can't really just use solver_vm.

You're right about not needing a pool, but I still need to conditionally import the activator. So I think the proper solution to not use the pools is to do this:

image

instead of:

final worker = SolverWorker(isolate.start);

Finally, I narrowed down the issue more, the one that was throwing:

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'

I realized I'd forgotten to even run worker.start(), so I did and bam, I got the error immediately:

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'
flutter: #0      openChannel (package:squadron/src/native/channel.dart:103:11)
<asynchronous suspension>
#1      PositionManager.initRoot (package:codelessly_flutter/managers/position_manager/position_manager.dart:32:5)
<asynchronous suspension>

with just:

  Future<void> initRoot(RootNode node) async {
    await worker.start();

    final LayoutResult result =
        LayoutResult.fromJson(await worker.initRoot(node.toJson()));
  }

I moved all the code to a new project and it worked just fine. I'm assuming this means that it's crashing because of a conflict with Flutter somehow.

Note that Flutter isolates are instantiated slightly differently than dart isolates. Could that be the cause?

@d-markey
Owner

d-markey commented Feb 2, 2022

Calling start() is optional; it will be done when the first task is scheduled. Calling it upfront, e.g. when your app is initializing, avoids the latency of doing it when the first task is queued.
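The lazy-start behavior described here can be pictured with a small sketch. `LazyWorker` is a hypothetical class written for illustration, not Squadron's implementation, and the delay merely simulates startup cost.

```dart
// Hypothetical wrapper illustrating lazy start; not Squadron's implementation.
class LazyWorker {
  Future<void>? _startup;
  int startCount = 0;

  // Starts once; later calls reuse the same future.
  Future<void> start() => _startup ??= _connect();

  Future<void> _connect() async {
    startCount++;
    // Simulated startup cost (spawning the thread, handshaking, ...).
    await Future<void>.delayed(const Duration(milliseconds: 50));
  }

  // Every task waits for startup, triggering it if nobody did so earlier.
  Future<int> doubleIt(int value) async {
    await start();
    return value * 2;
  }
}

Future<void> main() async {
  final worker = LazyWorker();
  await worker.start(); // optional warm-up, e.g. during app initialization
  print(await worker.doubleIt(21));
  print(worker.startCount);
}
```

Skipping the explicit `start()` call would still work in this sketch; the first `doubleIt` call would simply absorb the startup delay.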

Back to your problem, thanks for the screenshot. You're doing things right AFAICT, and I will need more time to investigate. If I can make a recommendation though, I would avoid being too strict on types for the service API, e.g. avoid Set or Map<String, ...> in favor of the more generic List and Map.

@SaadArdati
Author

@d-markey Noted! :)

Eagerly waiting for your investigation results! 🤞

@d-markey
Owner

d-markey commented Feb 5, 2022

I believe I have good news, but I still need to wrap things up before releasing a fix. The bottom line is an exception is thrown during the worker initialization. The connection "protocol" in place between the main app and the workers does not play well when that happens, I'm fixing it. That was an interesting one!

The bad news is the exception might come from your code, in which case your app will still crash. The fix will ensure Squadron provides better information to understand the situation, but you will have to figure it out.
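For readers wondering how a failure during initialization can surface as a confusing type error, here is a sketch of a startup handshake written with plain `dart:isolate`. It is only a guess at the general shape of such a protocol and does not reproduce Squadron's internals; `workerMain`, `connect`, and the `failInit` flag are hypothetical. The point is that the main side inspects the first message instead of assuming it is a `SendPort`. It targets the native VM only.

```dart
import 'dart:isolate';

// Hypothetical worker entry point: replies with its SendPort on success,
// or with an error map if initialization throws.
void workerMain(List<dynamic> args) {
  final toMain = args[0] as SendPort;
  final failInit = args[1] as bool;
  try {
    if (failInit) throw StateError('service failed to initialize');
    final inbox = ReceivePort();
    toMain.send(inbox.sendPort);
    inbox.listen((message) {
      if (message == 'stop') inbox.close(); // lets the isolate exit
    });
  } catch (error) {
    toMain.send(<String, dynamic>{'error': error.toString()});
  }
}

// Spawns the worker and checks what the first message actually is.
Future<SendPort> connect({required bool failInit}) async {
  final fromWorker = ReceivePort();
  await Isolate.spawn(workerMain, <dynamic>[fromWorker.sendPort, failInit]);
  final Object? first = await fromWorker.first;
  if (first is SendPort) return first;
  throw StateError('worker initialization failed: $first');
}

Future<void> main() async {
  try {
    await connect(failInit: true);
  } on StateError catch (error) {
    print(error.message);
  }
  final port = await connect(failInit: false);
  port.send('stop');
}
```

Blindly casting the first message to `SendPort` in `connect` would instead raise a type error that hides the real cause.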

Could you try this in solver_worker.dart to find out if it throws from your code:

// Isolate implementation (VM platform)

import 'package:squadron/squadron_service.dart';

import '../solver_service.dart';

void start(Map command) => run((startRequest) {
   try {
      return SolverServiceImpl();
   } catch (ex, st) {
      print(ex);
      print(st);
      rethrow;
   }
}, command);

Remember that Isolates in Flutter cannot use some advanced features, e.g. sending platform-channel messages (see https://docs.flutter.dev/development/platform-integration/platform-channels#channels-and-platform-threading).

@SaadArdati
Author

The cool thing about my use case is that I don't need complex data, only lists of strings and nothing more. I highly doubt my data is the problem.

I will try your code soon!!

@SaadArdati
Author

I'm now getting normal, entirely valid errors from my own logic! I'll keep fixing them and then come back :)

@d-markey
Owner

d-markey commented Feb 5, 2022 via email

@SaadArdati
Author

Right now, I'm facing an issue where the dart js command throws an exception on every single object that relies on a Flutter import.

Screen.Recording.2022-02-05.at.12.51.15.PM.mp4

I'm assuming this is because I'm using Flutter imports and, by extension, dart:ui imports in the isolate logic. I'm accessing classes like Size/Offset/Axis/Rect, nothing more, which are unfortunately inside dart:ui.

It's weird because the isolate runs just fine with them on the VM, but compiling it to JS pretty much fails. What's weirder is that the dart js command is throwing Flutter errors across the entire project, as if it's ignoring my specific files. I'm not sure why.

In any case, do you think purging all references to dart:ui will let the dart js command compile correctly? It's the last barrier I'm facing.

@d-markey
Owner

d-markey commented Feb 5, 2022

Web Workers in general (not just Dart or Flutter) can't work on UI, because they don't have access to the DOM (see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers and https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Functions_and_classes_available_to_workers). So I guess you will have to move everything UI-related out of your worker service.
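One way to picture "moving everything UI-related out" is to have the worker side exchange plain Dart values and convert them to UI types only on the main thread. The sketch below assumes a hypothetical `PlainSize` class that depends on nothing beyond `dart:core`; it is an illustration, not a drop-in replacement for `dart:ui`'s `Size`.

```dart
// Hypothetical plain-Dart value type with no dependency on dart:ui.
class PlainSize {
  const PlainSize(this.width, this.height);

  // Rebuilds the value from a map, tolerating ints as well as doubles.
  factory PlainSize.fromJson(Map<String, dynamic> json) => PlainSize(
        (json['width'] as num).toDouble(),
        (json['height'] as num).toDouble(),
      );

  final double width;
  final double height;

  // Produces a map made only of basic types, suitable for crossing threads.
  Map<String, dynamic> toJson() => {'width': width, 'height': height};
}

void main() {
  const size = PlainSize(320, 240);
  final restored = PlainSize.fromJson(size.toJson());
  print('${restored.width} x ${restored.height}');
}
```

On the main thread, a `PlainSize` could then be converted to a `Size` right before it reaches any widget code.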

In your example code, the service methods return Futures, and that means the full results will only be available for updating the UI when everything has been computed.

If you want to have progressive updates to your UI, you will need to change your service implementation and stream data from your workers back to your main program so it has a chance to update the UI during processing. Squadron workers and worker pools support streaming; you should just replace Future with Stream in the service definitions (using async* and yield to stream the results as appropriate).
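In plain Dart, the Future-to-Stream change looks roughly like the sketch below. `layoutInSteps` and the shape of each update are hypothetical, and the Squadron-specific wiring is left out because it is not shown in this thread.

```dart
// Hypothetical streaming variant of a layout routine: instead of returning
// one final result, it yields a progress update after each node.
Stream<Map<String, dynamic>> layoutInSteps(List<String> nodeIds) async* {
  for (var i = 0; i < nodeIds.length; i++) {
    // Real work for nodeIds[i] would happen here.
    yield <String, dynamic>{
      'node': nodeIds[i],
      'done': i + 1,
      'total': nodeIds.length,
    };
  }
}

Future<void> main() async {
  // The consumer can refresh the UI each time an update arrives.
  await for (final update in layoutInSteps(['a', 'b', 'c'])) {
    print('${update['done']}/${update['total']}: ${update['node']}');
  }
}
```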

@SaadArdati
Author

Hm, that won't do since our computation is not meant to stream any data. I'll see if moving everything out resolves the issue.

Closing this issue for now. I'll open a new one if any new issues occur. Thank you for your time!

@d-markey
Owner

d-markey commented Feb 5, 2022

Ok, I hope you'll achieve what you want with your app :)

Have you tried narrowing imports, e.g. import 'dart:ui' show Size, Offset, Rect;?

@SaadArdati
Author

That's an excellent idea! Creating our own implementation of those classes would be a colossal task in our colossal project. I'll give it a shot right now.

@SaadArdati
Author

Sad to say it failed :(

@d-markey
Owner

d-markey commented Feb 5, 2022

Bad luck :-( but it was worth a shot.

@d-markey d-markey transferred this issue from d-markey/squadron_sample Feb 5, 2022
@d-markey
Owner

d-markey commented Feb 5, 2022

Transferred the issue to Squadron as it really belongs there.

d-markey added a commit that referenced this issue Feb 6, 2022
- Added a logging mechanism to facilitate debugging Squadron's internals
- Reworked connection flow between main program and worker
- Reworked task scheduling
- Fixed issue #3
- Added tests for logging and workers failing to initialize
@d-markey
Owner

d-markey commented Feb 6, 2022

Version 3.2 has been published to pub.dev!
You should be able to upgrade and remove the workaround try/catch from your solver_worker.dart file.
