Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
# Name/Organization <email address>

Google Inc.
German Saprykin <saprykin.h@gmail.com>
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.5.0 - 2018-06-29

* Implemented interceptors.

## 0.4.1 - 2018-04-04

* Fixes for supporting Dart 2.
Expand Down
1 change: 1 addition & 0 deletions lib/grpc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export 'src/client/options.dart';

export 'src/server/call.dart';
export 'src/server/handler.dart';
export 'src/server/interceptor.dart';
export 'src/server/server.dart';
export 'src/server/service.dart';

Expand Down
33 changes: 32 additions & 1 deletion lib/src/server/handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import '../shared/streams.dart';
import '../shared/timeout.dart';

import 'call.dart';
import 'interceptor.dart';
import 'service.dart';

/// Handles an incoming gRPC call.
class ServerHandler extends ServiceCall {
final ServerTransportStream _stream;
final Service Function(String service) _serviceLookup;
final List<Interceptor> _interceptors;

StreamSubscription<GrpcMessage> _incomingSubscription;

Expand All @@ -51,14 +53,19 @@ class ServerHandler extends ServiceCall {
bool _isTimedOut = false;
Timer _timeoutTimer;

ServerHandler(this._serviceLookup, this._stream);
ServerHandler(this._serviceLookup, this._stream,
[this._interceptors = const <Interceptor>[]]);

DateTime get deadline => _deadline;

bool get isCanceled => _isCanceled;

bool get isTimedOut => _isTimedOut;

Map<String, String> get clientMetadata => _clientMetadata;

Map<String, String> get headers => _customHeaders;

Map<String, String> get trailers => _customTrailers;

void handle() {
Expand Down Expand Up @@ -103,16 +110,40 @@ class ServerHandler extends ServiceCall {
}
final serviceName = pathSegments[1];
final methodName = pathSegments[2];

_service = _serviceLookup(serviceName);
_descriptor = _service?.$lookupMethod(methodName);
if (_descriptor == null) {
_sendError(new GrpcError.unimplemented('Path $path not found'));
_sinkIncoming();
return;
}

final error = _applyInterceptors();
if (error != null) {
_sendError(error);
_sinkIncoming();
return;
}

_startStreamingRequest();
}

GrpcError _applyInterceptors() {
try {
for (final interceptor in _interceptors) {
final error = interceptor(this, this._descriptor);
if (error != null) {
return error;
}
}
} catch (error) {
final grpcError = new GrpcError.internal(error.toString());
return grpcError;
}
return null;
}

void _startStreamingRequest() {
_incomingSubscription.pause();
_requests = _descriptor.createRequestStream(_incomingSubscription);
Expand Down
13 changes: 13 additions & 0 deletions lib/src/server/interceptor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import '../shared/status.dart';

import 'call.dart';
import 'service.dart';

/// A gRPC Interceptor.
///
/// An interceptor is called before the corresponding [ServiceMethod] invocation.
/// If the interceptor returns a [GrpcError], the error will be returned as a response and [ServiceMethod] wouldn't be called.
/// If the interceptor throws [Exception], [GrpcError.internal] with exception.toString() will be returned.
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
typedef Interceptor = GrpcError Function(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sigurdm I have checked go implementation for interceptors

https://github.com/grpc/grpc-go/blob/a3e99ebee0a498c3093e9741d09c579f0fc34a7f/server.go#L1020

From my understanding it has only two cases - no-error and error,
if there is an error, write a status and stop processing a request.
If there is no error, continue processing.

Imo it doesn't make sense to add this feature (I mean silent blocking) now.
Also I am not sure it conforms to grpc standard or not.

What do you think? Is it required?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No let's stick with error/no-error!

ServiceCall call, ServiceMethod method);
8 changes: 6 additions & 2 deletions lib/src/server/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import 'package:http2/transport.dart';
import '../shared/security.dart';

import 'handler.dart';
import 'interceptor.dart';
import 'service.dart';

class ServerTlsCredentials {
Expand Down Expand Up @@ -57,13 +58,16 @@ class ServerTlsCredentials {
/// Listens for incoming RPCs, dispatching them to the right [Service] handler.
class Server {
final Map<String, Service> _services = {};
final List<Interceptor> _interceptors;

ServerSocket _insecureServer;
SecureServerSocket _secureServer;
final _connections = <ServerTransportConnection>[];

/// Create a server for the given [services].
Server(List<Service> services) {
Server(List<Service> services,
[List<Interceptor> interceptors = const <Interceptor>[]])
: _interceptors = interceptors {
for (final service in services) {
_services[service.$name] = service;
}
Expand Down Expand Up @@ -110,7 +114,7 @@ class Server {
}

void serveStream(ServerTransportStream stream) {
new ServerHandler(lookupService, stream).handle();
new ServerHandler(lookupService, stream, _interceptors).handle();
}

Future<Null> shutdown() async {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: grpc
description: Dart implementation of gRPC.
version: 0.4.1
version: 0.5.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/grpc-dart

Expand Down
56 changes: 56 additions & 0 deletions test/server_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,60 @@ void main() {
..toServer.close();
await harness.fromServer.done;
});

group('Server with interceptor', () {
test('processes calls if interceptor allows request', () async {
const expectedRequest = 5;
const expectedResponse = 7;
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
expect(await request, expectedRequest);
return expectedResponse;
}

GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
if (method.name == "Unary") {
return null;
}
return new GrpcError.unauthenticated('Request is unauthenticated');
}

harness
..interceptor.handler = interceptorHandler
..service.unaryHandler = methodHandler
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);

await harness.fromServer.done;
});

test('returns error if interceptor blocks request', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
if (method.name == "Unary") {
return new GrpcError.unauthenticated('Request is unauthenticated');
}
return null;
}

harness
..interceptor.handler = interceptorHandler
..expectErrorResponse(
StatusCode.unauthenticated, 'Request is unauthenticated')
..sendRequestHeader('/Test/Unary');

await harness.fromServer.done;
});

test('returns internal error if interceptor throws exception', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
throw new Exception('Reason is unknown');
}

harness
..interceptor.handler = interceptorHandler
..expectErrorResponse(
StatusCode.internal, 'Exception: Reason is unknown')
..sendRequestHeader('/Test/Unary');

await harness.fromServer.done;
});
});
}
16 changes: 15 additions & 1 deletion test/src/server_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class TestService extends Service {
}
}

class TestInterceptor {
Interceptor handler;

GrpcError call(ServiceCall call, ServiceMethod method) {
if (handler == null) {
return null;
}

return handler(call, method);
}
}

class TestServerStream extends ServerTransportStream {
final Stream<StreamMessage> incomingMessages;
final StreamSink<StreamMessage> outgoingMessages;
Expand Down Expand Up @@ -107,10 +119,12 @@ class ServerHarness {
final toServer = new StreamController<StreamMessage>();
final fromServer = new StreamController<StreamMessage>();
final service = new TestService();
final interceptor = new TestInterceptor();

Server server;

ServerHarness() {
server = new Server([service]);
server = new Server(<Service>[service], <Interceptor>[interceptor]);
}

static ServiceMethod<int, int> createMethod(String name,
Expand Down