Skip to content

Commit

Permalink
feat: timeout and protocol configurable in Participant (#3753)
Browse files Browse the repository at this point in the history
feat: timeout and protocol configurable in Participants + additional methods
  • Loading branch information
wolf4ood committed Jan 2, 2024
1 parent ecb5158 commit 70992d8
Showing 1 changed file with 133 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,40 @@
*/
public class Participant {

private static final String DSP_PROTOCOL = "dataspace-protocol-http";
private static final Duration TIMEOUT = Duration.ofSeconds(30);

protected String id;
protected String name;
protected Endpoint managementEndpoint;
protected Endpoint protocolEndpoint;
protected JsonLd jsonLd;
protected ObjectMapper objectMapper;

protected Duration timeout = Duration.ofSeconds(30);

protected String protocol = "dataspace-protocol-http";

protected Participant() {
}

public String getName() {
return name;
}

public Duration getTimeout() {
return timeout;
}

public String getProtocol() {
return protocol;
}

public Endpoint getProtocolEndpoint() {
return protocolEndpoint;
}

public Endpoint getManagementEndpoint() {
return managementEndpoint;
}

/**
* Create a new {@link org.eclipse.edc.spi.types.domain.asset.Asset}.
*
Expand Down Expand Up @@ -163,19 +180,32 @@ public String createContractDefinition(String assetId, String definitionId, Stri
* @return list of {@link org.eclipse.edc.catalog.spi.Dataset}.
*/
public JsonArray getCatalogDatasets(Participant provider) {
return getCatalogDatasets(provider, null);
}

/**
* Request provider catalog.
*
* @param provider data provider
* @return list of {@link org.eclipse.edc.catalog.spi.Dataset}.
*/
public JsonArray getCatalogDatasets(Participant provider, JsonObject querySpec) {
var datasetReference = new AtomicReference<JsonArray>();
var requestBody = createObjectBuilder()
var requestBodyBuilder = createObjectBuilder()
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "CatalogRequest")
.add("counterPartyAddress", provider.protocolEndpoint.url.toString())
.add("protocol", DSP_PROTOCOL)
.build();
.add("protocol", protocol);

await().atMost(TIMEOUT).untilAsserted(() -> {
if (querySpec != null) {
requestBodyBuilder.add("querySpec", querySpec);
}

await().atMost(timeout).untilAsserted(() -> {
var response = managementEndpoint.baseRequest()
.contentType(JSON)
.when()
.body(requestBody)
.body(requestBodyBuilder.build())
.post("/v2/catalog/request")
.then()
.log().ifError()
Expand Down Expand Up @@ -209,10 +239,10 @@ public JsonObject getDatasetForAsset(Participant provider, String assetId) {
.add(TYPE, "DatasetRequest")
.add(ID, assetId)
.add("counterPartyAddress", provider.protocolEndpoint.url.toString())
.add("protocol", DSP_PROTOCOL)
.add("protocol", protocol)
.build();

await().atMost(TIMEOUT).untilAsserted(() -> {
await().atMost(timeout).untilAsserted(() -> {
var response = managementEndpoint.baseRequest()
.contentType(JSON)
.when()
Expand All @@ -234,23 +264,42 @@ public JsonObject getDatasetForAsset(Participant provider, String assetId) {
}

/**
* Initiate negotiation with a provider.
* Initiate negotiation with a provider for an asset.
* - Fetches the dataset for the ID
* - Extracts the first policy
* - Starts the contract negotiation
*
* @param provider data provider
* @param assetId asset id
* @return id of the contract negotiation.
*/
public String initContractNegotiation(Participant provider, String assetId) {
var dataset = getDatasetForAsset(provider, assetId);
assertThat(dataset).withFailMessage("Catalog received from " + provider.getName() + " was empty!").isNotEmpty();

var policy = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject();

return initContractNegotiation(provider, policy);
}

/**
* Initiate negotiation with a provider given an input policy.
*
* @param provider data provider
* @param policy policy
* @return id of the contract agreement.
* @return id of the contract negotiation.
*/
public String negotiateContract(Participant provider, JsonObject policy) {
public String initContractNegotiation(Participant provider, JsonObject policy) {
var requestBody = createObjectBuilder()
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "ContractRequestDto")
.add("providerId", provider.id)
.add("counterPartyAddress", provider.protocolEndpoint.url.toString())
.add("protocol", DSP_PROTOCOL)
.add("counterPartyAddress", provider.protocolEndpoint.getUrl().toString())
.add("protocol", protocol)
.add("policy", jsonLd.compact(policy).getContent())
.build();

var negotiationId = managementEndpoint.baseRequest()
return managementEndpoint.baseRequest()
.contentType(JSON)
.body(requestBody)
.when()
Expand All @@ -259,8 +308,20 @@ public String negotiateContract(Participant provider, JsonObject policy) {
.log().ifError()
.statusCode(200)
.extract().body().jsonPath().getString(ID);
}

/**
* Initiate negotiation with a provider.
*
* @param provider data provider
* @param policy policy
* @return id of the contract agreement.
*/
public String negotiateContract(Participant provider, JsonObject policy) {

var negotiationId = initContractNegotiation(provider, policy);

await().atMost(TIMEOUT).untilAsserted(() -> {
await().atMost(timeout).untilAsserted(() -> {
var state = getContractNegotiationState(negotiationId);
assertThat(state).isEqualTo(FINALIZED.name());
});
Expand Down Expand Up @@ -298,7 +359,7 @@ public String initiateTransfer(Participant provider, String contractAgreementId,
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "TransferRequest")
.add("dataDestination", destination)
.add("protocol", DSP_PROTOCOL)
.add("protocol", protocol)
.add("assetId", assetId)
.add("contractId", contractAgreementId)
.add("connectorId", provider.id)
Expand All @@ -321,6 +382,38 @@ public String initiateTransfer(Participant provider, String contractAgreementId,
.extract().body().jsonPath().getString(ID);
}

/**
* Returns all the transfer processes with empty query
*
* @return The transfer processes
*/
public JsonArray getTransferProcesses() {
var query = createObjectBuilder()
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "QuerySpec")
.build();
return getTransferProcesses(query);
}

/**
* Returns all the transfer processes matching the input query
*
* @param query The input query
* @return The transfer processes
*/
public JsonArray getTransferProcesses(JsonObject query) {
return managementEndpoint.baseRequest()
.contentType(JSON)
.body(query)
.when()
.post("/v2/transferprocesses/request")
.then()
.statusCode(200)
.extract()
.body()
.as(JsonArray.class);
}

/**
* Request a provider asset:
* - retrieves the contract definition associated with the asset,
Expand Down Expand Up @@ -375,26 +468,26 @@ public String getTransferProcessState(String id) {
.extract().body().jsonPath().getString("state");
}

/**
* Get current state of a contract negotiation.
*
* @param id contract negotiation id
* @return state of a contract negotiation.
*/
public String getContractNegotiationState(String id) {
return getContractNegotiationField(id, "state");
}

private ContractOfferId extractContractDefinitionId(JsonObject dataset) {
var contractId = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject().getString(ID);
return ContractOfferId.parseId(contractId).orElseThrow(f -> new RuntimeException(f.getFailureDetail()));
}

private String getContractNegotiationState(String id) {
return managementEndpoint.baseRequest()
.contentType(JSON)
.when()
.get("/v2/contractnegotiations/{id}/state", id)
.then()
.statusCode(200)
.extract().body().jsonPath().getString("state");
}


private String getContractAgreementId(String negotiationId) {
var contractAgreementIdAtomic = new AtomicReference<String>();

await().atMost(TIMEOUT).untilAsserted(() -> {
await().atMost(timeout).untilAsserted(() -> {
var agreementId = getContractNegotiationField(negotiationId, "contractAgreementId");
assertThat(agreementId).isNotNull().isInstanceOf(String.class);

Expand All @@ -406,7 +499,7 @@ private String getContractAgreementId(String negotiationId) {
return contractAgreementId;
}

private String getContractNegotiationField(String negotiationId, String fieldName) {
protected String getContractNegotiationField(String negotiationId, String fieldName) {
return managementEndpoint.baseRequest()
.contentType(JSON)
.when()
Expand Down Expand Up @@ -464,6 +557,16 @@ public B name(String name) {
return self();
}

public B protocol(String protocol) {
participant.protocol = protocol;
return self();
}

public B timeout(Duration timeout) {
participant.timeout = timeout;
return self();
}

public B managementEndpoint(Endpoint managementEndpoint) {
participant.managementEndpoint = managementEndpoint;
return self();
Expand Down

0 comments on commit 70992d8

Please sign in to comment.