Skip to content

Commit

Permalink
fix(WebWorker): Add zone support to MessageBus
Browse files Browse the repository at this point in the history
Closes #4053
  • Loading branch information
jteplitz committed Sep 8, 2015
1 parent 3b9c086 commit f3da37c
Show file tree
Hide file tree
Showing 35 changed files with 611 additions and 348 deletions.
7 changes: 4 additions & 3 deletions modules/angular2/src/core/zone/ng_zone.dart
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,15 @@ class NgZone {
_inVmTurnDone = true;
parent.run(_innerZone, _onTurnDone);

if (_pendingMicrotasks == 0 && _onEventDone != null) {
runOutsideAngular(_onEventDone);
}
} finally {
_inVmTurnDone = false;
_hasExecutedCodeInInnerZone = false;
}
}

if (_pendingMicrotasks == 0 && _onEventDone != null) {
runOutsideAngular(_onEventDone);
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions modules/angular2/src/core/zone/ng_zone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export class NgZone {
*
* This hook is useful for validating application state (e.g. in a test).
*/
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean): void {
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean = false): void {
var normalizedOnEventDone = normalizeBlank(onEventDoneFn);
if (opt_waitForAsync) {
this._onEventDone = () => {
Expand Down Expand Up @@ -212,14 +212,15 @@ export class NgZone {
try {
this._inVmTurnDone = true;
parentRun.call(ngZone._innerZone, ngZone._onTurnDone);
if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
ngZone.runOutsideAngular(ngZone._onEventDone);
}
} finally {
this._inVmTurnDone = false;
ngZone._hasExecutedCodeInInnerZone = false;
}
}

if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
ngZone.runOutsideAngular(ngZone._onEventDone);
}
}
}
};
Expand Down
8 changes: 8 additions & 0 deletions modules/angular2/src/mock/ng_zone_mock.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import {NgZone} from 'angular2/src/core/zone/ng_zone';

export class MockNgZone extends NgZone {
_onEventDone: () => void;

constructor() { super({enableLongStackTrace: false}); }

run(fn: Function): any { return fn(); }

runOutsideAngular(fn: Function): any { return fn(); }

overrideOnEventDone(fn: () => void, opt_waitForAsync: boolean = false): void {
this._onEventDone = fn;
}

simulateZoneExit(): void { this._onEventDone(); }
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
library angular2.src.web_workers.debug_tools.multi_client_server_message_bus;

import "package:angular2/src/web_workers/shared/message_bus.dart"
show MessageBus, MessageBusSink, MessageBusSource;
import 'dart:io';
import 'dart:convert' show JSON;
import 'dart:async';
import 'package:angular2/src/core/facade/async.dart' show EventEmitter;
import 'package:angular2/src/web_workers/shared/messaging_api.dart';
import 'package:angular2/src/web_workers/shared/generic_message_bus.dart';

// TODO(jteplitz602): Remove hard coded result type and
// clear messageHistory once app is done with it #3859
class MultiClientServerMessageBus implements MessageBus {
final MultiClientServerMessageBusSink sink;
MultiClientServerMessageBusSource source;
class MultiClientServerMessageBus extends GenericMessageBus {
bool hasPrimary = false;

MultiClientServerMessageBus(this.sink, this.source);
@override
MultiClientServerMessageBusSink get sink => super.sink;
@override
MultiClientServerMessageBusSource get source => super.source;

MultiClientServerMessageBus(MultiClientServerMessageBusSink sink,
MultiClientServerMessageBusSource source)
: super(sink, source);

MultiClientServerMessageBus.fromHttpServer(HttpServer server)
: sink = new MultiClientServerMessageBusSink() {
source = new MultiClientServerMessageBusSource(resultReceived);
: super(new MultiClientServerMessageBusSink(),
new MultiClientServerMessageBusSource()) {
source.onResult.listen(_resultReceived);
server.listen((HttpRequest request) {
if (request.uri.path == "/ws") {
WebSocketTransformer.upgrade(request).then((WebSocket socket) {
Expand All @@ -38,18 +42,10 @@ class MultiClientServerMessageBus implements MessageBus {
});
}

void resultReceived() {
void _resultReceived(_) {
sink.resultReceived();
}

EventEmitter from(String channel) {
return source.from(channel);
}

EventEmitter to(String channel) {
return sink.to(channel);
}

Function _handleDisconnect(WebSocketWrapper wrapper) {
return () {
sink.removeConnection(wrapper);
Expand All @@ -72,12 +68,15 @@ class WebSocketWrapper {
WebSocketWrapper(this._messageHistory, this._resultMarkers, this.socket) {
stream = socket.asBroadcastStream();
stream.listen((encodedMessage) {
var message = JSON.decode(encodedMessage)['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
resultReceived();
var messages = JSON.decode(encodedMessage);
messages.forEach((data) {
var message = data['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
resultReceived();
}
}
}
});
});
}

Expand Down Expand Up @@ -121,10 +120,9 @@ class WebSocketWrapper {
}
}

class MultiClientServerMessageBusSink implements MessageBusSink {
class MultiClientServerMessageBusSink extends GenericMessageBusSink {
final List<String> messageHistory = new List<String>();
final Set<WebSocketWrapper> openConnections = new Set<WebSocketWrapper>();
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
final List<int> resultMarkers = new List<int>();

void resultReceived() {
Expand All @@ -141,76 +139,77 @@ class MultiClientServerMessageBusSink implements MessageBusSink {
openConnections.remove(webSocket);
}

EventEmitter to(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
emitter.listen((message) {
_send({'channel': channel, 'message': message});
});
return emitter;
}
}

void _send(dynamic message) {
String encodedMessage = JSON.encode(message);
@override
void sendMessages(List<dynamic> messages) {
String encodedMessages = JSON.encode(messages);
openConnections.forEach((WebSocketWrapper webSocket) {
if (webSocket.caughtUp) {
webSocket.socket.add(encodedMessage);
webSocket.socket.add(encodedMessages);
}
});
messageHistory.add(encodedMessage);
messageHistory.add(encodedMessages);
}
}

class MultiClientServerMessageBusSource implements MessageBusSource {
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
class MultiClientServerMessageBusSource extends GenericMessageBusSource {
Function onResultReceived;
final StreamController mainController;
final StreamController resultController = new StreamController();

MultiClientServerMessageBusSource(this.onResultReceived);
MultiClientServerMessageBusSource._(controller)
: mainController = controller,
super(controller.stream);

EventEmitter from(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
_channels[channel] = emitter;
return emitter;
}
factory MultiClientServerMessageBusSource() {
return new MultiClientServerMessageBusSource._(
new StreamController.broadcast());
}

Stream get onResult => resultController.stream;

void addConnection(WebSocketWrapper webSocket) {
if (webSocket.isPrimary) {
webSocket.stream.listen((encodedMessage) {
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
// tell the bus that a result was received on the primary
onResultReceived();
webSocket.stream.listen((encodedMessages) {
var decodedMessages = _decodeMessages(encodedMessages);
decodedMessages.forEach((decodedMessage) {
var message = decodedMessage['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
// tell the bus that a result was received on the primary
resultController.add(message);
}
}
}
});

if (_channels.containsKey(channel)) {
_channels[channel].add(message);
}
mainController.add(decodedMessages);
});
} else {
webSocket.stream.listen((encodedMessage) {
// handle events from non-primary browser
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (_channels.containsKey(EVENT_CHANNEL) && channel == EVENT_CHANNEL) {
_channels[channel].add(message);
webSocket.stream.listen((encodedMessages) {
// handle events from non-primary connection.
var decodedMessages = _decodeMessages(encodedMessages);
var eventMessages = new List<Map<String, dynamic>>();
decodedMessages.forEach((decodedMessage) {
var channel = decodedMessage['channel'];
if (channel == EVENT_CHANNEL) {
eventMessages.add(decodedMessage);
}
});
if (eventMessages.length > 0) {
mainController.add(eventMessages);
}
});
}
}

Map<String, dynamic> decodeMessage(dynamic message) {
return JSON.decode(message);
List<dynamic> _decodeMessages(dynamic messages) {
return JSON.decode(messages);
}

// This is a noop for the MultiClientBus because it has to decode the JSON messages before
// the generic bus receives them in order to check for results and forward events
// from the non-primary connection.
@override
List<dynamic> decodeMessages(dynamic messages) {
return messages;
}
}
Loading

0 comments on commit f3da37c

Please sign in to comment.