Skip to content

Commit

Permalink
Merge pull request #65 from carlos-schmidt/extract-client-extension
Browse files Browse the repository at this point in the history
Extract client extension
  • Loading branch information
carlos-schmidt committed Dec 7, 2023
2 parents 5ca2e83 + f9e67e9 commit de86db6
Show file tree
Hide file tree
Showing 54 changed files with 1,708 additions and 1,256 deletions.
145 changes: 83 additions & 62 deletions README.md

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
plugins {
`java-library`
jacoco
}

val javaVersion: String by project
val edcVersion: String by project
val rsApi: String by project
val mockitoVersion: String by project
val mockserverVersion: String by project

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(javaVersion))
}
}

dependencies {
// See this project's README.MD for explanations
implementation("$group:contract-core:$edcVersion")
implementation("$group:dsp-catalog-http-dispatcher:$edcVersion")
implementation("$group:management-api:$edcVersion")
implementation("$group:runtime-metamodel:$edcVersion")
implementation("$group:data-plane-http-spi:$edcVersion") // HttpDataAddress

implementation("jakarta.ws.rs:jakarta.ws.rs-api:${rsApi}")

testImplementation("$group:junit:$edcVersion")
testImplementation("org.glassfish.jersey.core:jersey-common:3.1.3")
testImplementation("org.mockito:mockito-core:${mockitoVersion}")
testImplementation("org.mock-server:mockserver-junit-jupiter:${mockserverVersion}")
testImplementation("org.mock-server:mockserver-netty:${mockserverVersion}")
}

repositories {
mavenCentral()
}

tasks.test {
useJUnitPlatform()
}

tasks.jacocoTestReport {
dependsOn(tasks.test) // tests are required to run before generating the report
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;

public class ClientExtension implements ServiceExtension {

@Inject
private AuthenticationService authenticationService;
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig();

var policyController = new PolicyController(monitor, catalogService, transformer, config);

var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

var dataTransferController = new DataTransferController(monitor, config, webService,
authenticationService, transferProcessManager);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.authentication;

import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.String.format;

/**
* Custom AuthenticationRequestFilter filtering requests that go directly to an
* AAS service (managed by this extension) or the extension's configuration.
*/
public class CustomAuthenticationRequestFilter extends AuthenticationRequestFilter {

private final Monitor monitor;
private final Map<String, String> tempKeys;

public CustomAuthenticationRequestFilter(Monitor monitor, AuthenticationService authenticationService) {
super(authenticationService);
this.monitor = monitor;
tempKeys = new ConcurrentHashMap<>();
}

/**
* Add key,value pair for a request. This key will only be available for one
* request.
*
* @param key The key name
* @param value The actual key
*/
public void addTemporaryApiKey(String key, String value) {
tempKeys.put(key, value);
}

/**
* On automated data transfer: If the request is valid, the key,value pair used
* for this request will no longer be valid.
*/
@Override
public void filter(ContainerRequestContext requestContext) {
Objects.requireNonNull(requestContext);
var requestPath = requestContext.getUriInfo().getPath();

for (String key : tempKeys.keySet()) {
if (requestContext.getHeaders().containsKey(key)
&& requestContext.getHeaderString(key).equals(tempKeys.get(key))
&& requestPath.startsWith(
format("%s/%s", ClientEndpoint.AUTOMATED_PATH, DataTransferEndpoint.RECEIVE_DATA_PATH))) {
monitor.debug(
format("[Client] Data Transfer request with custom api key %s", key));
tempKeys.remove(key);
return;
}
}

super.filter(requestContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.dataTransfer;

import static java.lang.String.format;

import java.net.URL;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.authentication.CustomAuthenticationRequestFilter;

public class DataTransferController {

static final String DATA_TRANSFER_API_KEY = "data-transfer-api-key";

private static final int WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT = 10;

private final Config config;

private final DataTransferEndpoint dataTransferEndpoint;
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

private final CustomAuthenticationRequestFilter dataEndpointAuthenticationRequestFilter;

/**
* Class constructor
*
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param dataEndpointAuthRequestFilter Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
AuthenticationService authenticationService, TransferProcessManager transferProcessManager) {
this.config = config;
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.dataEndpointAuthenticationRequestFilter = new CustomAuthenticationRequestFilter(monitor,
authenticationService);

this.dataTransferObservable = new DataTransferObservable(monitor);
this.dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
}

/**
* Initiates the transfer process defined by the arguments. The data of the
* transfer will be sent to {@link DataTransferEndpoint#RECEIVE_DATA_PATH}.
*
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataSinkAddress HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
*
* @return A completable future whose result will be the data or an error
* message.
* @throws InterruptedException If the data transfer was interrupted
* @throws ExecutionException If the data transfer process failed
*/
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
dataEndpointAuthenticationRequestFilter.addTemporaryApiKey(DATA_TRANSFER_API_KEY, apiKey);

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
} else {
var dataSinkAddress = HttpDataAddress.Builder.newInstance()
.baseUrl(dataDestinationUrl.toString())
.build();

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, dataSinkAddress);
return null;
}

}

private String waitForData(CompletableFuture<String> dataFuture, String agreementId)
throws InterruptedException, ExecutionException {
var waitForTransferTimeout = config.getInteger("getWaitForTransferTimeout",
WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT);
try {
// Fetch TransferTimeout everytime to adapt to runtime config changes
var data = dataFuture.get(waitForTransferTimeout, TimeUnit.SECONDS);
dataTransferObservable.unregister(agreementId);
return data;
} catch (TimeoutException transferTimeoutExceededException) {
dataTransferObservable.unregister(agreementId);
throw new EdcException(format("Waiting for an transfer failed for agreementId: %s", agreementId),
transferTimeoutExceededException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.app.client.dataTransfer;
package de.fraunhofer.iosb.client.dataTransfer;

import de.fraunhofer.iosb.app.Logger;
import de.fraunhofer.iosb.app.client.ClientEndpoint;
import de.fraunhofer.iosb.client.ClientEndpoint;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import java.util.Objects;

import org.eclipse.edc.spi.monitor.Monitor;

import static java.lang.String.format;

/**
* Endpoint for automated data transfer
*/
@Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces({MediaType.APPLICATION_JSON})
@Consumes({ MediaType.APPLICATION_JSON, MediaType.WILDCARD })
@Produces({ MediaType.APPLICATION_JSON })
@Path(ClientEndpoint.AUTOMATED_PATH)
public class DataTransferEndpoint {

Expand All @@ -38,10 +39,11 @@ public class DataTransferEndpoint {
*/
public static final String RECEIVE_DATA_PATH = "receiveData";

private static final Logger LOGGER = Logger.getInstance();
private final Monitor monitor;
private final DataTransferObservable observable;

public DataTransferEndpoint(DataTransferObservable observable) {
public DataTransferEndpoint(Monitor monitor, DataTransferObservable observable) {
this.monitor = monitor;
this.observable = observable;
}

Expand All @@ -56,7 +58,7 @@ public DataTransferEndpoint(DataTransferObservable observable) {
@POST
@Path("receiveData/{agreement}")
public Response receiveData(@PathParam("agreement") String agreementId, String requestBody) {
LOGGER.log(format("Receiving data for agreement %s...", agreementId));
monitor.info(format("[Client] Receiving data for agreement %s...", agreementId));
Objects.requireNonNull(agreementId);
Objects.requireNonNull(requestBody);
observable.update(agreementId, requestBody);
Expand Down

0 comments on commit de86db6

Please sign in to comment.