Skip to content

Commit

Permalink
feat: removes StatusChecker (#3480)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Sep 28, 2023
1 parent 7dc9dde commit bbfcbab
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy;
import org.eclipse.edc.connector.transfer.spi.status.StatusCheckerRegistry;
import org.eclipse.edc.connector.transfer.spi.types.StatusChecker;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
Expand Down Expand Up @@ -113,7 +111,6 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
TransferProcessProtocolService protocolService,
EventRouter eventRouter,
RemoteMessageDispatcherRegistry dispatcherRegistry,
StatusCheckerRegistry statusCheckerRegistry,
PolicyArchive policyArchive,
ContractNegotiationStore negotiationStore,
ParticipantAgentService agentService) {
Expand All @@ -132,10 +129,7 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
when(negotiationStore.findContractAgreement("contractId")).thenReturn(agreement);
when(agentService.createFor(token)).thenReturn(agent);
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
var statusCheck = mock(StatusChecker.class);

statusCheckerRegistry.register("any", statusCheck);
when(statusCheck.isComplete(any(), any())).thenReturn(false);
var transferRequest = createTransferRequest();

var initiateResult = service.initiateTransfer(transferRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.eclipse.edc.connector.transfer.spi.provision.ProvisionManager;
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy;
import org.eclipse.edc.connector.transfer.spi.status.StatusCheckerRegistry;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
Expand Down Expand Up @@ -94,9 +93,6 @@ public class TransferCoreExtension implements ServiceExtension {
@Inject
private DataFlowManager dataFlowManager;

@Inject
private StatusCheckerRegistry statusCheckerRegistry;

@Inject
private ResourceManifestGenerator resourceManifestGenerator;

Expand Down Expand Up @@ -180,7 +176,6 @@ public void initialize(ServiceExtensionContext context) {
.dataFlowManager(dataFlowManager)
.provisionManager(provisionManager)
.dispatcherRegistry(dispatcherRegistry)
.statusCheckerRegistry(statusCheckerRegistry)
.monitor(monitor)
.telemetry(telemetry)
.executorInstrumentation(executorInstrumentation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

import org.eclipse.edc.connector.transfer.flow.DataFlowManagerImpl;
import org.eclipse.edc.connector.transfer.observe.TransferProcessObservableImpl;
import org.eclipse.edc.connector.transfer.process.StatusCheckerRegistryImpl;
import org.eclipse.edc.connector.transfer.provision.ProvisionManagerImpl;
import org.eclipse.edc.connector.transfer.provision.ResourceManifestGeneratorImpl;
import org.eclipse.edc.connector.transfer.spi.TransferProcessPendingGuard;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.transfer.spi.provision.ProvisionManager;
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.status.StatusCheckerRegistry;
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
Expand Down Expand Up @@ -55,11 +53,6 @@ public ResourceManifestGenerator resourceManifestGenerator() {
return new ResourceManifestGeneratorImpl(policyEngine);
}

@Provider
public StatusCheckerRegistry statusCheckerRegistry() {
return new StatusCheckerRegistryImpl();
}

@Provider
public ProvisionManager provisionManager(ServiceExtensionContext context) {
return new ProvisionManagerImpl(context.getMonitor());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.transfer.spi.provision.ProvisionManager;
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.status.StatusCheckerRegistry;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
Expand All @@ -53,7 +52,6 @@
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Objects;
Expand All @@ -71,7 +69,6 @@
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.PROVISIONING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATING;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
Expand Down Expand Up @@ -105,7 +102,6 @@ public class TransferProcessManagerImpl extends AbstractStateEntityManager<Trans
private ProvisionManager provisionManager;
private RemoteMessageDispatcherRegistry dispatcherRegistry;
private DataFlowManager dataFlowManager;
private StatusCheckerRegistry statusCheckerRegistry;
private Vault vault;
private TransferProcessObservable observable;
private DataAddressResolver addressResolver;
Expand All @@ -118,20 +114,6 @@ public class TransferProcessManagerImpl extends AbstractStateEntityManager<Trans
private TransferProcessManagerImpl() {
}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processConsumerTransfersInState(REQUESTING, this::processRequesting))
.processor(processProviderTransfersInState(STARTING, this::processStarting))
.processor(processConsumerTransfersInState(STARTED, this::processStarted))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning));
}

/**
* Initiate a consumer request TransferProcess.
*/
Expand Down Expand Up @@ -174,6 +156,19 @@ public StatusResult<TransferProcess> initiateConsumerRequest(TransferRequest tra
return StatusResult.success(process);
}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processConsumerTransfersInState(REQUESTING, this::processRequesting))
.processor(processProviderTransfersInState(STARTING, this::processStarting))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning));
}

