Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: get rid of DspHttpDispatcherDelegate #3771

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,18 @@ public <T> CompletableFuture<T> executeAsync(Request request, Function<Response,
}

@Override
public <T> CompletableFuture<T> executeAsync(Request request, List<FallbackFactory> fallbacks, Function<Response, T> mappingFunction) {
public CompletableFuture<Response> executeAsync(Request request, List<FallbackFactory> fallbacks) {
var call = okHttpClient.newCall(request);
var builder = with(retryPolicy);
fallbacks.stream().map(it -> it.create(request)).forEach(builder::compose);

return builder.compose(call)
.executeAsync()
.executeAsync();
}

@Override
public <T> CompletableFuture<T> executeAsync(Request request, List<FallbackFactory> fallbacks, Function<Response, T> mappingFunction) {
return executeAsync(request, fallbacks)
.thenApply(response -> {
try (response) {
return mappingFunction.apply(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dev.failsafe.RetryPolicy;
import okhttp3.Request;
import okhttp3.Response;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.TypeManager;
import org.jetbrains.annotations.NotNull;
Expand All @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort;
import static org.eclipse.edc.junit.testfixtures.TestUtils.testOkHttpClient;
Expand All @@ -56,8 +57,8 @@ class EdcHttpClientImplTest {
private ClientAndServer server;

@NotNull
private static EdcHttpClientImpl clientWith(RetryPolicy<Response> retryPolicy) {
return new EdcHttpClientImpl(testOkHttpClient(), retryPolicy, mock(Monitor.class));
private static EdcHttpClient clientWith(RetryPolicy<Response> retryPolicy) {
return new EdcHttpClientImpl(testOkHttpClient(), retryPolicy, mock());
}

@BeforeEach
Expand Down Expand Up @@ -202,7 +203,7 @@ void executeAsync_fallback_shouldRetryIfStatusIsNotSuccessful() {

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(500));

var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx())).thenApply(handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
Expand All @@ -218,7 +219,7 @@ void executeAsync_fallback_shouldRetryIfStatusIs4xx() {

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(500));

var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx())).thenApply(handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
Expand All @@ -234,7 +235,7 @@ void executeAsync_fallback_shouldNotRetryIfStatusIsExpected() {

server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(404));

var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx()), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusNot2xxOr4xx())).thenApply(handleResponse());

assertThat(result).succeedsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(1));
Expand All @@ -249,7 +250,7 @@ void executeAsync_fallback_shouldRetryIfStatusIsNotAsExpected() {
.build();
server.when(request(), unlimited()).respond(new HttpResponse().withStatusCode(200));

var result = client.executeAsync(request, List.of(retryWhenStatusIsNot(204)), handleResponse());
var result = client.executeAsync(request, List.of(retryWhenStatusIsNot(204))).thenApply(handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
server.verify(request(), exactly(2));
Expand All @@ -264,7 +265,7 @@ void executeAsync_fallback_shouldFailAfterAttemptsExpired_whenServerIsDown() {
.url("http://localhost:" + port)
.build();

var result = client.executeAsync(request, handleResponse());
var result = client.executeAsync(request, emptyList()).thenApply(handleResponse());

assertThat(result).failsWithin(5, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DatasetRequestMessage;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.CatalogRequestHttpRawDelegate;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.DatasetRequestHttpRawDelegate;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.ByteArrayBodyExtractor;
import org.eclipse.edc.protocol.dsp.dispatcher.GetDspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.dispatcher.PostDspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcher;
Expand Down Expand Up @@ -52,15 +51,17 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
var byteArrayBodyExtractor = new ByteArrayBodyExtractor();

messageDispatcher.registerMessage(
CatalogRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + CATALOG_REQUEST),
new CatalogRequestHttpRawDelegate()
byteArrayBodyExtractor
);
messageDispatcher.registerMessage(
DatasetRequestMessage.class,
new GetDspHttpRequestFactory<>(m -> BASE_PATH + DATASET_REQUEST + "/" + m.getDatasetId()),
new DatasetRequestHttpRawDelegate()
byteArrayBodyExtractor
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import okhttp3.ResponseBody;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.response.DspHttpResponseBodyExtractor;
import org.eclipse.edc.spi.EdcException;

import java.io.IOException;

/**
* Extract the body as a byte[]
*/
public class ByteArrayBodyExtractor implements DspHttpResponseBodyExtractor<byte[]> {
@Override
public byte[] extractBody(ResponseBody responseBody) {
try {
if (responseBody == null) {
return null;
}
return responseBody.bytes();

} catch (IOException e) {
throw new EdcException("Failed to read response body", e);
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import okhttp3.ResponseBody;
import org.eclipse.edc.spi.EdcException;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ByteArrayBodyExtractorTest {

private final ByteArrayBodyExtractor extractor = new ByteArrayBodyExtractor();

@Test
void shouldReturnBodyAsBytes() throws IOException {
var responseBody = mock(ResponseBody.class);
var bytes = "test".getBytes();
when(responseBody.bytes()).thenReturn(bytes);

var result = extractor.extractBody(responseBody);

assertThat(result).isEqualTo(bytes);
}

@Test
void shouldReturnNull_whenBodyIsNull() {
var result = extractor.extractBody(null);

assertThat(result).isNull();
}

@Test
void shouldThrowException_whenCannotExtractBytes() throws IOException {
var responseBody = mock(ResponseBody.class);
when(responseBody.bytes()).thenThrow(new IOException());

assertThatThrownBy(() -> extractor.extractBody(responseBody)).isInstanceOf(EdcException.class);
}

}

This file was deleted.

Loading
Loading