Skip to content

Commit

Permalink
refactor: get rid of NoopTransferProcessClient
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Sep 12, 2023
1 parent 656e10e commit bd645b0
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
Expand All @@ -31,12 +29,6 @@ public String name() {
return NAME;
}


@Provider(isDefault = true)
public TransferProcessApiClient transferProcessApiClient() {
return new NoopTransferProcessClient();
}

@Provider(isDefault = true)
public TransferServiceSelectionStrategy transferServiceSelectionStrategy() {
return TransferServiceSelectionStrategy.selectFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,30 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.e2e.EndToEndTest;
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.ObjectFactory;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneFrameworkExtensionTest {

TransferService transferService1 = mock(TransferService.class);
TransferService transferService2 = mock(TransferService.class);
DataFlowRequest request = EndToEndTest.createRequest("1").build();

@BeforeEach
public void setUp(ServiceExtensionContext context) {
when(transferService1.canHandle(request)).thenReturn(true);
when(transferService2.canHandle(request)).thenReturn(true);
context.registerService(Telemetry.class, mock(Telemetry.class));
context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop());
context.registerService(TransferProcessApiClient.class, new NoopTransferProcessClient());
}

@Test
void initialize_registers_PipelineService(ServiceExtensionContext context, ObjectFactory factory) {
var extension = factory.constructInstance(DataPlaneFrameworkExtension.class);
void initialize_registers_PipelineService(ServiceExtensionContext context, DataPlaneFrameworkExtension extension) {
extension.initialize(context);
assertThat(context.getService(PipelineService.class)).isInstanceOf(PipelineServiceImpl.class);
}

@Test
void initialize_registers_DataPlaneManager_withInjectedStrategy(ServiceExtensionContext context, ObjectFactory factory) {
// Arrange
// Inject a custom TransferServiceSelectionStrategy that will select the second service
context.registerService(TransferServiceSelectionStrategy.class,
(request, services) -> services.skip(1).findFirst().orElse(null));

// Act
validateRequest(context, factory);

// Assert
verify(transferService2).validate(request);
verify(transferService1, never()).validate(request);
assertThat(context.getService(PipelineService.class)).isInstanceOf(PipelineServiceImpl.class);
}

private void validateRequest(ServiceExtensionContext context, ObjectFactory factory) {
var extension = factory.constructInstance(DataPlaneFrameworkExtension.class);
extension.initialize(context);
var service = context.getService(TransferServiceRegistry.class);
service.registerTransferService(transferService1);
service.registerTransferService(transferService2);
var m = context.getService(DataPlaneManager.class);
m.validate(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.eclipse.edc.connector.dataplane.framework.e2e;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl;
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
Expand Down Expand Up @@ -58,7 +57,7 @@ void testEndToEnd() throws Exception {
.monitor(monitor)
.pipelineService(pipelineService)
.executorInstrumentation(ExecutorInstrumentation.noop())
.transferProcessClient(new NoopTransferProcessClient())
.transferProcessClient(mock())
.build();
manager.start();
manager.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("bytes".getBytes())), createRequest("1").build()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

package org.eclipse.edc.connector.dataplane.framework.manager;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand All @@ -39,10 +37,10 @@


class DataPlaneManagerImplTest {
TransferService transferService = mock(TransferService.class);
TransferService transferService = mock();
DataPlaneStore store = new InMemoryDataPlaneStore(10);
DataFlowRequest request = createRequest();
TransferServiceRegistry registry = mock(TransferServiceRegistry.class);
TransferServiceRegistry registry = mock();

@BeforeEach
public void setUp() {
Expand All @@ -54,7 +52,7 @@ public void setUp() {
* Verifies a request is enqueued, dequeued, and dispatched to the pipeline service.
*/
@Test
void verifyWorkDispatch() throws InterruptedException {
void verifyWorkDispatch() {
var dataPlaneManager = createDataPlaneManager();

when(registry.resolveTransferService(request))
Expand All @@ -63,9 +61,8 @@ void verifyWorkDispatch() throws InterruptedException {
.thenReturn(true);


when(transferService.transfer(isA(DataFlowRequest.class))).thenAnswer(i -> {
return completedFuture(Result.success("ok"));
});
when(transferService.transfer(isA(DataFlowRequest.class)))
.thenAnswer(i -> completedFuture(Result.success("ok")));

dataPlaneManager.start();
dataPlaneManager.initiateTransfer(request);
Expand All @@ -80,7 +77,7 @@ void verifyWorkDispatch() throws InterruptedException {
* Verifies that the dispatch thread survives an error thrown by a worker.
*/
@Test
void verifyWorkDispatchError() throws InterruptedException {
void verifyWorkDispatchError() {
var dataPlaneManager = createDataPlaneManager();

when(transferService.canHandle(request))
Expand All @@ -89,9 +86,8 @@ void verifyWorkDispatchError() throws InterruptedException {
when(transferService.transfer(request))
.thenAnswer(i -> {
throw new RuntimeException("Test exception");
}).thenAnswer((i -> {
return completedFuture(Result.success("ok"));
}));
})
.thenAnswer((i -> completedFuture(Result.success("ok"))));


dataPlaneManager.start();
Expand All @@ -105,7 +101,7 @@ void verifyWorkDispatchError() throws InterruptedException {
}

@Test
void verifyWorkDispatch_onUnavailableTransferService_completesTransfer() throws InterruptedException {
void verifyWorkDispatch_onUnavailableTransferService_completesTransfer() {
// Modify store used in createDataPlaneManager()
store = mock(DataPlaneStore.class);

Expand Down Expand Up @@ -139,8 +135,8 @@ private DataPlaneManagerImpl createDataPlaneManager() {
.waitTimeout(10)
.transferServiceRegistry(registry)
.store(store)
.transferProcessClient(new NoopTransferProcessClient())
.monitor(mock(Monitor.class))
.transferProcessClient(mock())
.monitor(mock())
.build();
}

Expand Down
1 change: 1 addition & 0 deletions extensions/data-plane/data-plane-http/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {

testImplementation(project(":core:common:junit"))
testImplementation(project(":core:data-plane:data-plane-core"))
testImplementation(project(":extensions:control-plane:api:control-plane-api-client"))
testImplementation(libs.restAssured)
testImplementation(libs.mockserver.netty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ dependencies {
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client)

testImplementation(project(":spi:data-plane:data-plane-spi"))
testImplementation(project(":core:common:junit"))
testImplementation(project(":extensions:control-plane:api:control-plane-api-client"))
testImplementation(testFixtures(project(":extensions:data-plane:data-plane-http")))
testImplementation(project(":spi:data-plane:data-plane-spi"))

testRuntimeOnly(project(":launchers:data-plane-server"))
}

This file was deleted.

0 comments on commit bd645b0

Please sign in to comment.