Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Plane Framework #463

Closed
Tracked by #1302
jimmarino opened this issue Dec 21, 2021 · 8 comments · Fixed by #763
Closed
Tracked by #1302

Data Plane Framework #463

jimmarino opened this issue Dec 21, 2021 · 8 comments · Fixed by #763
Assignees
Labels
dpf Feature related to the Data Plane Framework
Projects
Milestone

Comments

@jimmarino
Copy link
Contributor

jimmarino commented Dec 21, 2021

Overview

This issue will detail the architecture and design of the Data Plane Framework (DPF), which will provide a mechanism for implementing an extensible DataFlowController that supports a myriad of deployment topologies.

Scope

The Data Plane Framework is designed to only work with finite data transfers and small payload, latent non-finite transfers such as events. High-volume or latency-sensitive streaming (non-finite) data transfers should be handled by other DataFlowController implementations that delegate to specialized third-party infrastructure such as Kafka.

Principles

  1. Minimal state. All state pertaining to a transfer process will be maintained by the Connector Control Plane as part of the TransferProcess. The only state that will be maintained by the DPF is if a transfer process has been completed. This requires the Control Plan to issue retries in the event of failure.

  2. No transformations. The DPF is not an ETL tool. There will be no facilities for data transformations or processing. This is expected to handled by the Control Plane as part of the provisioning state.

  3. Do not reinvent the wheel. The DPF will rely on existing data transfer technology such as S3, Azure Object Storage, FTP, etc. As a rule of thumb: the DPF should not contain any wire protocol implementations.

  4. Flexible Deployment. It must be possible to:

    • Deploy the DPF to a K8S cluster
    • Deploy the DPF remotely from the Control Plane
    • Deploy the DPF in the same process as the Control Plane for demo and testing purposes
  5. Extensible. The DPF will be built using the EDC modularity and extensibility system.

Design

Data Plane Manager

The DataPlaneManager (DPM) will enqueue DataFlowRequests requests, which may originate from an ingress such as an HTTP endpoint. The queue implementation will be a bounded, in-memory data structure that provides support for backpressure when the system is overloaded. However, since the queue is in-memory, the client control plane submitting the request will be responsible for re-submitting a request in the event of failure:

The DataPlaneManager will look similar to the following:

/**
 * Manages the execution of data plane requests.
 */
public interface DataPlaneManager {

    /**
     * Initiates a transfer for the data flow request. This method is non-blocking with respect to processing the request.
     */
    void initiateTransfer(DataFlowRequest dataRequest);

    /**
     * Performs a data transfer using the supplied data source.
     */
    CompletableFuture<TransferResult> transfer(DataSource source, DataFlowRequest request);

    /**
     * Performs a data transfer using the supplied data sink.
     */
    CompletableFuture<TransferResult> transfer(DataSink sink, DataFlowRequest request);
}

The DPM implementation will have a configurable number of workers that dequeue requests for processing (data transfer). This processing operation will complete asynchronously and when a result is returned, an entry for the process indicating success or failure will be made using extensible storage. This will allow the control plane to query the DPF for the status (COMPLETED or ERROR) of a process. No other state will be maintained by the DPF.

The DPM will also contain transfer methods that take a DataSource and DataSink respectively, These can be used to provide an ad hoc transfer source or destination. For example, user code may de-queue a message to memory and send it to a destination using an InputStreamDataSource that wraps the in-memory message. Similarly, user code may wish to transfer data from a source to a provided OutputStreamDataSink.

Pipeline Service

When a DPM worker dequeues a request, it will delegate toPipelineService#transfer(request):

/**
 * Transfers data from a source to a sink.
 */
public interface PipelineService {

    /**
     * Transfers data associated with the request.
     */
    CompletableFuture<TransferResult> transfer(DataFlowRequest request);

    /**
     * Transfers data using the supplied data source.
     */
    CompletableFuture<TransferResult> transfer(DataSource source, DataFlowRequest request);

    /**
     * Transfers data using the supplied data sink.
     */
    CompletableFuture<TransferResult> transfer(DataSink sink, DataFlowRequest request);

    /**
     * Registers a factory for creating data sources.
     */
    void registerFactory(DataSourceFactory factory);

    /**
     * Registers a factory for creating data sinks.
     */
    void registerFactory(DataSinkFactory factory);
}

The implementation of this service is straightforward: it connects a sink with its source, both of which are instantiated by extensible factories. This design allows for scalable, n-way data transfers since all data will be directly streamed from its origin to the terminus.

Pull Streaming

The DPF design is based on a pull streaming model; a DataSink pulls data from a DataSource. This allows the sink to control the rate of transfer and potentially parallelize operation. Moreover, optimizations can be made such as copy-in-place when the source and sink are the same infrastructure such as object storage hosted by a single cloud provider.

The following is the provisional DataSource interface:

/**
 * Implements pull semantics for accessing a data source. A data source is composed or one or more named parts. Some implementations may support random access of the underlying
 * part content so that large content transfers can be parallelized.
 */
public interface DataSource {

    /**
     * Opens a stream to the source parts.
     */
    Stream<Part> openStream();

    /**
     * A data source part. This is typically an underlying file or container that the data contains.
     */
    interface Part extends AutoCloseable {

        /**
         * The part name.
         */
        String name();

        /**
         * The size of the part, or -1 if the size cannot be determined.
         */
        long size();

        /**
         * Opens stream to sequentially read the underlying part content.
         */
        InputStream openStream();

        /**
         * Returns true if the part supports random access of its contents. If random access is supported, {@link #read(long, long)} may be invoked.
         */
        default boolean supportsRandomAccess() {
            return false;
        }

        /**
         * Reads a segment of the underlying data. Implementations must throw {@link UnsupportedOperationException} if random access is not supported.
         */
        default byte[] read(long offset, long bytes) {
            throw new UnsupportedOperationException("Random access not supported");
        }
    }

}

Implementations may support random data access for parallel transfers of large files. For example, both AWS S3 and Azure Object Storage have facilities for ranged gets.

The DataSink interface is tentatively defined as:

/**
 * A data sink.
 */
public interface DataSink {

    /**
     * Transfers the data to the sink, returning a future to obtain the result. Implementations may be non-blocking.
     */
    CompletableFuture<Result<Void>> transfer(DataSource source);
}

DataSink implementations will provide the ability to configure parallel transfers, e.g. controlling the number of threads to perform ranged gets.

Extensions

HTTP

The HTTP Data Plane extension will provide support for sending data sourced from an HTTP endpoint and posting data to an HTTP endpoint. By nature of the DPF design, which supports n-way transfers, HTTP-sourced data can be sent to any DataSink type and an HTTP endpoint can receive data from any DataSource type. The extension is designed to
stream content to limit memory consumption under load.

Note that the large data transfers should use the Azure Object Storage or S3 extensions as those support more scalable parallelization.

Requirements

  1. End users will need to provide a source endpoint that serves data via HTTP GET. The sub-path of the endpoint will include the name of the artifact. An authentication token will be provided as an HTTP header.

  2. End users will need to provide a target endpoint that received data via HTTP POST. The sub-path of the endpoint will include the name of the artifact. An authentication token will be provided as an HTTP header.

Azure Object Storage

See #618.

S3 Compatible Object Storage

Coming soon.

@jimmarino jimmarino self-assigned this Dec 21, 2021
@mspiekermann mspiekermann added this to To do in Connector via automation Dec 22, 2021
@mspiekermann mspiekermann mentioned this issue Jan 7, 2022
15 tasks
@mspiekermann mspiekermann added this to the Milestone 2 milestone Jan 7, 2022
@MoritzKeppler
Copy link
Contributor

thx a lot for the proposal, that's great!

Some questions I have - partially beyond the scope of the DPF, just to sort things in the right way in my slowly-working head:

  • The Data Plane Framework is just one implementation of a DataFlowController. There can be more (e.g. for streaming), right?
  • It focusses on finite data. Does it also support the following cases:
    • a consumer wants to pull data multiple times on demand, e.g. from a REST API like the one defined by AAS. (is every call a separate transfer process?)
    • a provider agreed to push data as soon as an event occurs (e.g. an error event in a machine; can happen far away in future)?
  • who takes care for logging events (data sent, retrieved) including notifying a clearing house if needed?
  • who takes care for managing retrieved assets? In the data transfer diagrams we drew in summer, we discussed adding retrieved assets to some index and new assets with backpointers if an asset needs to be reshared.
  • Are workers thought to run in separate processes? Or how will it scale horizontally?
  • The DataPlaneFramework (or more generic: a DataFlowController) will just run on one side, either on Consumer side if it's a pull flow, or on provider side, if it's push. Or is it thought to run on both sides?
    Moritz Keppler moritz.keppler@daimler.com, Daimler TSS GmbH, legal info/Impressum