/**
* Process INITIAL transfer<p> set it to PROVISIONING
*
Expand Down Expand Up @@ -345,37 +340,6 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse
.execute(description);
}

/**
* Process STARTED transfer<p> if is completed or there's no checker and it's not managed, set to COMPLETE,
* nothing otherwise.
*
* @param transferProcess the STARTED transfer fetched
* @return if the transfer has been processed or not
*/
@WithSpan
private boolean processStarted(TransferProcess transferProcess) {
return entityRetryProcessFactory.doSimpleProcess(transferProcess, () -> checkCompletion(transferProcess))
.execute("Check completion");
}

@NotNull
private Boolean checkCompletion(TransferProcess transferProcess) {
var checker = statusCheckerRegistry.resolve(transferProcess.getDataDestination().getType());
if (checker == null) {
monitor.warning(format("No checker found for process %s. The process will not advance to the COMPLETED state.", transferProcess.getId()));
return false;
} else {
var resources = transferProcess.getProvisionedResources();
if (checker.isComplete(transferProcess, resources)) {
transitionToCompleting(transferProcess);
return true;
} else {
monitor.debug(format("Transfer process %s not COMPLETED yet. The process will stay in STARTED.", transferProcess.getId()));
return false;
}
}
}

/**
* Process COMPLETING transfer<p> Send COMPLETED message to counter-part
*
Expand Down Expand Up @@ -588,6 +552,22 @@ public Builder self() {
return this;
}

@Override
public TransferProcessManagerImpl build() {
super.build();
Objects.requireNonNull(manager.manifestGenerator, "manifestGenerator cannot be null");
Objects.requireNonNull(manager.provisionManager, "provisionManager cannot be null");
Objects.requireNonNull(manager.dataFlowManager, "dataFlowManager cannot be null");
Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry cannot be null");
Objects.requireNonNull(manager.observable, "observable cannot be null");
Objects.requireNonNull(manager.policyArchive, "policyArchive cannot be null");
Objects.requireNonNull(manager.addressResolver, "addressResolver cannot be null");
Objects.requireNonNull(manager.provisionResponsesHandler, "provisionResultHandler cannot be null");
Objects.requireNonNull(manager.deprovisionResponsesHandler, "deprovisionResponsesHandler cannot be null");

return manager;
}

public Builder manifestGenerator(ResourceManifestGenerator manifestGenerator) {
manager.manifestGenerator = manifestGenerator;
return this;
Expand All @@ -608,11 +588,6 @@ public Builder dispatcherRegistry(RemoteMessageDispatcherRegistry registry) {
return this;
}

public Builder statusCheckerRegistry(StatusCheckerRegistry statusCheckerRegistry) {
manager.statusCheckerRegistry = statusCheckerRegistry;
return this;
}

public Builder vault(Vault vault) {
manager.vault = vault;
return this;
Expand Down Expand Up @@ -652,23 +627,6 @@ public Builder pendingGuard(TransferProcessPendingGuard pendingGuard) {
manager.pendingGuard = pendingGuard;
return this;
}

@Override
public TransferProcessManagerImpl build() {
super.build();
Objects.requireNonNull(manager.manifestGenerator, "manifestGenerator cannot be null");
Objects.requireNonNull(manager.provisionManager, "provisionManager cannot be null");
Objects.requireNonNull(manager.dataFlowManager, "dataFlowManager cannot be null");
Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry cannot be null");
Objects.requireNonNull(manager.statusCheckerRegistry, "statusCheckerRegistry cannot be null!");
Objects.requireNonNull(manager.observable, "observable cannot be null");
Objects.requireNonNull(manager.policyArchive, "policyArchive cannot be null");
Objects.requireNonNull(manager.addressResolver, "addressResolver cannot be null");
Objects.requireNonNull(manager.provisionResponsesHandler, "provisionResultHandler cannot be null");
Objects.requireNonNull(manager.deprovisionResponsesHandler, "deprovisionResponsesHandler cannot be null");

return manager;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ void setup() {
.manifestGenerator(manifestGenerator)
.monitor(monitor)
.clock(clock)
.statusCheckerRegistry(mock())
.observable(mock())
.store(store)
.policyArchive(policyArchive)
Expand Down
Loading

0 comments on commit bbfcbab

Please sign in to comment.