Skip to content

Commit

Permalink
Support grpc-web in pure dart (#287)
Browse files Browse the repository at this point in the history
By using package:http for making connections.
  • Loading branch information
robsonmeemo authored May 12, 2020
1 parent 39c7511 commit c513e14
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 165 deletions.
120 changes: 50 additions & 70 deletions lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
// limitations under the License.

import 'dart:async';
import 'dart:html';
import 'dart:typed_data';

import 'package:http/http.dart';
import 'package:meta/meta.dart';

import '../../client/call.dart';
Expand All @@ -27,10 +27,10 @@ import 'transport.dart';
import 'web_streams.dart';

class XhrTransportStream implements GrpcTransportStream {
final HttpRequest _request;
final Client _client;
final Request _request;
final ErrorHandler _onError;
final Function(XhrTransportStream stream) _onDone;
int _requestBytesRead = 0;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
Expand All @@ -41,53 +41,34 @@ class XhrTransportStream implements GrpcTransportStream {
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;

XhrTransportStream(this._request, {onError, onDone})
XhrTransportStream(this._client, this._request, {onError, onDone})
: _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);

_request.onReadyStateChange.listen((data) {
if (_incomingMessages.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
if (_request.status != 200) {
_onError(GrpcError.unavailable(
'XhrConnection status ${_request.status}'));
} else {
_close();
}
break;
}
});

_request.onError.listen((ProgressEvent event) {
final asyncOnError = (e, st) {
if (_incomingMessages.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'));
terminate();
});

_request.onProgress.listen((_) {
if (_incomingMessages.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
responseString.substring(_requestBytesRead).codeUnits)
.buffer;
_requestBytesRead = responseString.length;
_incomingProcessor.add(bytes);
});
};
_outgoingMessages.stream.map(frame).listen((data) {
_request.bodyBytes = data;
var firstMessage = true;
_client.send(_request).then((response) {
if (_incomingMessages.isClosed) {
return;
}
if (firstMessage) {
if (!_onHeadersReceived(response)) {
return;
}
}
firstMessage = false;
response.stream.listen((data) {
_incomingProcessor.add(Uint8List.fromList(data).buffer);
}, onDone: _close);
}).catchError(asyncOnError);
}, cancelOnError: true, onError: asyncOnError);

_incomingProcessor.stream
.transform(GrpcWebDecoder())
Expand All @@ -96,30 +77,27 @@ class XhrTransportStream implements GrpcTransportStream {
onError: _onError, onDone: _incomingMessages.close);
}

_onHeadersReceived() {
final contentType = _request.getResponseHeader('Content-Type');
if (_request.status != 200) {
bool _onHeadersReceived(StreamedResponse response) {
final contentType = response.headers['content-type'];
if (response.statusCode != 200) {
_onError(
GrpcError.unavailable('XhrConnection status ${_request.status}'));
return;
GrpcError.unavailable('XhrConnection status ${response.statusCode}'));
return false;
}
if (contentType == null) {
_onError(GrpcError.unavailable('XhrConnection missing Content-Type'));
return;
return false;
}
if (!contentType.startsWith('application/grpc')) {
_onError(
GrpcError.unavailable('XhrConnection bad Content-Type $contentType'));
return;
}
if (_request.response == null) {
_onError(GrpcError.unavailable('XhrConnection request null response'));
return;
return false;
}

// Force a metadata message with headers.
final headers = GrpcMetadata(_request.responseHeaders);
final headers = GrpcMetadata(response.headers);
_incomingMessages.add(headers);
return true;
}

_close() {
Expand All @@ -131,45 +109,47 @@ class XhrTransportStream implements GrpcTransportStream {
@override
Future<void> terminate() async {
_close();
_request.abort();
}
}

class XhrClientConnection extends ClientConnection {
final Uri uri;
Client _client;

final Set<XhrTransportStream> _requests = Set<XhrTransportStream>();

XhrClientConnection(this.uri);
XhrClientConnection(this.uri) {
_client = createClient();
}

String get authority => uri.authority;
String get scheme => uri.scheme;

void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
void _initializeRequest(Request request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]);
request.headers[header] = metadata[header];
}
request.setRequestHeader('Content-Type', 'application/grpc-web+proto');
request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1');
request.setRequestHeader('X-Grpc-Web', '1');
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
request.headers['Content-Type'] = 'application/grpc-web+proto';
request.headers['X-User-Agent'] = 'grpc-web-dart/0.1';
request.headers['X-Grpc-Web'] = '1';
}

@visibleForTesting
HttpRequest createHttpRequest() => HttpRequest();
Request createHttpRequest(String path) => Request('POST', uri.resolve(path));

@visibleForTesting
Client createClient() => Client();

@override
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) {
final HttpRequest request = createHttpRequest();
request.open('POST', uri.resolve(path).toString());
final Request request = createHttpRequest(path);

_initializeRequest(request, metadata);

final XhrTransportStream transportStream =
XhrTransportStream(request, onError: onError, onDone: _removeStream);
final XhrTransportStream transportStream = XhrTransportStream(
_client, request,
onError: onError, onDone: _removeStream);
_requests.add(transportStream);
return transportStream;
}
Expand Down
Loading

0 comments on commit c513e14

Please sign in to comment.