@jimmarino
Copy link
Contributor Author

Hi @MoritzKeppler just saw your questions now.

The Data Plane Framework is just one implementation of a DataFlowController. There can be more (e.g. for streaming), right?

In the scope, I initially wrote, "The Data Plane Framework is designed to only work with finite data transfers. Streaming (non-finite) data transfers should be handled by other DataFlowController implementations" However, your use case in the next point brings up some interesting scenarios.

a consumer wants to pull data multiple times on demand, e.g. from a REST API like the one defined by AAS. (is every call a separate transfer process?)

That's an interesting scenario. It seems a bit heavyweight and onerous to require a transfer process for these types of data exchange. Maybe that should be modeled as a non-finite process (i.e. a subscription)? The question would then be: should that type of data flow be handled by the data plane or a separate extension? My initial idea was only the Connector should initiate flows through the DPF, and those flows should be finite, but that could be changed. What I think we need to avoid is mediating non-finite data streams, specifically interposing the DPF between messaging endpoints (e.g. Kafka). Maybe the AAS transfer should be handled by its own extension rather than through the DPF? Are there specific ways you think the DPF could be relevant here?

a provider agreed to push data as soon as an event occurs (e.g. an error event in a machine; can happen far away in future)?

This seems like a subscription but one that happens intermittently. Again, maybe the DPF could be beneficial since it provides an n-way data flow model. We could open the DPF up receive requests from infrastructure other than the Connector in this case.

who takes care for logging events (data sent, retrieved) including notifying a clearing house if needed?

I think this should be done by the Connector or other controlling system. The DPF is just a set of pipes.

who takes care for managing retrieved assets? In the data transfer diagrams we drew in summer, we discussed adding retrieved assets to some index and new assets with backpointers if an asset needs to be reshared.

This is probably a much broader discussion. IMO the DPF should not manage this I also don't think the Connector should ever store retrieved assets or have long-term access to them for security reasons. Let's discuss this because there are a lot of scenarios to go over.

Are workers thought to run in separate processes? Or how will it scale horizontally?
Yep, just spin up as many workers as one needs and multiplex requests to them.

The DataPlaneFramework (or more generic: a DataFlowController) will just run on one side, either on Consumer side if it's a pull flow, or on provider side, if it's push. Or is it thought to run on both sides?

It theoretically only needs to run on one side, I think :-)

@jimmarino
Copy link
Contributor Author

Here's the related PR #496.

@jimmarino
Copy link
Contributor Author

jimmarino commented Jan 19, 2022

@MoritzKeppler @bscholtes1A

I made an update to the spec and pushed corresponding changes to the PR branch. I added the notion of ad hoc sinks and sources. This will allow us to send/receive messages and events directly through the DPF. For example, client code dequeues a message and wraps it in a InputStreamDataSource that is passed to the DPM and into the transfer pipeline. The data source is just a wrapper around the dequeued message. Similarly, client code can transfer from a source to an OutputStreamDataSink which can be a wrapper around an in-memory object or a queue message.

Operations are async but return a CompletableFuture so clients can choose to wait on the results. This isn't fast, but it is important to the design since external sinks and sources that are slow should not impact the overall operation and progress of the DPF instance. If a use case requires lower latency, we should recommend a direct streaming solution such as a Kafka extension.

Let me know what you guys think

@jimmarino jimmarino moved this from To do to In progress in Connector Jan 19, 2022
@MikhailGordienk
Copy link
Contributor

@jimmarino If it is possible to have DPF running on both consumer and provider sides, what is the process of deciding where to have it running? Is it supposed to be a dedicated property in DataAddress.TYPE? I can imagine HTTP requests be handled always on the consumer side whereas S3 requests - on the provider side.

For example, Here you mentioned that HttpFunctionDataFlowController would call to DPF on both sides, how would it decide what side to call?

@bscholtes1A
Copy link
Contributor

@MoritzKeppler @bscholtes1A

I made an update to the spec and pushed corresponding changes to the PR branch. I added the notion of ad hoc sinks and sources. This will allow us to send/receive messages and events directly through the DPF. For example, client code dequeues a message and wraps it in a InputStreamDataSource that is passed to the DPM and into the transfer pipeline. The data source is just a wrapper around the dequeued message. Similarly, client code can transfer from a source to an OutputStreamDataSink which can be a wrapper around an in-memory object or a queue message.

Operations are async but return a CompletableFuture so clients can choose to wait on the results. This isn't fast, but it is important to the design since external sinks and sources that are slow should not impact the overall operation and progress of the DPF instance. If a use case requires lower latency, we should recommend a direct streaming solution such as a Kafka extension.

Let me know what you guys think

Hi @jimmarino and thanks for the improvements. Two main points:

  • I still do not see in the code how the query parameters are conveyed to the HttpDataSource which is in charge of pulling the data out from the http endpoint. Reminder: in our use-case, the Asset represents the API source but you can (should) provide query parameters in order target only a subset of the data that are accessible through this API.
  • About the ad-hoc source/sink, I do not get it :D Based on the code of your PR, could you detail what is yet needed to implement in order to have the scenario we discussed? i.e. data provider reading data from a kafka topic and then sending them to the consumer through http.

@jimmarino
Copy link
Contributor Author

jimmarino commented Jan 21, 2022

Hi @MikhailGordienk The DPF can run on the producer or client side. Its job is to retrieve data from a source and send it to a target.

This can work with "push" or "pull" data transfers:

  1. In a push scenario (the provider sends data to the client), the DPF is on the provider side and retrieves the data from the provider's storage and sends it via the DPF to some endpoint the client has sent with the request. This endpoint could be an S3 bucket or HTTP endpoint. From the perspective of the provider's DPF, the infrastructure behind that endpoint isn't visible.

  2. In the pull scenario (the client retrieves data from the provider), the DPF is on the client side and retrieves the data using the address sent by the provider. The data is sent by the DPF to a storage location specified by the client. Again, what is behind the provider address isn't visible to the client DPF.

Does that clarify how it works?

@jimmarino
Copy link
Contributor Author

jimmarino commented Jan 21, 2022

Hi @jimmarino and thanks for the improvements. Two main points:

  • I still do not see in the code how the query parameters are conveyed to the HttpDataSource which is in charge of pulling the data out from the http endpoint. Reminder: in our use-case, the Asset represents the API source but you can (should) provide query parameters in order target only a subset of the data that are accessible through this API.

Let's discuss. I didn't put this in yet. One possibility is to put this in an extensible property or use the data address.

  • About the ad-hoc source/sink, I do not get it :D Based on the code of your PR, could you detail what is yet needed to implement in order to have the scenario we discussed? i.e. data provider reading data from a kafka topic and then sending them to the consumer through HTTP.

This can be done in a couple of ways. For the first case, let's assume the DPF is setup as a server with an HTTP(S) interface:

  1. The Kafka subscriber dequeues a message and POSTs to the DPF server.
  2. The DPF server has a controller that receives the message and sends it using a InputStreamDataSource:
var dataFlowRequest = // ...
var messageBytes = // ....
var source = new InputStreamDataSource("test", new ByteArrayInputStream(messageBytes))

// DataPlaneManager is injected into the extension
dataPlaneManager.transfer(source, dataFlowRequest).whenComplete(r-> ...);

The other case is when the DPF is embedded in the KafkaConsumer (or the DPF server subscribes directly to Kafka). In that case, the code is virtually identical save for the HTTP hop.

You can also use an OutputStreamDataSink to implement the reverse scenario to send data directly to a destination. For example, retrieve data from a DataSource and pipe it directly to the output stream sink which could be attached to a queue or some other system.

Does that make sense?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
dpf Feature related to the Data Plane Framework
Projects
No open projects
Connector
  
Done
Development

Successfully merging a pull request may close this issue.

8 participants