Skip to content

angel-dart/task

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

task

Pub build status

Support for running and scheduling asynchronous tasks within Angel. Parameters can be injected into scheduled tasks with the same dependency injection system used by Angel.

Installation

In your pubspec.yaml file:

  dependencies:
    angel_framework: ^1.0.0
    angel_task: ^1.0.0

Usage

main() async {
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app);

  // Run a one-off task, with an optional delay.
  scheduler.once((Todo singleton) {
    print('3 seconds later, we found our Todo singleton: "${singleton.text}"');
  }, new Duration(seconds: 3));

  Task foo;
  int i = 0;

  // Periodically run functionality
  foo = scheduler.seconds(1, () {
    print('Printing ${++i} time(s)!');

    if (i >= 3) {
      print('Cancelling foo task...');
      foo.cancel();
    }
  });
  
  // Named tasks!
  var greetTask = scheduler.minutes(3, (String message) => print(message), name: 'greet');
  
  // You can still access services, etc., and thus manipulate databases from tasks.
  scheduler.once(() {
    app.service('foo').create({'foo': 'bar'});
  });

  // If you never start the scheduler, no tasks will ever run.
  await scheduler.start();
  
  // Run a named task
  var result = await scheduler.run('greet', ['Hello, world!']);
}

Make sure to start the scheduler. Otherwise, your tasks will never run:

main() async {
  // ...
  await scheduler.start();
}

You can also listen to a Stream of a task's results:

main() async {
  // ...
  var task = scheduler.minutes(3, fibonacci, args: [13]);
  var fib13 = await task.results.first;
}

Multithreading

Angel's task engine also supports communication over isolates, which allows you to run all tasks in a separate thread from the server, without losing performance.

For example, if your processor has 4 cores, you might spawn 4 total isolates:

  • 3 child isolates, all running the server
  • 1 master isolate, which runs the task engine

The master isolate can be dedicated to just running tasks, with no HTTP responsibility. The child isolates only have to worry about serving your application to the Web.

The AngelTaskScheduler has a receivePort that it uses to communicate with clients.

To access the scheduler as a client, instantiate a AngelTaskClient. The constructor accepts a single SendPort, which in this case comes from the scheduler.

IMPORTANT: If you want to send the return value of task functions to clients, then set sendReturnValues to true in the AngelTaskScheduler constructor. If so, then the results of your task callbacks will have to be primitive Dart values, serializable over SendPorts.

Use this as a mechanism to query the state of the master isolate. You might find that you won't need it, though.

main() async {
  var nInstances = Platform.numberOfProcessors - 1;
  
  // This instance won't actually serve HTTP, we just use it for DI.
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app);
  
  // Start listening, running tasks, etc.
  await scheduler.start();
  
  for (int i = 0; i < nInstances; i++) {
    // Spawn child nodes now. Make sure to send the scheduler's SendPort.
    Isolate.spawn(isolateMain, [i, scheduler.receivePort.sendPort]);
  }
}

/// The code that runs in the child nodes, i.e., the ones running the application.
void isolateMain(List args) {
  int id = args[0];
  SendPort masterPort = args[1];
  var app = new Angel.custom(startShared);
  
  // Hook up a task client, then start the server.
  app.configure(taskClientPlugin(masterPort)).then((_) async {
    var server = await app.startServer(InternetAddress.ANY_IP_V4, 3000);
    print('Instance #$id listening at http://${server.address.address}:${server.port}');
  });
}

/// A simple plug-in that connects a server instance to the master task scheduler.
AngelConfigurer taskClientPlugin(SendPort masterPort) {
  return (Angel app) async {
    var client = new AngelTaskClient(masterPort);
    await master.connect(); // Await a connection...
    await master.connect(timeout: new Duration(seconds: 30)); // Optional timeout.
    
    // If we inject the AngelTaskClient as a singleton, we can access it in routes.
    app.container.singleton(client);
    
    // We can dispatch tasks, without waiting for the result.
    app.get('/dispatch', (AngelTaskClient client) {
      client.run('foo', args: ['bar']);
    });
    
    // We can also await the results of tasks.
    app.get('/fibonacci/:number([0-9]+)', (AngelTaskClient client, String number) async {
      var n = int.parse(number);
      var taskResult = await client.run('fibonacci', args: [n]);
      
      if (!taskResult.successful) {
        // Access error and stack trace on failure
        print(taskResult.error);
        print(taskResult.stack);
        throw new AngelHttpException.notProcessable();
      }
      
      // If `sendReturnValues` is `true` in our `AngelTaskScheduler`, then we can
      // also access the value returned from the task function.
      var computation = task.value;
      return {'value': computation};
    });
  };
}

Sockets

Applications of very large scale will likely run on multiple machines; in such a case, mere multi-threading will not do the job.

Luckily, package:angel_task also supports communication over sockets. This can occur at the same time as communication over isolates, so you can use both together in your application.

To bind the scheduler to a socket, you must pass it to the constructor.

main() async {
  var socket = await ServerSocket.bind(InternetAddress.ANY_IP_V4, 5671);
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app, socket: socket);
  
  // Task configuration...
  
  // Calling `start` will also listen on the socket, if any.
  await scheduler.start(); 
}

And then, to connect a client, it's almost the same process:

main() async {
  var app = await createServer();
  var client = await AngelTaskClient.connectSocket(InternetAddress.LOOPBACK_IP_V4, 5671);
 
  // You can still inject for DI.
  //
  // Just make sure to explicitly pass the right type.
  app.container.singleton(client, as: AngelTaskClient);
  
}

Broadcasting

If you are multi-threading, there may be times when you want to send an impromptu message to all child nodes. Use AngelTaskScheduler.broadcast to achieve this.

Whatever you broadcast should be a primitive Dart value; otherwise, it will not be serializable over a SendPort or Socket.

main() {
  // ...
  scheduler.broadcast({
    'michael': 'jackson'
  });
}

About

Support for running asynchronous jobs within Angel. Ideal for large applications.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages