/
byte_stream_channel.dart
137 lines (119 loc) · 3.79 KB
/
byte_stream_channel.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library channel.byte_stream;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'channel.dart';
import 'protocol.dart';
/**
 * A [ClientCommunicationChannel] backed by a byte stream and a sink
 * (typically, standard input and standard output) through which the client
 * exchanges line-delimited JSON messages with a server.
 */
class ByteStreamClientChannel implements ClientCommunicationChannel {
  /**
   * The stream of UTF-8 encoded bytes from which server messages are read.
   */
  final Stream input;

  /**
   * The sink to which encoded requests are written, one per line.
   */
  final IOSink output;

  /**
   * The decoded messages that carry no [Notification.EVENT] entry.
   */
  @override
  Stream<Response> responseStream;

  /**
   * The decoded messages that carry a [Notification.EVENT] entry.
   */
  @override
  Stream<Notification> notificationStream;

  /**
   * Create a channel that reads from [input] and writes to [output].
   *
   * The input is decoded as UTF-8, split into lines and passed through a
   * [JsonStreamDecoder]; decoded values that are not maps are dropped. The
   * remaining messages are split between [responseStream] and
   * [notificationStream], both of which are broadcast streams.
   */
  ByteStreamClientChannel(this.input, this.output) {
    Stream decodedMessages = input
        .transform((new Utf8Codec()).decoder)
        .transform(new LineSplitter())
        .transform(new JsonStreamDecoder())
        .where(_isMap)
        .asBroadcastStream();

    notificationStream = decodedMessages
        .where(_isNotification)
        .transform(new NotificationConverter())
        .asBroadcastStream();

    responseStream = decodedMessages
        .where((message) => !_isNotification(message))
        .transform(new ResponseConverter())
        .asBroadcastStream();
  }

  /**
   * Return `true` if the decoded [message] is a map.
   */
  static bool _isMap(message) => message is Map;

  /**
   * Return `true` if the decoded [message] has a non-null entry under
   * [Notification.EVENT].
   */
  static bool _isNotification(message) => message[Notification.EVENT] != null;

  /**
   * Close the [output] sink and return the future produced by doing so.
   */
  @override
  Future close() => output.close();

  /**
   * Write the JSON encoding of [request] to [output] as a single line and
   * return a future for the first response on [responseStream] whose id
   * equals the id of the request.
   */
  @override
  Future<Response> sendRequest(Request request) {
    String requestId = request.id;
    String encodedRequest = JSON.encode(request.toJson());
    output.writeln(encodedRequest);

    bool answersRequest(Response response) => response.id == requestId;
    return responseStream.firstWhere(answersRequest);
  }
}
/**
 * Instances of the class [ByteStreamServerChannel] implement a
 * [ServerCommunicationChannel] that uses a stream and a sink (typically,
 * standard input and standard output) to communicate with clients.
 *
 * Each line read from [input] is handed to [Request.fromString]; responses
 * and notifications are written to [output] as one line of JSON each.
 */
class ByteStreamServerChannel implements ServerCommunicationChannel {
  /**
   * The stream of UTF-8 encoded bytes from which requests are read.
   */
  final Stream input;

  /**
   * The sink to which responses and notifications are written.
   */
  final IOSink output;

  /**
   * Completer that is completed the first time [close] is called, which
   * happens explicitly or when the input stream is done.
   */
  final Completer _closed = new Completer();

  /**
   * Create a channel that reads requests from [input] and writes responses
   * and notifications to [output].
   */
  ByteStreamServerChannel(this.input, this.output);

  /**
   * Future that will be completed when the channel is closed, either by a
   * call to [close] or because the input stream is done.
   */
  Future get closed {
    return _closed.future;
  }

  /**
   * Mark the channel as closed. Calling this more than once is harmless; only
   * the first call completes [closed].
   */
  @override
  void close() {
    if (!_closed.isCompleted) {
      _closed.complete();
    }
  }

  /**
   * Start listening to [input], invoking [onRequest] for every line that can
   * be read as a request.
   *
   * Errors on the input stream are forwarded to [onError], if given. When the
   * input stream is done the channel is closed and then [onDone], if given, is
   * invoked.
   */
  @override
  void listen(void onRequest(Request request), {Function onError, void
      onDone()}) {
    input
        .transform((new Utf8Codec()).decoder)
        .transform(new LineSplitter())
        .listen((String data) => _readRequest(data, onRequest),
            onError: onError, onDone: () {
      close();
      // [onDone] is an optional named parameter and is null when the caller
      // omits it; invoking it unconditionally would throw at end of input.
      if (onDone != null) {
        onDone();
      }
    });
  }

  /**
   * Write the JSON encoding of the [notification] to [output] as a single
   * line, unless the channel has already been closed.
   */
  @override
  void sendNotification(Notification notification) {
    // Don't send any further notifications after the communication channel is
    // closed.
    if (_closed.isCompleted) {
      return;
    }
    output.writeln(JSON.encode(notification.toJson()));
  }

  /**
   * Write the JSON encoding of the [response] to [output] as a single line,
   * unless the channel has already been closed.
   */
  @override
  void sendResponse(Response response) {
    // Don't send any further responses after the communication channel is
    // closed.
    if (_closed.isCompleted) {
      return;
    }
    output.writeln(JSON.encode(response.toJson()));
  }

  /**
   * Read a request from the given [data] and use the given function to handle
   * the request.
   *
   * If [Request.fromString] yields null for the [data], an
   * invalid-request-format response is sent instead and [onRequest] is not
   * invoked.
   */
  void _readRequest(String data, void onRequest(Request request)) {
    // Ignore any further requests after the communication channel is closed.
    if (_closed.isCompleted) {
      return;
    }
    // Parse the string as a JSON descriptor and process the resulting
    // structure as a request.
    Request request = new Request.fromString(data);
    if (request == null) {
      sendResponse(new Response.invalidRequestFormat());
      return;
    }
    onRequest(request);
  }
}