Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid fetching body in memory during data plane proxy call #3827

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import java.time.Clock;

Expand All @@ -46,4 +49,9 @@ public TransferServiceSelectionStrategy transferServiceSelectionStrategy() {
// Provides the default, non-persistent store for data plane transfer state.
// `clock` is an injected field on the enclosing extension (declared outside this hunk)
// and is forwarded so entry timestamps are testable/deterministic.
public DataPlaneStore dataPlaneStore() {
    return new InMemoryDataPlaneStore(clock);
}

// Default PipelineService, supplied only when no other extension provides one
// (isDefault = true); constructed lazily by the DI container.
@Provider(isDefault = true)
public PipelineService pipelineService(ServiceExtensionContext context) {
    var monitor = context.getMonitor();
    return new PipelineServiceImpl(monitor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl;
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSinkFactory;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
Expand All @@ -48,7 +46,7 @@
/**
* Provides core services for the Data Plane Framework.
*/
@Provides({ DataPlaneManager.class, PipelineService.class, DataTransferExecutorServiceContainer.class, TransferServiceRegistry.class })
@Provides({ DataPlaneManager.class, DataTransferExecutorServiceContainer.class, TransferServiceRegistry.class })
@Extension(value = DataPlaneFrameworkExtension.NAME)
public class DataPlaneFrameworkExtension implements ServiceExtension {
public static final String NAME = "Data Plane Framework";
Expand Down Expand Up @@ -88,6 +86,9 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {
@Inject
private Clock clock;

@Inject
private PipelineService pipelineService;

@Override
public String name() {
return NAME;
Expand All @@ -103,10 +104,6 @@ public void initialize(ServiceExtensionContext context) {
executorInstrumentation.instrument(executorService, "Data plane transfers"));
context.registerService(DataTransferExecutorServiceContainer.class, executorContainer);

var pipelineService = new PipelineServiceImpl(monitor);
pipelineService.registerFactory(new OutputStreamDataSinkFactory(monitor, executorContainer.getExecutorService())); // Added by default to support synchronous data transfer, i.e. pull data
context.registerService(PipelineService.class, pipelineService);

var transferServiceRegistry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy);
transferServiceRegistry.registerTransferService(pipelineService);
context.registerService(TransferServiceRegistry.class, transferServiceRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
Expand All @@ -34,10 +34,10 @@ public void setUp(ServiceExtensionContext context) {
}

@Test
void initialize_registers_pipelineService(ServiceExtensionContext context, DataPlaneFrameworkExtension extension) {
void initialize_registers_transferService(ServiceExtensionContext context, DataPlaneFrameworkExtension extension) {
extension.initialize(context);

assertThat(context.getService(PipelineService.class)).isInstanceOf(PipelineServiceImpl.class);
assertThat(context.getService(TransferServiceRegistry.class)).isInstanceOf(TransferServiceRegistryImpl.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory;
import org.eclipse.edc.connector.dataplane.spi.pipeline.InputStreamDataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand All @@ -33,10 +32,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -49,24 +46,22 @@
public class PipelineServiceIntegrationTest {

private final Monitor monitor = mock();
private final PipelineServiceImpl pipelineService = new PipelineServiceImpl(monitor);

@Test
void transferData() {
    // NOTE(review): the scraped diff interleaved removed and added lines here; this is the
    // reconstructed post-merge version, using the shared `pipelineService` field.
    // Register a sink factory (in-memory endpoint) and a source factory, then run end-to-end.
    pipelineService.registerFactory(new FixedEndpoint());
    pipelineService.registerFactory(new InputStreamDataFactory());

    var future = pipelineService.transfer(createRequest().build());

    // The in-memory sink collects raw bytes, so the payload compares against byte[] not String.
    assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> {
        assertThat(result).isSucceeded().isEqualTo("bytes".getBytes());
    });
}

@Test
void transferData_withCustomSink() {
var pipelineService = new PipelineServiceImpl(monitor);
var text = "test-data-input-transferred-to-a-memory-stream";
pipelineService.registerFactory(new InputStreamDataFactory(text));

Expand All @@ -88,10 +83,10 @@ private DataFlowRequest.Builder createRequest() {
}

private static class FixedEndpoint implements DataSinkFactory {
private final OutputStreamDataSink sink;
private final DataSink sink;

FixedEndpoint(Monitor monitor) {
sink = new OutputStreamDataSink(randomUUID().toString(), Executors.newFixedThreadPool(1), monitor);
FixedEndpoint() {
sink = new MemorySink();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.AbstractResult;
import org.jetbrains.annotations.NotNull;
Expand All @@ -29,7 +28,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
Expand All @@ -45,29 +43,14 @@
*/
public class AsyncStreamingDataSink implements DataSink {

/**
* Serves as a facade for a response context that writes data asynchronously to a client.
*/
@FunctionalInterface
public interface AsyncResponseContext {

/**
* Registers a callback when an output stream is available for writing data.
*
* @param consumer the callback
* @return true if the callback was successfully registered
*/
boolean register(Consumer<OutputStream> consumer);
}
public static final String TYPE = "AsyncStreaming";

private final AsyncResponseContext asyncContext;
private final ExecutorService executorService;
private final Monitor monitor;

public AsyncStreamingDataSink(AsyncResponseContext asyncContext, ExecutorService executorService, Monitor monitor) {
public AsyncStreamingDataSink(AsyncResponseContext asyncContext, ExecutorService executorService) {
this.asyncContext = asyncContext;
this.executorService = executorService;
this.monitor = monitor;
}

@Override
Expand All @@ -76,16 +59,17 @@ public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
if (streamResult.failed()) {
return completedFuture(failure(streamResult.getFailure()));
}
var partStream = streamResult.getContent();
return partStream
.map(part -> supplyAsync(() -> transferPart(part), executorService))
.collect(asyncAllOf())
.thenApply(r -> processResults(r, partStream));

try (var partStream = streamResult.getContent()) {
return partStream
.map(part -> supplyAsync(() -> transferPart(part), executorService))
.collect(asyncAllOf())
.thenApply(this::processResults);
}
}

@NotNull
private StreamResult<Object> processResults(List<? extends StatusResult<?>> results, Stream<DataSource.Part> partStream) {
close(partStream);
private StreamResult<Object> processResults(List<? extends StatusResult<?>> results) {
if (results.stream().anyMatch(AbstractResult::failed)) {
return error("Error transferring data");
}
Expand All @@ -94,22 +78,34 @@ private StreamResult<Object> processResults(List<? extends StatusResult<?>> resu

@NotNull
private StatusResult<?> transferPart(DataSource.Part part) {
var result = asyncContext.register(outputStream -> {
var result = asyncContext.register(new AsyncResponseCallback((outputStream) -> {
try {
part.openStream().transferTo(outputStream);
} catch (IOException e) {
throw new EdcException(e);
}
});
}, part.mediaType()));

return result ? StatusResult.success() : failure(FATAL_ERROR, "Could not resume output stream write");
}

private void close(AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
monitor.warning("Error closing stream", e);
}
/**
 * Serves as a facade for a response context that writes data asynchronously to a client.
 */
@FunctionalInterface
public interface AsyncResponseContext {

    /**
     * Registers a callback to be invoked when an output stream is available for writing data.
     * The callback also carries the media type of the payload being written.
     *
     * @param callback the callback
     * @return true if the callback was successfully registered
     */
    boolean register(AsyncResponseCallback callback);
}

/**
 * Pairs the consumer that writes the payload to the client's output stream with the
 * media (content) type of that payload.
 */
public record AsyncResponseCallback(Consumer<OutputStream> outputStreamConsumer, String mediaType) {

}

}

This file was deleted.