Skip to content

Commit

Permalink
add interface + inmem implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Feb 22, 2024
1 parent b8e703d commit 736d8b1
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
Expand All @@ -32,18 +34,16 @@
public class DataPlaneDefaultServicesExtension implements ServiceExtension {

public static final String NAME = "Data Plane Framework Default Services";
@Inject
private Clock clock;
@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Override
public String name() {
return NAME;
}

@Inject
private Clock clock;

@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Provider(isDefault = true)
public TransferServiceSelectionStrategy transferServiceSelectionStrategy() {
return TransferServiceSelectionStrategy.selectFirst();
Expand All @@ -54,6 +54,11 @@ public DataPlaneStore dataPlaneStore() {
return new InMemoryDataPlaneStore(clock, criterionOperatorRegistry);
}

@Provider(isDefault = true)
public AccessTokenDataStore defaultAccessTokenDataStore() {
return new InMemoryAccessTokenDataStore(criterionOperatorRegistry);
}

@Provider(isDefault = true)
public PipelineService pipelineService(ServiceExtensionContext context) {
return new PipelineServiceImpl(context.getMonitor());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework.store;

import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* In-mem implementation of the {@link AccessTokenDataStore} based on a {@link ConcurrentHashMap}.
*/
public class InMemoryAccessTokenDataStore implements AccessTokenDataStore {
private final Map<String, AccessTokenData> store = new ConcurrentHashMap<>();
private final QueryResolver<AccessTokenData> queryResolver;

public InMemoryAccessTokenDataStore(CriterionOperatorRegistry operatorRegistry) {
this.queryResolver = new ReflectionBasedQueryResolver<>(AccessTokenData.class, operatorRegistry);
}

@Override
public AccessTokenData getById(String id) {
return store.get(id);
}

@Override
public StoreResult<Void> store(AccessTokenData accessTokenData) {

var prev = store.putIfAbsent(accessTokenData.id(), accessTokenData);
return Optional.ofNullable(prev)
.map(a -> StoreResult.<Void>alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id())))
.orElse(StoreResult.success());
}

@Override
public StoreResult<AccessTokenData> deleteById(String id) {
var prev = store.remove(id);
return Optional.ofNullable(prev)
.map(StoreResult::success)
.orElse(StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id)));
}

@Override
public Collection<AccessTokenData> query(QuerySpec querySpec) {
return queryResolver.query(store.values().stream(), querySpec).toList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework.store;

import org.eclipse.edc.connector.core.store.CriterionOperatorRegistryImpl;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataTestBase;

class InMemoryAccessTokenDataStoreTest extends AccessTokenDataTestBase {
private final InMemoryAccessTokenDataStore store = new InMemoryAccessTokenDataStore(CriterionOperatorRegistryImpl.ofDefaults());

@Override
protected AccessTokenDataStore getStore() {
return store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.spi;

import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.types.domain.DataAddress;

/**
* Container object for a {@link ClaimToken} and a {@link DataAddress} that the data plane uses to keep track of currently
* all access tokens that are currently valid.
*/
public record AccessTokenData(ClaimToken claimToken, DataAddress dataAddress, String id) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.spi.store;

import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;

import java.util.Collection;

/**
* Persistence layer for {@link AccessTokenData} objects, which the data plane uses to keep track of all access tokens that
* are currently valid.
* Implementations must be thread-safe.
*/
public interface AccessTokenDataStore {
String OBJECT_EXISTS = "AccessTokenData with ID '%s' already exists.";
String OBJECT_NOT_FOUND = "AccessTokenData with ID '%s' does not exist.";

/**
* Returns an {@link AccessTokenData} object with the given ID. Returns null if not found.
*
* @param id the ID of the entity.
* @return The entity, or null if not found.
* @throws NullPointerException if the id parameter was null
*/
AccessTokenData getById(String id);

/**
* Adds an {@link AccessTokenData} object to the persistence layer. Will return a failure if an object with the same already exists.
*
* @param accessTokenData the new object
* @return success if stored, a failure if an object with the same ID already exists.
*/
StoreResult<Void> store(AccessTokenData accessTokenData);

/**
* Deletes an {@link AccessTokenData} entity with the given ID.
*
* @param id The ID of the {@link AccessTokenData} that is supposed to be deleted.
* @return success if deleted, a failure if an object with the given ID does not exist.
*/
StoreResult<AccessTokenData> deleteById(String id);

/**
* Returns all {@link AccessTokenData} objects in the store that are covered by a given {@link QuerySpec}.
* <p>
* Note: supplying a sort field that does not exist on the {@link AccessTokenData} may cause some implementations
* to return an empty Stream, others will return an unsorted Stream, depending on the backing storage
* implementation.
*/
Collection<AccessTokenData> query(QuerySpec querySpec);

}
Loading

0 comments on commit 736d8b1

Please sign in to comment.