Skip to content

Commit

Permalink
fix: resolve failure in uploading files to a presigned AWS S3 upload …
Browse files Browse the repository at this point in the history
…url (#1645)

* bug: sending data to an AWS S3 presigned upload url does not work #1643

* fix checkstyle

* documentation

* apply review from @jimmarino

* fix checkstyle

* fix test

* apply review - rename to transferInOneGo and reverse logic

* fix test

* apply review from @jimmarino - rename to "transfer in one go" to "nonChunkedTransfer"

* apply review

* fix changelog for proper milestone

* fix NonChunkedTransferRequestBodyTest.java - use the new faker import

Co-authored-by: Andrei-Laurentiu Coman <andrei.coman@siemens.com>
  • Loading branch information
lucian-torje-siemens and andreic94 committed Aug 10, 2022
1 parent a82e288 commit e323243
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 8 deletions.
1 change: 1 addition & 0 deletions extensions/data-plane/data-plane-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ The table below summarizes how each parameter is retrieved for the source/sink i
| Method | `DataFlowRequest` properties if method proxy enabled by the source `DataAddress`, otherwise default to `GET` | Destination `DataAddress` if present, otherwise `POST` by default | GET, POST... |
| Content type | `DataFlowRequest` properties if body proxy enabled by the source `DataAddress` | Destination `DataAddress` | application/json |
| Body | `DataFlowRequest` properties if body proxy enabled by the source `DataAddress` | `Part` stream fetched by the `DataSource` | "hello world!" |
| NonChunkedTransfer | Not used | Destination `DataAddress` if present, otherwise `true` by default | "false" |
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.function.Supplier;

/**
* Streams content into an OK HTTP buffered sink.
* Streams content into an OK HTTP buffered sink in chunks.
*
* Due to OkHttp implementation an extra header will be created (no-overridable) Transfer-Encoding with value chunked
*
* @see <a href="https://github.com/square/okhttp/blob/master/docs/features/calls.md">OkHttp Dcoumentation</a>
*/
public class StreamingRequestBody extends RequestBody {
public class ChunkedTransferRequestBody extends RequestBody {
private final Supplier<InputStream> bodySupplier;
private final String contentType;

public StreamingRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
public ChunkedTransferRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
this.bodySupplier = contentSupplier;
this.contentType = contentType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.dataspaceconnector.dataplane.http.pipeline;

import okhttp3.Request;
import okhttp3.RequestBody;
import org.jetbrains.annotations.Nullable;

import java.io.InputStream;
Expand All @@ -26,13 +27,15 @@
public class HttpRequestParams {

private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
private static final boolean DEFAULT_NON_CHUNKED_TRANSFER = false;

private String method;
private String baseUrl;
private String path;
private String queryParams;
private String contentType = DEFAULT_CONTENT_TYPE;
private String body;
private boolean nonChunkedTransfer = DEFAULT_NON_CHUNKED_TRANSFER;
private final Map<String, String> headers = new HashMap<>();

/**
Expand All @@ -54,9 +57,7 @@ public Request toRequest() {
* @return HTTP request.
*/
public Request toRequest(@Nullable Supplier<InputStream> bodySupplier) {
var requestBody = (bodySupplier != null && contentType != null) ?
new StreamingRequestBody(bodySupplier, contentType) :
null;
var requestBody = createRequestBody(bodySupplier);

var requestBuilder = new Request.Builder()
.url(toUrl())
Expand All @@ -65,6 +66,16 @@ public Request toRequest(@Nullable Supplier<InputStream> bodySupplier) {
return requestBuilder.build();
}

private RequestBody createRequestBody(@Nullable Supplier<InputStream> bodySupplier) {
if (bodySupplier == null || contentType == null) {
return null;
}

return nonChunkedTransfer ?
new NonChunkedTransferRequestBody(bodySupplier, contentType) : new ChunkedTransferRequestBody(bodySupplier, contentType);
}


/**
* Creates a URL from the base url, path and query parameters provided in input.
*
Expand Down Expand Up @@ -128,6 +139,11 @@ public HttpRequestParams.Builder path(String path) {
return this;
}

public HttpRequestParams.Builder nonChunkedTransfer(boolean nonChunkedTransfer) {
params.nonChunkedTransfer = nonChunkedTransfer;
return this;
}

public HttpRequestParams build() {
params.headers.forEach((s, s2) -> Objects.requireNonNull(s2, "value for header: " + s));
Objects.requireNonNull(params.baseUrl, "baseUrl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public HttpRequestParams apply(DataFlowRequest request) {
params.contentType(ct);
params.body(extractBody(address, request));
});
params.nonChunkedTransfer(extractNonChunkedTransfer(address));

return params.build();
}

protected abstract boolean extractNonChunkedTransfer(HttpDataAddress address);

@NotNull
protected abstract DataAddress selectAddress(DataFlowRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public HttpSinkRequestParamsSupplier(Vault vault) {
super(vault);
}

@Override
protected boolean extractNonChunkedTransfer(HttpDataAddress address) {
return address.getNonChunkedTransfer();
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getDestinationDataAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public HttpSourceRequestParamsSupplier(Vault vault) {
super(vault);
}

@Override
protected boolean extractNonChunkedTransfer(HttpDataAddress address) {
return false;
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getSourceDataAddress();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.dataplane.http.pipeline;

import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.BufferedSink;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

/**
* Writes content into an OK HTTP buffered sink.
*
* The extra Transfer-Encoding is not created because the Content-Length is provided upfront.
* Note that means that the all content is loaded into memory, so this method can be used for small files (up to 50MB) for e.g.
*
* @see <a href="https://github.com/square/okhttp/blob/master/docs/features/calls.md">OkHttp Dcoumentation</a>
*/
public class NonChunkedTransferRequestBody extends RequestBody {
private byte[] bytes;
private final String contentType;

public NonChunkedTransferRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
try {
this.bytes = contentSupplier.get().readAllBytes();
} catch (IOException e) {
//do nothing
}
this.contentType = contentType;
}

@Override
public long contentLength() {
return bytes == null ? 0 : bytes.length;
}

@Override
public MediaType contentType() {
return MediaType.parse(contentType);
}

@Override
public void writeTo(@NotNull BufferedSink sink) throws IOException {
if (bytes == null) {
return;
}

try (var os = sink.outputStream()) {
os.write(bytes);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class StreamingRequestBodyTest {
class ChunkedTransferRequestBodyTest {
private static final Faker FAKER = new Faker();

@Test
Expand All @@ -38,7 +38,7 @@ void verifyStreamingTransfer() throws IOException {

when(sink.outputStream()).thenReturn(outputStream);

var body = new StreamingRequestBody(() -> new ByteArrayInputStream(content.getBytes()), HttpDataAddress.OCTET_STREAM);
var body = new ChunkedTransferRequestBody(() -> new ByteArrayInputStream(content.getBytes()), HttpDataAddress.OCTET_STREAM);
body.writeTo(sink);

assertThat(outputStream).hasToString(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
* Contributors:
* Amadeus - initial API and implementation
* Siemens - add chunked parameter
*
*/

Expand Down Expand Up @@ -139,6 +140,26 @@ void verifyAbstractMethodsInvokation() throws IOException {
assertThat(body.contentType()).isEqualTo(MediaType.get(supplier.contentType));
assertThat(HttpTestFixtures.formatRequestBodyAsString(body)).isEqualTo(supplier.body);
assertThat(httpRequest.method()).isEqualTo(supplier.method);
assertThat(httpRequest.body().contentLength()).isEqualTo(-1L);
}

@Test
void verifyChunkedCall() throws IOException {
var dataAddress = HttpDataAddress.Builder.newInstance()
.baseUrl("http://" + FAKER.internet().url())
.build();
var request = createRequest(dataAddress);

var supplier = new TestHttpRequestParamsSupplier(vaultMock, true);
var httpRequest = supplier.apply(request).toRequest();

assertThat(httpRequest.url().url()).hasToString(dataAddress.getBaseUrl() + "/" + supplier.path + "?" + supplier.queryParams);
var body = httpRequest.body();
assertThat(body).isNotNull();
assertThat(body.contentType()).isEqualTo(MediaType.get(supplier.contentType));
assertThat(HttpTestFixtures.formatRequestBodyAsString(body)).isEqualTo(supplier.body);
assertThat(httpRequest.method()).isEqualTo(supplier.method);
assertThat(httpRequest.body().contentLength()).isEqualTo(supplier.body.getBytes().length);
}

private static DataFlowRequest createRequest(DataAddress source) {
Expand All @@ -156,16 +177,33 @@ public static final class TestHttpRequestParamsSupplier extends HttpRequestParam
private final String queryParams;
private final String contentType;
private final String body;
private final boolean isOneGo;

private TestHttpRequestParamsSupplier(Vault vault) {
super(vault);
this.method = new Random().nextBoolean() ? HttpMethod.PUT.name() : HttpMethod.POST.name();
this.isOneGo = false;
this.path = FAKER.lorem().word();
this.queryParams = FAKER.lorem().word();
this.contentType = new Random().nextBoolean() ? APPLICATION_JSON : APPLICATION_X_WWW_FORM_URLENCODED;
this.body = FAKER.lorem().word();
}

private TestHttpRequestParamsSupplier(Vault vault, boolean isOneGo) {
super(vault);
this.method = new Random().nextBoolean() ? HttpMethod.PUT.name() : HttpMethod.POST.name();
this.isOneGo = isOneGo;
this.path = FAKER.lorem().word();
this.queryParams = FAKER.lorem().word();
this.contentType = new Random().nextBoolean() ? APPLICATION_JSON : APPLICATION_X_WWW_FORM_URLENCODED;
this.body = FAKER.lorem().word();
}

@Override
protected boolean extractNonChunkedTransfer(HttpDataAddress address) {
return isOneGo;
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getSourceDataAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -106,4 +108,16 @@ void extractContentType() {
void extractBody() {
assertThat(supplier.extractPath(mock(HttpDataAddress.class), null)).isNull();
}

@Test
void extractNonChunkedTransfer() {
var chunked = new Random().nextBoolean();
var address = HttpDataAddress.Builder.newInstance()
.nonChunkedTransfer(chunked)
.build();

var result = supplier.extractNonChunkedTransfer(address);

assertThat(result).isEqualTo(chunked);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -230,6 +231,18 @@ void extractBodyFilteredByProxy() {
assertThat(result).isNull();
}

@Test
void extractNonChunkedTransfer() {
var chunked = new Random().nextBoolean();
var address = HttpDataAddress.Builder.newInstance()
.nonChunkedTransfer(chunked)
.build();

var result = supplier.extractNonChunkedTransfer(address);

assertThat(result).isFalse();
}

private static DataFlowRequest createRequest(DataAddress source) {
return createRequest(source, Map.of());
}
Expand Down
Loading

0 comments on commit e323243

Please sign in to comment.