Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
refactoring http_server example to use async/await
Browse files Browse the repository at this point in the history
R=lrn@google.com

Review URL: https://codereview.chromium.org//1033843002
  • Loading branch information
kevmoo committed Mar 26, 2015
1 parent 4b177a9 commit 2bf5a19
Showing 1 changed file with 70 additions and 57 deletions.
127 changes: 70 additions & 57 deletions example/http_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@ import "package:isolate/runner.dart";
typedef Future RemoteStop();

Future<RemoteStop> runHttpServer(
Runner runner, ServerSocket socket, HttpListener listener) {
return runner.run(_startHttpServer, new List(2)..[0] = socket.reference
..[1] = listener)
.then((SendPort stopPort) => () => _sendStop(stopPort));
}
Runner runner, int port, HttpListener listener) async {
var stopPort = await runner.run(_startHttpServer, [port, listener]);

Future _sendStop(SendPort stopPort) {
return singleResponseFuture(stopPort.send);
return () => _sendStop(stopPort);
}

Future<SendPort> _startHttpServer(List args) {
ServerSocketReference ref = args[0];
Future _sendStop(SendPort stopPort) => singleResponseFuture(stopPort.send);

Future<SendPort> _startHttpServer(List args) async {
int port = args[0];
HttpListener listener = args[1];
return ref.create().then((socket) {
return listener.start(new HttpServer.listenOn(socket));
}).then((_) {
return singleCallbackPort((SendPort resultPort) {
sendFutureResult(new Future.sync(listener.stop), resultPort);
});

var server =
await HttpServer.bind(InternetAddress.ANY_IP_V6, port, shared: true);
await listener.start(server);

return singleCallbackPort((SendPort resultPort) {
sendFutureResult(new Future.sync(listener.stop), resultPort);
});
}

Expand All @@ -52,65 +51,79 @@ abstract class HttpListener {
/// Returns the message content plus an ID describing the isolate that
/// handled the request.
class EchoHttpListener implements HttpListener {
static const _delay = const Duration(seconds: 2);
static final _id = Isolate.current.hashCode;
final SendPort _counter;

StreamSubscription _subscription;
static int _id = new Object().hashCode;
SendPort _counter;

EchoHttpListener(this._counter);

start(HttpServer server) {
Future start(HttpServer server) async {
print("Starting isolate $_id");
_subscription = server.listen((HttpRequest request) {
request.response.addStream(request).then((_) {
_counter.send(null);
print("Request to $_id");
request.response.write("#$_id\n");
var t0 = new DateTime.now().add(new Duration(seconds: 2));
while (new DateTime.now().isBefore(t0));
print("Response from $_id");
request.response.close();
});
_subscription = server.listen((HttpRequest request) async {
await request.response.addStream(request);
print("Request to $hashCode");
request.response.write("#$_id\n");
var watch = new Stopwatch()..start();
while (watch.elapsed < _delay);
print("Response from $_id");
await request.response.close();
_counter.send(null);
});
}

stop() {
Future stop() async {
print("Stopping isolate $_id");
_subscription.cancel();
await _subscription.cancel();
_subscription = null;
}
}

main(args) {
main(List<String> args) async {
int port = 0;
if (args.length > 0) {
port = int.parse(args[0]);
}
RawReceivePort counter = new RawReceivePort();

var counter = new ReceivePort();
HttpListener listener = new EchoHttpListener(counter.sendPort);
ServerSocket
.bind(InternetAddress.ANY_IP_V6, port)
.then((ServerSocket socket) {
port = socket.port;
return Future.wait(new Iterable.generate(5, (_) => IsolateRunner.spawn()),
cleanUp: (isolate) { isolate.close(); })
.then((List<IsolateRunner> isolates) {
return Future.wait(isolates.map((IsolateRunner isolate) {
return runHttpServer(isolate, socket, listener);
}), cleanUp: (server) { server.stop(); });
})
.then((stoppers) {
socket.close();
int count = 25;
counter.handler = (_) {
count--;
if (count == 0) {
stoppers.forEach((f) => f());
counter.close();
}
};
print("Server listening on port $port for 25 requests");
print("Test with:");
print(" ab -c10 -n 25 http://localhost:$port/");
});

// Used to ensure the requested port is available or to find an available
// port if `0` is provided.
ServerSocket socket =
await ServerSocket.bind(InternetAddress.ANY_IP_V6, port, shared: true);

port = socket.port;
var isolates = await Future.wait(
new Iterable.generate(5, (_) => IsolateRunner.spawn()),
cleanUp: (isolate) {
isolate.close();
});

List<RemoteStop> stoppers = await Future.wait(isolates
.map((IsolateRunner isolate) {
return runHttpServer(isolate, socket.port, listener);
}), cleanUp: (server) {
server.stop();
});

await socket.close();
int count = 25;

print("Server listening on port $port for $count requests");
print("Test with:");
print(" ab -l -c10 -n $count http://localhost:$port/");

await for (var event in counter) {
count--;
if (count == 0) {
print('Shutting down');
for (var stopper in stoppers) {
await stopper();
}
counter.close();
}
}
print('Finished');
}

0 comments on commit 2bf5a19

Please sign in to comment.