Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SCB-486] Edge route download #667

Merged
merged 9 commits into from
Apr 22, 2018
Expand Up @@ -54,4 +54,6 @@ private RestConst() {
public static final String REST_REQUEST = "servicecomb-rest-request";

public static final String CONSUMER_HEADER = "servicecomb-rest-consumer-header";

public static final String READ_STREAM_PART = "servicecomb-readStreamPart";
}
Expand Up @@ -17,18 +17,39 @@

package org.apache.servicecomb.demo.edge.business;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.servicecomb.demo.edge.model.AppClientDataRsp;
import org.apache.servicecomb.demo.edge.model.ChannelRequestBase;
import org.apache.servicecomb.demo.edge.model.ResultWithInstance;
import org.apache.servicecomb.provider.rest.common.RestSchema;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

@RestSchema(schemaId = "news-v2")
@RequestMapping(path = "/business/v2")
public class Impl {
File tempDir = new File("target/downloadTemp");

public Impl() throws IOException {
FileUtils.forceMkdir(tempDir);
}

@RequestMapping(path = "/channel/news/subscribe", method = RequestMethod.POST)
public AppClientDataRsp subscribeNewsColumn(@RequestBody ChannelRequestBase request) {
AppClientDataRsp response = new AppClientDataRsp();
Expand All @@ -46,4 +67,30 @@ public ResultWithInstance add(int x, int y) {
public ResultWithInstance dec(int x, int y) {
return ResultWithInstance.create(x - y);
}

@GetMapping(path = "/download")
@ApiResponses({
@ApiResponse(code = 200, response = File.class, message = ""),
})
public ResponseEntity<InputStream> download() throws IOException {
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE)
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=download.txt")
.body(new ByteArrayInputStream("download".getBytes(StandardCharsets.UTF_8)));
}

protected File createBigFile() throws IOException {
File file = new File(tempDir, "bigFile.txt");
file.delete();
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.setLength(10 * 1024 * 1024);
randomAccessFile.close();
return file;
}

@GetMapping(path = "/bigFile")
public File bigFile() throws IOException {
return createBigFile();
}
}
Expand Up @@ -17,11 +17,13 @@

package org.apache.servicecomb.demo.edge.consumer;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.endpoint.EndpointsCache;
Expand All @@ -34,6 +36,7 @@
import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -75,6 +78,9 @@ public Consumer() {
public void run() {
prepareEdge();

testDownload();
testDownloadBigFile();

invoke("/v1/add", 2, 1, addV1Result);
invoke("/v1/add", 3, 1, addV1Result);
invoke("/v1/add", 4, 1, addV1Result);
Expand All @@ -100,6 +106,36 @@ public void run() {
checkResult("v2/dec", decV2Result, "2.0.0");
}

protected void testDownloadBigFile() {
String url = edgePrefix + "/v2/bigFile";
AtomicInteger size = new AtomicInteger();

template.execute(url, HttpMethod.GET, req -> {
}, resp -> {
byte[] buf = new byte[1 * 1024 * 1024];
try (InputStream is = resp.getBody()) {
for (;;) {
int len = is.read(buf);
if (len == -1) {
break;
}

size.addAndGet(len);
}
}
return null;
});
Assert.isTrue(size.get() == 10 * 1024 * 1024);
System.out.println("test download bigFile finished");
}

protected void testDownload() {
String url = edgePrefix + "/v2/download";
String content = template.getForObject(url, String.class);
Assert.isTrue("download".equals(content));
System.out.println("test download finished");
}

private void checkResult(String name, List<ResultWithInstance> results, String... expectedVersions) {
Set<String> versions = new HashSet<>();
Set<String> remained = new HashSet<>(Arrays.asList(expectedVersions));
Expand Down
Expand Up @@ -40,7 +40,7 @@ public static void init(Vertx vertx) throws InterruptedException {
.setHost(PerfConfiguration.redisHost)
.setPort(PerfConfiguration.redisPort)
.setAuth(PerfConfiguration.redisPassword);
ClientPoolFactory<RedisClient> factory = () -> {
ClientPoolFactory<RedisClient> factory = (ctx) -> {
return RedisClient.create(vertx, redisOptions);
};
clientMgr = new ClientPoolManager<>(vertx, factory);
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.core.BootListener;
import org.apache.servicecomb.core.executor.ExecutorManager;
import org.apache.servicecomb.transport.rest.client.TransportClientConfig;
import org.apache.servicecomb.transport.rest.vertx.TransportConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -36,6 +38,9 @@ public void onBootEvent(BootEvent event) {
return;
}

TransportClientConfig.setRestTransportClientCls(EdgeRestTransportClient.class);
TransportConfig.setRestServerVerticle(EdgeRestServerVerticle.class);

String defaultExecutor = DynamicPropertyFactory.getInstance()
.getStringProperty(ExecutorManager.KEY_EXECUTORS_DEFAULT, null)
.get();
Expand Down
Expand Up @@ -36,9 +36,12 @@
import org.apache.servicecomb.serviceregistry.consumer.MicroserviceVersionRule;
import org.apache.servicecomb.serviceregistry.definition.DefinitionConst;

import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;

public class EdgeInvocation extends AbstractRestInvocation {
public static final String EDGE_INVOCATION_CONTEXT = "edgeInvocationContext";

protected String microserviceName;

protected MicroserviceVersionRule microserviceVersionRule;
Expand Down Expand Up @@ -121,6 +124,8 @@ protected void createInvocation() {
this.invocation = InvocationFactory.forConsumer(referenceConfig,
restOperationMeta.getOperationMeta(),
null);
this.invocation.setSync(false);
this.invocation.getHandlerContext().put(EDGE_INVOCATION_CONTEXT, Vertx.currentContext());
this.invocation.setResponseExecutor(new ReactiveResponseExecutor());
}
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.edge.core;

import org.apache.servicecomb.transport.rest.client.RestTransportClient;
import org.apache.servicecomb.transport.rest.vertx.RestServerVerticle;

public class EdgeRestServerVerticle extends RestServerVerticle {
@Override
public void start() throws Exception {
super.start();

RestTransportClient restClient = (RestTransportClient) config().getValue(RestTransportClient.class.getName());
restClient.getClientMgr().findClientPool(false, context);
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.edge.core;

import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext;
import org.apache.servicecomb.transport.rest.client.RestTransportClient;

import io.vertx.core.Context;

public class EdgeRestTransportClient extends RestTransportClient {
@Override
protected HttpClientWithContext findHttpClientPool(Invocation invocation) {
Context invocationContext = (Context) invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT);
return clientMgr.findClientPool(invocation.isSync(), invocationContext);
}
}
Expand Up @@ -227,5 +227,7 @@ public void createInvocation(@Mocked MicroserviceVersionMeta microserviceVersion
edgeInvocation.createInvocation();
Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation");
Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class));
Assert.assertFalse(invocation.isSync());
Assert.assertSame(context, invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT));
}
}
Expand Up @@ -21,6 +21,7 @@

import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

Expand Down Expand Up @@ -48,6 +49,11 @@ public LogCollector() {
Logger.getRootLogger().addAppender(appender);
}

public LogCollector setLogLevel(String logName, Level level) {
Logger.getLogger(logName).setLevel(level);
return this;
}

public List<LoggingEvent> getEvents() {
return events;
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -88,7 +89,7 @@ public static <CLIENT_POOL> DeploymentOptions createClientDeployOptions(
}

// deploy Verticle and wait for its success. do not call this method in event-loop thread
public static <VERTICLE extends AbstractVerticle> boolean blockDeploy(Vertx vertx,
public static <VERTICLE extends Verticle> boolean blockDeploy(Vertx vertx,
Class<VERTICLE> cls,
DeploymentOptions options) throws InterruptedException {
Holder<Boolean> result = new Holder<>();
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.servicecomb.foundation.vertx.client;

import io.vertx.core.Context;

public interface ClientPoolFactory<CLIENT_POOL> {
CLIENT_POOL createClientPool();
CLIENT_POOL createClientPool(Context context);
}
Expand Up @@ -56,28 +56,36 @@ public ClientPoolManager(Vertx vertx, ClientPoolFactory<CLIENT_POOL> factory) {
this.factory = factory;
}

public CLIENT_POOL createClientPool() {
CLIENT_POOL pool = factory.createClientPool();
addPool(pool);
public CLIENT_POOL createClientPool(Context context) {
CLIENT_POOL pool = factory.createClientPool(context);
addPool(context, pool);
return pool;
}

protected void addPool(CLIENT_POOL pool) {
Vertx.currentContext().put(id, pool);
protected void addPool(Context context, CLIENT_POOL pool) {
context.put(id, pool);
pools.add(pool);
}

public CLIENT_POOL findClientPool(boolean sync) {
return findClientPool(sync, null);
}

public CLIENT_POOL findClientPool(boolean sync, Context targetContext) {
if (sync) {
return findThreadBindClientPool();
}

// reactive mode
return findByContext();
return findByContext(targetContext);
}

protected CLIENT_POOL findByContext() {
Context currentContext = Vertx.currentContext();
return findByContext(null);
}

protected CLIENT_POOL findByContext(Context targetContext) {
Context currentContext = targetContext != null ? targetContext : Vertx.currentContext();
if (currentContext != null
&& currentContext.owner() == vertx
&& currentContext.isEventLoopContext()) {
Expand All @@ -89,7 +97,7 @@ protected CLIENT_POOL findByContext() {

// this will make "client.thread-count" bigger than which in microservice.yaml
// maybe it's better to remove "client.thread-count", just use "rest/highway.thread-count"
return createClientPool();
return createClientPool(currentContext);
}

// not in correct context:
Expand Down
Expand Up @@ -24,14 +24,15 @@

public class ClientVerticle<CLIENT_POOL> extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientVerticle.class);

public static final String CLIENT_MGR = "clientMgr";

@SuppressWarnings("unchecked")
@Override
public void start() throws Exception {
try {
ClientPoolManager<CLIENT_POOL> clientMgr = (ClientPoolManager<CLIENT_POOL>) config().getValue(CLIENT_MGR);
clientMgr.createClientPool();
clientMgr.createClientPool(context);
} catch (Throwable e) {
// vert.x got some states that not print error and execute call back in VertexUtils.blockDeploy, we add a log our self.
LOGGER.error("", e);
Expand Down