Skip to content

Commit

Permalink
SCB-2102 Fix the problem of unable to return due to abnormal file upl…
Browse files Browse the repository at this point in the history
…oad (#2017)
  • Loading branch information
develpoerX committed Oct 26, 2020
1 parent 2ea1340 commit 5616701
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 10 deletions.
Expand Up @@ -44,6 +44,7 @@
import com.google.common.collect.Multimap;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
Expand All @@ -66,10 +67,18 @@ public class RestClientRequestImpl implements RestClientRequest {

protected Buffer bodyBuffer;

private Handler<Throwable> throwableHandler;

public RestClientRequestImpl(HttpClientRequest request, Context context, AsyncResponse asyncResp) {
this(request, context, asyncResp, null);
}

public RestClientRequestImpl(HttpClientRequest request, Context context, AsyncResponse asyncResp,
Handler<Throwable> throwableHandler) {
this.context = context;
this.asyncResp = asyncResp;
this.request = request;
this.throwableHandler = throwableHandler;
}

@Override
Expand Down Expand Up @@ -186,7 +195,7 @@ private void attachFile(String boundary, Iterator<Entry<String, Part>> uploadsIt
Buffer fileHeader = fileBoundaryInfo(boundary, name, part);
request.write(fileHeader);

new PumpFromPart(context, part).toWriteStream(request).whenComplete((v, e) -> {
new PumpFromPart(context, part).toWriteStream(request, throwableHandler).whenComplete((v, e) -> {
if (e != null) {
LOGGER.debug("Failed to sending file [{}:{}].", name, filename, e);
asyncResp.consumerFail(e);
Expand Down
Expand Up @@ -91,7 +91,7 @@ public ReadStream<Buffer> getReadStream() {
* so the return future only means finished read from readStream.
*/
public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
return new PumpCommon().pump(context, readStream, writeStream);
return new PumpCommon().pump(context, readStream, writeStream, null);
}

public CompletableFuture<byte[]> saveAsBytes() {
Expand Down
Expand Up @@ -126,7 +126,7 @@ public void internalFlushBuffer() {
public CompletableFuture<Void> sendPart(Part part) {
DownloadUtils.prepareDownloadHeader(this, part);

return new PumpFromPart(context, part).toWriteStream(serverResponse);
return new PumpFromPart(context, part).toWriteStream(serverResponse, null);
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.servicecomb.foundation.common.io.AsyncCloseable;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.streams.Pump;
Expand All @@ -39,7 +40,8 @@ public class PumpCommon {
* <p> if writeStream is not AsyncCloseable, future only means read complete
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream,
Handler<Throwable> throwableHandler) {
CompletableFuture<Void> readFuture = new CompletableFuture<>();

writeStream.exceptionHandler(e -> {
Expand All @@ -54,6 +56,9 @@ public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStre
// so can only close the connection.
((HttpClientResponse) readStream).request().connection().close();
}
if (throwableHandler != null) {
throwableHandler.handle(e);
}
readFuture.completeExceptionally(e);
});
readStream.exceptionHandler(readFuture::completeExceptionally);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
Expand Down Expand Up @@ -62,9 +63,9 @@ private CompletableFuture<ReadStream<Buffer>> prepareReadStream() {
return future;
}

public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream, Handler<Throwable> throwableHandler) {
return prepareReadStream()
.thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream))
.thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream, throwableHandler))
.whenComplete((v, e) -> DownloadUtils.clearPartResource(part));
}

Expand All @@ -79,7 +80,7 @@ public CompletableFuture<Void> toOutputStream(OutputStream outputStream, boolean
private CompletableFuture<Void> toOutputStreamAsync(OutputStream outputStream, boolean autoCloseOutputStream) {
OutputStreamToWriteStream outputStreamToWriteStream = new OutputStreamToWriteStream(context, outputStream,
autoCloseOutputStream);
return toWriteStream(outputStreamToWriteStream);
return toWriteStream(outputStreamToWriteStream, null);
}

// DO NOT use a mocked sync context to unify the pump logic
Expand Down
Expand Up @@ -372,7 +372,7 @@ public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part) {
CompletableFuture<Void> future = new CompletableFuture<>();
new MockUp<PumpFromPart>() {
@Mock
CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream, Handler<Throwable> throwableHandler) {
return future;
}
};
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class RestClientInvocation {

private HttpClientResponse clientResponse;

private Handler<Throwable> throwableHandler = e -> fail((ConnectionBase) clientRequest.connection(), e);

public RestClientInvocation(HttpClientWithContext httpClientWithContext, List<HttpClientFilter> httpClientFilters) {
this.httpClientWithContext = httpClientWithContext;
this.httpClientFilters = httpClientFilters;
Expand All @@ -94,7 +97,7 @@ public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Except
createRequest(ipPort, path);
clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
RestClientRequestImpl restClientRequest =
new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp);
new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);

Buffer requestBodyBuffer = restClientRequest.getBodyBuffer();
Expand All @@ -109,7 +112,7 @@ public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Except
clientRequest.exceptionHandler(e -> {
LOGGER.error(invocation.getMarker(), "Failed to send request, local:{}, remote:{}.",
getLocalAddress(), ipPort.getSocketAddress(), e);
fail((ConnectionBase) clientRequest.connection(), e);
throwableHandler.handle(e);
});
clientRequest.connectionHandler(connection -> {
LOGGER.debug("http connection connected, local:{}, remote:{}.",
Expand Down

0 comments on commit 5616701

Please sign in to comment.