Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exception catching for grpc calls in engine #966

Merged
merged 1 commit into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,29 @@ public SeldonMessage route(SeldonMessage input, PredictiveUnitState state)

case GRPC:
if (state.type == PredictiveUnitType.UNKNOWN_TYPE) {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.route(input);
try {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.route(input);
} catch (Exception e) {
logger.error("grpc exception genericStub route", e);
throw e;
}
} else {
RouterBlockingStub stub =
RouterGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.route(input);
try {
RouterBlockingStub stub =
RouterGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.route(input);
} catch (Exception e) {
logger.error("grpc exception routerStub route", e);
throw e;
}
}
}
throw new APIException(
Expand All @@ -227,26 +237,41 @@ public SeldonMessage sendFeedback(Feedback feedback, PredictiveUnitState state)

case GRPC:
if (state.type == PredictiveUnitType.UNKNOWN_TYPE) {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.sendFeedback(feedback);
try {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.sendFeedback(feedback);
} catch (Exception e) {
logger.error("grpc exception genericStub sendFeedback", e);
throw e;
}
} else if (state.type == PredictiveUnitType.MODEL) {
ModelBlockingStub modelStub =
ModelGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return modelStub.sendFeedback(feedback);
try {
ModelBlockingStub modelStub =
ModelGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return modelStub.sendFeedback(feedback);
} catch (Exception e) {
logger.error("grpc exception modelStub sendFeedback", e);
throw e;
}
} else {
RouterBlockingStub routerStub =
RouterGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return routerStub.sendFeedback(feedback);
try {
RouterBlockingStub routerStub =
RouterGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return routerStub.sendFeedback(feedback);
} catch (Exception e) {
logger.error("grpc exception routerStub sendFeedback", e);
throw e;
}
}
}
throw new APIException(
Expand All @@ -269,23 +294,28 @@ public SeldonMessage transformInput(SeldonMessage input, PredictiveUnitState sta
case GRPC:
switch (state.type) {
case UNKNOWN_TYPE:
GenericBlockingStub genStub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
try {
GenericBlockingStub genStub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return genStub.transformInput(input);
return genStub.transformInput(input);
} catch (Exception e) {
logger.error("grpc exception on genericStub transformInput ", e);
throw e;
}
case MODEL:
try {
ModelBlockingStub modelStub =
ModelGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
logger.info(modelStub.getCallOptions().toString());
logger.debug(modelStub.getCallOptions().toString());
return modelStub.predict(input);
} catch (Exception e) {
logger.error("grpc exception ", e);
logger.error("grpc exception on modelStub predict ", e);
throw e;
}
case TRANSFORMER:
Expand All @@ -297,7 +327,7 @@ public SeldonMessage transformInput(SeldonMessage input, PredictiveUnitState sta
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return transformerStub.transformInput(input);
} catch (Exception e) {
logger.error("grpc exception ", e);
logger.error("grpc exception transformStub transformInput", e);
throw e;
}
default:
Expand All @@ -319,19 +349,29 @@ public SeldonMessage transformOutput(SeldonMessage output, PredictiveUnitState s

case GRPC:
if (state.type == PredictiveUnitType.UNKNOWN_TYPE) {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.transformOutput(output);
try {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.transformOutput(output);
} catch (Exception e) {
logger.error("grpc exception genericStub transformOutput", e);
throw e;
}
} else {
OutputTransformerBlockingStub stub =
OutputTransformerGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.transformOutput(output);
try {
OutputTransformerBlockingStub stub =
OutputTransformerGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.transformOutput(output);
} catch (Exception e) {
logger.error("grpc exception outputTransformerStub transformOutput", e);
throw e;
}
}
}
throw new APIException(
Expand All @@ -350,19 +390,29 @@ public SeldonMessage aggregate(List<SeldonMessage> outputs, PredictiveUnitState

case GRPC:
if (state.type == PredictiveUnitType.UNKNOWN_TYPE) {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.aggregate(outputsList);
try {
GenericBlockingStub stub =
GenericGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.aggregate(outputsList);
} catch (Exception e) {
logger.error("grpc exception genericStub aggregate", e);
throw e;
}
} else {
CombinerBlockingStub stub =
CombinerGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.aggregate(outputsList);
try {
CombinerBlockingStub stub =
CombinerGrpc.newBlockingStub(grpcChannelHandler.get(endpoint))
.withDeadlineAfter(grpcReadTimeout, TimeUnit.MILLISECONDS)
.withMaxInboundMessageSize(grpcMaxMessageSize)
.withMaxOutboundMessageSize(grpcMaxMessageSize);
return stub.aggregate(outputsList);
} catch (Exception e) {
logger.error("grpc exception combinerStub aggregate", e);
throw e;
}
}
}
throw new APIException(
Expand Down
6 changes: 6 additions & 0 deletions testing/scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ To get everything setup run:
kind_test_setup.sh
```

Activate kind kubernetes config:

```
export KUBECONFIG="$(kind get kubeconfig-path)"
```

Then to run the tests:

```
Expand Down