From 7b836b27e0067a8796801e32093a7bda5a31341e Mon Sep 17 00:00:00 2001 From: wujimin Date: Sat, 30 May 2020 17:29:26 +0800 Subject: [PATCH] [SCB-1962] add highway server codec filter --- .../highway/HighwayFilterProvider.java | 32 ++++ .../highway/HighwayServerCodecFilter.java | 85 +++++++++ ...che.servicecomb.core.filter.FilterProvider | 18 ++ .../highway/HighwayServerCodecFilterTest.java | 175 ++++++++++++++++++ 4 files changed, 310 insertions(+) create mode 100644 transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayFilterProvider.java create mode 100644 transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilter.java create mode 100644 transports/transport-highway/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider create mode 100644 transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilterTest.java diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayFilterProvider.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayFilterProvider.java new file mode 100644 index 0000000000..ee2ea4d295 --- /dev/null +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayFilterProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.transport.highway; + +import java.util.Arrays; +import java.util.List; + +import org.apache.servicecomb.core.filter.Filter; +import org.apache.servicecomb.core.filter.FilterProvider; + +public class HighwayFilterProvider implements FilterProvider { + @Override + public List> getFilters() { + return Arrays.asList( + HighwayServerCodecFilter.class + ); + } +} diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilter.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilter.java new file mode 100644 index 0000000000..7f8cc8a99d --- /dev/null +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.transport.highway; + +import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static org.apache.servicecomb.core.exception.Exceptions.exceptionToResponse; +import static org.apache.servicecomb.swagger.invocation.InvocationType.PRODUCER; + +import java.util.concurrent.CompletableFuture; + +import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf; +import org.apache.servicecomb.codec.protobuf.definition.ResponseRootSerializer; +import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.filter.Filter; +import org.apache.servicecomb.core.filter.FilterMeta; +import org.apache.servicecomb.core.filter.FilterNode; +import org.apache.servicecomb.foundation.common.utils.AsyncUtils; +import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.transport.highway.message.ResponseHeader; + +import io.vertx.core.buffer.Buffer; + +@FilterMeta(name = "highway-server-codec", invocationType = PRODUCER) +public class HighwayServerCodecFilter implements Filter { + @Override + public CompletableFuture onFilter(Invocation invocation, FilterNode nextNode) { + return CompletableFuture.completedFuture(invocation) + .thenCompose(this::decodeRequest) + .thenCompose(nextNode::onFilter) + .exceptionally(exception -> exceptionToResponse(invocation, exception, INTERNAL_SERVER_ERROR)) + .thenCompose(response -> encodeResponse(invocation, response)); + } + + private CompletableFuture decodeRequest(Invocation invocation) { + HighwayTransportContext transportContext = invocation.getTransportContext(); + try { + HighwayCodec.decodeRequest(invocation, + transportContext.getHeader(), + transportContext.getOperationProtobuf(), + transportContext.getBodyBuffer()); + return CompletableFuture.completedFuture(invocation); + } catch (Exception e) { + return AsyncUtils.completeExceptionally(e); + } + } + + private CompletableFuture encodeResponse(Invocation invocation, Response response) { + ResponseHeader header = new ResponseHeader(); + header.setStatusCode(response.getStatusCode()); + header.setReasonPhrase(response.getReasonPhrase()); + header.setContext(invocation.getContext()); + header.setHeaders(response.getHeaders()); + + HighwayTransportContext transportContext = invocation.getTransportContext(); + long msgId = transportContext.getMsgId(); + OperationProtobuf operationProtobuf = transportContext.getOperationProtobuf(); + ResponseRootSerializer bodySchema = operationProtobuf.findResponseRootSerializer(response.getStatusCode()); + + try { + Buffer respBuffer = HighwayCodec.encodeResponse( + msgId, header, bodySchema, response.getResult()); + transportContext.setResponseBuffer(respBuffer); + + return CompletableFuture.completedFuture(response); + } catch (Exception e) { + // keep highway performance and simple, this encoding/decoding error not need handle by client + String msg = String.format("encode response failed, msgId=%d", msgId); + return AsyncUtils.completeExceptionally(new IllegalStateException(msg, e)); + } + } +} diff --git a/transports/transport-highway/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider b/transports/transport-highway/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider new file mode 100644 index 0000000000..950dcb7d28 --- /dev/null +++ b/transports/transport-highway/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.servicecomb.transport.highway.HighwayFilterProvider \ No newline at end of file diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilterTest.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilterTest.java new file mode 100644 index 0000000000..70cfaaa4b4 --- /dev/null +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/HighwayServerCodecFilterTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.transport.highway; + +import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf; +import org.apache.servicecomb.config.ConfigUtil; +import org.apache.servicecomb.core.Endpoint; +import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.SCBEngine; +import org.apache.servicecomb.core.SCBStatus; +import org.apache.servicecomb.core.bootstrap.SCBBootstrap; +import org.apache.servicecomb.core.definition.OperationMeta; +import org.apache.servicecomb.core.filter.FilterNode; +import org.apache.servicecomb.core.invocation.InvocationFactory; +import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.foundation.test.scaffolding.exception.RuntimeExceptionWithoutStackTrace; +import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.response.Headers; +import org.apache.servicecomb.transport.highway.message.RequestHeader; +import org.apache.servicecomb.transport.highway.message.ResponseHeader; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.Json; +import mockit.Expectations; +import mockit.Mocked; +import mockit.Verifications; + +public class HighwayServerCodecFilterTest { + HighwayServerCodecFilter codecFilter = new HighwayServerCodecFilter(); + + Invocation invocation; + + @Mocked + Endpoint endpoint; + + @Mocked + OperationMeta operationMeta; + + @Mocked + HighwayTransportContext transportContext; + + Headers headers = new Headers(); + + FilterNode nextNode = new FilterNode((invocation, next) -> { + Response response = Response.ok("ok"); + response.setHeaders(headers); + return CompletableFuture.completedFuture(response); + }); + + static SCBEngine engine; + + @BeforeClass + public static void beforeClass() { + ArchaiusUtils.resetConfig(); + ConfigUtil.installDynamicConfig(); + + engine = SCBBootstrap.createSCBEngineForTest(); + engine.setStatus(SCBStatus.UP); + } + + @AfterClass + public static void afterClass() { + engine.destroy(); + ArchaiusUtils.resetConfig(); + } + + @Before + public void setUp() { + invocation = InvocationFactory.forProvider(endpoint, operationMeta, null); + } + + private void mockDecodeRequestFail() throws Exception { + new Expectations(invocation) { + { + invocation.getTransportContext(); + result = transportContext; + } + }; + new Expectations(HighwayCodec.class) { + { + HighwayCodec.decodeRequest(invocation, (RequestHeader) any, (OperationProtobuf) any, (Buffer) any); + result = new RuntimeExceptionWithoutStackTrace("encode request failed"); + } + }; + } + + @Test + public void should_not_invoke_filter_when_decode_request_failed(@Mocked FilterNode nextNode) throws Exception { + mockDecodeRequestFail(); + + codecFilter.onFilter(invocation, nextNode); + + new Verifications() { + { + nextNode.onFilter(invocation); + times = 0; + } + }; + } + + @Test + public void should_convert_exception_to_response_when_decode_request_failed() + throws Exception { + mockDecodeRequestFail(); + + Response response = codecFilter.onFilter(invocation, nextNode).get(); + + assertThat(response.getStatus()).isEqualTo(INTERNAL_SERVER_ERROR); + assertThat(Json.encode(response.getResult())) + .isEqualTo("{\"code\":\"SCB.5000\",\"message\":\"encode request failed\"}"); + } + + private void success_invocation() throws InterruptedException, ExecutionException { + new Expectations(invocation) { + { + invocation.getTransportContext(); + result = transportContext; + } + }; + + codecFilter.onFilter(invocation, nextNode).get(); + } + + @Test + public void should_encode_response_header(@Mocked ResponseHeader responseHeader) + throws ExecutionException, InterruptedException { + success_invocation(); + + new Verifications() { + { + Headers captureHeaders; + responseHeader.setHeaders(captureHeaders = withCapture()); + assertThat(captureHeaders).isSameAs(headers); + } + }; + } + + @Test + public void should_encode_response() throws ExecutionException, InterruptedException { + success_invocation(); + + new Verifications() { + { + Buffer captureBuffer; + transportContext.setResponseBuffer(captureBuffer = withCapture()); + assertThat(captureBuffer).isNotNull(); + } + }; + } +} \ No newline at end of file