Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement policy-monitor #3456

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.0.2, Apache-2.0, approved, #9239

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.apicatalog/titanium-json-ld/1.3.1, Apache-2.0, approved, #8912
Expand Down Expand Up @@ -75,7 +75,7 @@
maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.32, , restricted, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.32, Apache-2.0, approved, #10561
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.0, LGPL-2.1-or-later, approved, #7936
maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined
maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.11.0, Apache-2.0, approved, clearlydefined
Expand Down
1 change: 1 addition & 0 deletions core/common/connector-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
api(project(":spi:common:validator-spi"))

implementation(project(":core:common:policy-engine"))
implementation(project(":core:common:state-machine"))
implementation(project(":core:common:transform-core"))
implementation(project(":core:common:util"))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.core.entity;

import org.eclipse.edc.spi.entity.StateEntityManager;
import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.retry.WaitStrategy;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.eclipse.edc.statemachine.retry.EntityRetryProcessConfiguration;
import org.eclipse.edc.statemachine.retry.EntityRetryProcessFactory;
import org.jetbrains.annotations.NotNull;

import java.time.Clock;
import java.util.Objects;

/**
* Abstraction that provides a common ground for state machine manager implementation.
*
* @param <E> the entity type.
* @param <S> the store type.
*/
public abstract class AbstractStateEntityManager<E extends StatefulEntity<E>, S extends StateEntityStore<E>> implements StateEntityManager {
ndr-brt marked this conversation as resolved.
Show resolved Hide resolved

public static final long DEFAULT_ITERATION_WAIT = 1000;
public static final int DEFAULT_BATCH_SIZE = 20;
public static final int DEFAULT_SEND_RETRY_LIMIT = 7;
public static final long DEFAULT_SEND_RETRY_BASE_DELAY = 1000L;

protected Monitor monitor;
protected int batchSize = DEFAULT_BATCH_SIZE;
protected WaitStrategy waitStrategy = () -> DEFAULT_ITERATION_WAIT;
protected ExecutorInstrumentation executorInstrumentation = ExecutorInstrumentation.noop();
protected Telemetry telemetry = new Telemetry();
protected EntityRetryProcessConfiguration entityRetryProcessConfiguration = defaultEntityRetryProcessConfiguration();
protected EntityRetryProcessFactory entityRetryProcessFactory;
protected StateMachineManager stateMachineManager;
protected Clock clock = Clock.systemUTC();
protected S store;

@Override
public void start() {
entityRetryProcessFactory = new EntityRetryProcessFactory(monitor, clock, entityRetryProcessConfiguration);
var stateMachineManagerBuilder = StateMachineManager.Builder
.newInstance(getClass().getSimpleName(), monitor, executorInstrumentation, waitStrategy);
stateMachineManager = configureStateMachineManager(stateMachineManagerBuilder).build();

stateMachineManager.start();
}

@Override
public void stop() {
if (stateMachineManager != null) {
stateMachineManager.stop();
}
}

/**
* configures the State Machine Manager builder
*
* @param builder the builder.
* @return the builder.
*/
protected abstract StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder);

@NotNull
private EntityRetryProcessConfiguration defaultEntityRetryProcessConfiguration() {
return new EntityRetryProcessConfiguration(DEFAULT_SEND_RETRY_LIMIT, () -> new ExponentialWaitStrategy(DEFAULT_SEND_RETRY_BASE_DELAY));
}

protected void update(E entity) {
store.save(entity);
monitor.debug(() -> "[%s] %s %s is now in state %s"
.formatted(this.getClass().getSimpleName(), entity.getClass().getSimpleName(),
entity.getId(), entity.stateAsString()));
}

protected void breakLease(E entity) {
store.save(entity);
}

public abstract static class Builder<E extends StatefulEntity<E>, S extends StateEntityStore<E>, M extends AbstractStateEntityManager<E, S>, B extends Builder<E, S, M, B>> {

protected final M manager;

protected Builder(M manager) {
this.manager = manager;
}

public abstract B self();

public B monitor(Monitor monitor) {
manager.monitor = monitor;
return self();
}

public B batchSize(int batchSize) {
manager.batchSize = batchSize;
return self();
}

public B waitStrategy(WaitStrategy waitStrategy) {
manager.waitStrategy = waitStrategy;
return self();
}

public B clock(Clock clock) {
manager.clock = clock;
return self();
}

public B telemetry(Telemetry telemetry) {
manager.telemetry = telemetry;
return self();
}

public B executorInstrumentation(ExecutorInstrumentation executorInstrumentation) {
manager.executorInstrumentation = executorInstrumentation;
return self();
}

public B entityRetryProcessConfiguration(EntityRetryProcessConfiguration entityRetryProcessConfiguration) {
manager.entityRetryProcessConfiguration = entityRetryProcessConfiguration;
return self();
}

public B store(S store) {
manager.store = store;
return self();
}

public M build() {
Objects.requireNonNull(manager.store, "store");
Objects.requireNonNull(manager.monitor, "monitor");

manager.entityRetryProcessFactory = new EntityRetryProcessFactory(manager.monitor, manager.clock, manager.entityRetryProcessConfiguration);

return manager;
}
}

}
1 change: 1 addition & 0 deletions core/control-plane/contract-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
api(project(":spi:common:policy-engine-spi"))
api(project(":spi:control-plane:contract-spi"))

implementation(project(":core:common:connector-core"))
implementation(project(":core:common:state-machine"))
implementation(libs.opentelemetry.instrumentation.annotations)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@

import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.TRANSFER_SCOPE;
import static org.eclipse.edc.connector.contract.validation.ContractExpiryCheckFunction.CONTRACT_EXPIRY_EVALUATION_KEY;
import static org.eclipse.edc.connector.core.entity.AbstractStateEntityManager.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.connector.core.entity.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT;
import static org.eclipse.edc.connector.core.entity.AbstractStateEntityManager.DEFAULT_SEND_RETRY_BASE_DELAY;
import static org.eclipse.edc.connector.core.entity.AbstractStateEntityManager.DEFAULT_SEND_RETRY_LIMIT;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;

@Provides({
Expand All @@ -72,11 +76,6 @@ public class ContractCoreExtension implements ServiceExtension {

public static final String NAME = "Contract Core";

public static final long DEFAULT_ITERATION_WAIT = 1000;
public static final int DEFAULT_BATCH_SIZE = 20;
public static final int DEFAULT_SEND_RETRY_LIMIT = 7;
public static final long DEFAULT_SEND_RETRY_BASE_DELAY = 1000L;

@Setting(value = "the iteration wait time in milliseconds in the negotiation state machine. Default value " + DEFAULT_ITERATION_WAIT, type = "long")
private static final String NEGOTIATION_STATE_MACHINE_ITERATION_WAIT_MILLIS = "edc.negotiation.state-machine.iteration-wait-millis";

Expand Down
Loading
Loading