Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resolve failure in uploading files to a presigned AWS S3 upload url #1645

Merged
merged 26 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8b53aea
Merge pull request #16 from eclipse-dataspaceconnector/main
lucian-torje-siemens Jun 21, 2022
015823f
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jun 27, 2022
940c9c9
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jun 27, 2022
757dbc3
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jun 28, 2022
2905c7c
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jul 1, 2022
44703d2
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jul 4, 2022
7b8e61f
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Jul 4, 2022
618dd15
bug: sending data to an AWS S3 presigned upload url does not work #1643
lucian-torje-siemens Jul 11, 2022
c8eabc5
fix checkstyle
lucian-torje-siemens Jul 11, 2022
3699bbf
documentation
lucian-torje-siemens Jul 11, 2022
619ac82
apply review from @jimmarino
lucian-torje-siemens Jul 12, 2022
568e620
fix checkstyle
lucian-torje-siemens Jul 12, 2022
f2cc092
fix test
lucian-torje-siemens Jul 12, 2022
5b11fe8
apply review - rename to transferInOneGo and reverse logic
lucian-torje-siemens Jul 12, 2022
bf34714
fix test
lucian-torje-siemens Jul 12, 2022
e764e52
apply review from @jimmarino - rename to "transfer in one go" to "non…
lucian-torje-siemens Jul 15, 2022
47177a4
Merge branch 'main' into bug/sink-aws-s3
lucian-torje-siemens Jul 15, 2022
2905612
apply review
lucian-torje-siemens Jul 15, 2022
d8bdd28
Merge branch 'main' of https://github.com/eclipse-dataspaceconnector/…
andreic94 Jul 22, 2022
5fb8414
Merge branch 'upstream' of https://github.com/mindsphere/DataSpaceCon…
andreic94 Jul 22, 2022
df78f94
fix changelog for proper milestone
andreic94 Jul 22, 2022
b4939b9
Merge branch 'main' of https://github.com/eclipse-dataspaceconnector/…
andreic94 Jul 26, 2022
c4f9bea
Merge branch 'main' of https://github.com/eclipse-dataspaceconnector/…
andreic94 Jul 26, 2022
7570340
Merge branch 'eclipse-dataspaceconnector:main' into upstream
lucian-torje-siemens Aug 3, 2022
dda818a
Merge branch 'upstream' into bug/sink-aws-s3
lucian-torje-siemens Aug 3, 2022
885dd54
fix NonChunkedTransferRequestBodyTest.java - use the new faker import
lucian-torje-siemens Aug 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ in the detailed section referring to by linking pull requests or issues.
* Fix race condition in `ContractNegotiationIntegrationTest` (#1505)
* Fix for change in Cosmos DB behavior on missing sort fields (#1514)
* Effectively removed default LIMIT in SQL Contract Def Store (#1515)
* Sending data to an AWS S3 presigned upload url does not work (#1643)
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

## [milestone-4] - 2022-06-07

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
/**
* Streams content into an OK HTTP buffered sink.
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
*/
public class StreamingRequestBody extends RequestBody {
public class ChunkedTransferRequestBody extends RequestBody {
private final Supplier<InputStream> bodySupplier;
private final String contentType;

public StreamingRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
public ChunkedTransferRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
this.bodySupplier = contentSupplier;
this.contentType = contentType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.dataspaceconnector.dataplane.http.pipeline;

import okhttp3.Request;
import okhttp3.RequestBody;
import org.jetbrains.annotations.Nullable;

import java.io.InputStream;
Expand All @@ -26,13 +27,15 @@
public class HttpRequestParams {

private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
private static final Boolean DEFAULT_TRANSFER_CHUNKED = Boolean.TRUE;
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

private String method;
private String baseUrl;
private String path;
private String queryParams;
private String contentType = DEFAULT_CONTENT_TYPE;
private String body;
private Boolean transferChuncked = DEFAULT_TRANSFER_CHUNKED;
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, String> headers = new HashMap<>();

/**
Expand All @@ -54,9 +57,7 @@ public Request toRequest() {
* @return HTTP request.
*/
public Request toRequest(@Nullable Supplier<InputStream> bodySupplier) {
var requestBody = (bodySupplier != null && contentType != null) ?
new StreamingRequestBody(bodySupplier, contentType) :
null;
var requestBody = createRequestBody(bodySupplier);

var requestBuilder = new Request.Builder()
.url(toUrl())
Expand All @@ -65,6 +66,16 @@ public Request toRequest(@Nullable Supplier<InputStream> bodySupplier) {
return requestBuilder.build();
}

private RequestBody createRequestBody(@Nullable Supplier<InputStream> bodySupplier) {
if (bodySupplier == null || contentType == null) {
return null;
}

return transferChuncked ?
new ChunkedTransferRequestBody(bodySupplier, contentType) : new NonChunkedTransferRequestBody(bodySupplier, contentType);
}


/**
* Creates a URL from the base url, path and query parameters provided in input.
*
Expand Down Expand Up @@ -128,6 +139,11 @@ public HttpRequestParams.Builder path(String path) {
return this;
}

public HttpRequestParams.Builder transferChuncked(Boolean transferChuncked) {
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
params.transferChuncked = transferChuncked;
return this;
}

public HttpRequestParams build() {
params.headers.forEach((s, s2) -> Objects.requireNonNull(s2, "value for header: " + s));
Objects.requireNonNull(params.baseUrl, "baseUrl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public HttpRequestParams apply(DataFlowRequest request) {
params.contentType(ct);
params.body(extractBody(address, request));
});
Optional.ofNullable(extractTransferChunked(address)).ifPresent(params::transferChuncked);
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

return params.build();
}

protected abstract Boolean extractTransferChunked(HttpDataAddress address);
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

@NotNull
protected abstract DataAddress selectAddress(DataFlowRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@
public class HttpSinkRequestParamsSupplier extends HttpRequestParamsSupplier {

private static final String DEFAULT_METHOD = "POST";
private static final Boolean DEFAULT_TRANSFER_CHUNKED = Boolean.TRUE;
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

public HttpSinkRequestParamsSupplier(Vault vault) {
super(vault);
}

@Override
protected Boolean extractTransferChunked(HttpDataAddress address) {
return Optional.ofNullable(address.getTransferChunked()).orElse(DEFAULT_TRANSFER_CHUNKED);
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getDestinationDataAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public HttpSourceRequestParamsSupplier(Vault vault) {
super(vault);
}

@Override
protected Boolean extractTransferChunked(HttpDataAddress address) {
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getSourceDataAddress();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.dataplane.http.pipeline;

import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.BufferedSink;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Streams content into an OK HTTP buffered sink.
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
*/
public class NonChunkedTransferRequestBody extends RequestBody {
private Optional<byte[]> bytes;
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
private final String contentType;

public NonChunkedTransferRequestBody(Supplier<InputStream> contentSupplier, String contentType) {
try {
this.bytes = Optional.of(contentSupplier.get().readAllBytes());
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException e) {
this.bytes = Optional.empty();
}
this.contentType = contentType;
}

@Override
public long contentLength() {
return bytes.map(value -> value.length).orElse(0);
}

@Override
public MediaType contentType() {
return MediaType.parse(contentType);
}

@Override
public void writeTo(@NotNull BufferedSink sink) throws IOException {
if (!bytes.isPresent()) {
return;
}

try (var os = sink.outputStream()) {
os.write(bytes.get());
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class StreamingRequestBodyTest {
class ChunkedTransferRequestBodyTest {
private static final Faker FAKER = new Faker();

@Test
Expand All @@ -38,7 +38,7 @@ void verifyStreamingTransfer() throws IOException {

when(sink.outputStream()).thenReturn(outputStream);

var body = new StreamingRequestBody(() -> new ByteArrayInputStream(content.getBytes()), HttpDataAddress.OCTET_STREAM);
var body = new ChunkedTransferRequestBody(() -> new ByteArrayInputStream(content.getBytes()), HttpDataAddress.OCTET_STREAM);
body.writeTo(sink);

assertThat(outputStream).hasToString(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
* Contributors:
* Amadeus - initial API and implementation
* Siemens - add chunked parameter
*
*/

Expand Down Expand Up @@ -139,6 +140,26 @@ void verifyAbstractMethodsInvokation() throws IOException {
assertThat(body.contentType()).isEqualTo(MediaType.get(supplier.contentType));
assertThat(HttpTestFixtures.formatRequestBodyAsString(body)).isEqualTo(supplier.body);
assertThat(httpRequest.method()).isEqualTo(supplier.method);
assertThat(httpRequest.body().contentLength()).isEqualTo(-1L);
}

@Test
void verifyChunkedCall() throws IOException {
var dataAddress = HttpDataAddress.Builder.newInstance()
.baseUrl("http://" + FAKER.internet().url())
.build();
var request = createRequest(dataAddress);

var supplier = new TestHttpRequestParamsSupplier(vaultMock, false);
var httpRequest = supplier.apply(request).toRequest();

assertThat(httpRequest.url().url()).hasToString(dataAddress.getBaseUrl() + "/" + supplier.path + "?" + supplier.queryParams);
var body = httpRequest.body();
assertThat(body).isNotNull();
assertThat(body.contentType()).isEqualTo(MediaType.get(supplier.contentType));
assertThat(HttpTestFixtures.formatRequestBodyAsString(body)).isEqualTo(supplier.body);
assertThat(httpRequest.method()).isEqualTo(supplier.method);
assertThat(httpRequest.body().contentLength()).isEqualTo(supplier.body.getBytes().length);
}

private static DataFlowRequest createRequest(DataAddress source) {
Expand All @@ -156,16 +177,33 @@ public static final class TestHttpRequestParamsSupplier extends HttpRequestParam
private final String queryParams;
private final String contentType;
private final String body;
private final Boolean chunked;
lucian-torje-siemens marked this conversation as resolved.
Show resolved Hide resolved

private TestHttpRequestParamsSupplier(Vault vault) {
super(vault);
this.method = new Random().nextBoolean() ? HttpMethod.PUT.name() : HttpMethod.POST.name();
this.chunked = new Random().nextBoolean() ? true : null;
this.path = FAKER.lorem().word();
this.queryParams = FAKER.lorem().word();
this.contentType = new Random().nextBoolean() ? APPLICATION_JSON : APPLICATION_X_WWW_FORM_URLENCODED;
this.body = FAKER.lorem().word();
}

private TestHttpRequestParamsSupplier(Vault vault, boolean chunked) {
super(vault);
this.method = new Random().nextBoolean() ? HttpMethod.PUT.name() : HttpMethod.POST.name();
this.chunked = chunked;
this.path = FAKER.lorem().word();
this.queryParams = FAKER.lorem().word();
this.contentType = new Random().nextBoolean() ? APPLICATION_JSON : APPLICATION_X_WWW_FORM_URLENCODED;
this.body = FAKER.lorem().word();
}

@Override
protected Boolean extractTransferChunked(HttpDataAddress address) {
return chunked;
}

@Override
protected @NotNull DataAddress selectAddress(DataFlowRequest request) {
return request.getSourceDataAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -106,4 +108,16 @@ void extractContentType() {
void extractBody() {
assertThat(supplier.extractPath(mock(HttpDataAddress.class), null)).isNull();
}

@Test
void extractTransferChunked() {
var chunked = new Random().nextBoolean();
var address = HttpDataAddress.Builder.newInstance()
.transferChunked(String.valueOf(chunked))
.build();

var result = supplier.extractTransferChunked(address);

assertThat(result).isEqualTo(chunked);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -230,6 +231,18 @@ void extractBodyFilteredByProxy() {
assertThat(result).isNull();
}

@Test
void extractTransferChunked() {
var chunked = new Random().nextBoolean();
var address = HttpDataAddress.Builder.newInstance()
.transferChunked(String.valueOf(chunked))
.build();

var result = supplier.extractTransferChunked(address);

assertThat(result).isEqualTo(null);
}

private static DataFlowRequest createRequest(DataAddress source) {
return createRequest(source, Map.of());
}
Expand Down
Loading