Skip to content

Commit

Permalink
sql implementation of the store
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Feb 22, 2024
1 parent 736d8b1 commit 040acd0
Show file tree
Hide file tree
Showing 16 changed files with 511 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public StoreResult<Void> store(AccessTokenData accessTokenData) {
}

@Override
public StoreResult<AccessTokenData> deleteById(String id) {
public StoreResult<Void> deleteById(String id) {
var prev = store.remove(id);
return Optional.ofNullable(prev)
.map(StoreResult::success)
.map(p -> StoreResult.<Void>success())
.orElse(StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
plugins {
`java-library`
`maven-publish`
}

dependencies {
api(project(":spi:common:transaction-spi"))
api(project(":spi:data-plane:data-plane-spi"))

implementation(project(":spi:common:transaction-datasource-spi"))
implementation(project(":extensions:common:sql:sql-core"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:data-plane:data-plane-spi")))
testImplementation(testFixtures(project(":extensions:common:sql:sql-lease")))
testImplementation(testFixtures(project(":extensions:common:sql:sql-core")))

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Statements are designed for and tested with Postgres only!

CREATE TABLE IF NOT EXISTS edc_accesstokendata
(
id VARCHAR NOT NULL PRIMARY KEY,
claim_token json NOT NULL,
data_address JSON NOT NULL
);

COMMENT ON COLUMN edc_accesstokendata.claim_token IS 'ClaimToken serialized as JSON map';
COMMENT ON COLUMN edc_accesstokendata.data_address IS 'DataAddress serialized as JSON map';
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.store.sql;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.store.AbstractSqlStore;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;

import static org.eclipse.edc.spi.query.Criterion.criterion;

/**
* SQL implementation of {@link DataPlaneStore}
*/
public class SqlAccessTokenDataStore extends AbstractSqlStore implements AccessTokenDataStore {

private final AccessTokenDataStatements statements;

public SqlAccessTokenDataStore(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext,
AccessTokenDataStatements statements, ObjectMapper objectMapper, QueryExecutor queryExecutor) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor);
this.statements = statements;
}

@Override
public AccessTokenData getById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findByIdInternal(connection, id);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<Void> store(AccessTokenData accessTokenData) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
if (findByIdInternal(connection, accessTokenData.id()) != null) {
return StoreResult.alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id()));
}
insert(connection, accessTokenData);
return StoreResult.success();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<Void> deleteById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
if (findByIdInternal(connection, id) == null) {
return StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id));
}
var sql = statements.getDeleteTemplate();
queryExecutor.execute(connection, sql, id);
return StoreResult.success();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public Collection<AccessTokenData> query(QuerySpec querySpec) {
return transactionContext.execute(() -> {
try (var conn = getConnection()) {
var sql = statements.createQuery(querySpec);
return queryExecutor.query(conn, true, this::mapAccessTokenData, sql.getQueryAsString(), sql.getParameters()).toList();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private void insert(Connection connection, AccessTokenData dataFlow) {
var sql = statements.getInsertTemplate();
queryExecutor.execute(connection, sql,
dataFlow.id(),
toJson(dataFlow.claimToken()),
toJson(dataFlow.dataAddress())
);
}


private AccessTokenData mapAccessTokenData(ResultSet resultSet) throws SQLException {
var claimToken = fromJson(resultSet.getString(statements.getClaimTokenColumn()), ClaimToken.class);
var dataAddress = fromJson(resultSet.getString(statements.getDataAddressColumn()), DataAddress.class);
var id = resultSet.getString(statements.getIdColumn());

return new AccessTokenData(id, claimToken, dataAddress);
}

private @Nullable AccessTokenData findByIdInternal(Connection conn, String id) {
return transactionContext.execute(() -> {
var querySpec = QuerySpec.Builder.newInstance().filter(criterion("id", "=", id)).build();
var statement = statements.createQuery(querySpec);
return queryExecutor.query(conn, true, this::mapAccessTokenData, statement.getQueryAsString(), statement.getParameters())
.findFirst().orElse(null);
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.store.sql;

import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements;
import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresAccessTokenDataStatements;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.time.Clock;

/**
* Provides Sql Store for {@link AccessTokenData} objects
*/
@Extension(value = SqlAccessTokenDataStoreExtension.NAME)
public class SqlAccessTokenDataStoreExtension implements ServiceExtension {

public static final String NAME = "Sql AccessTokenData Store";

@Setting(value = "Name of the datasource to use for accessing data plane store")
private static final String DATASOURCE_SETTING_NAME = "edc.datasource.accesstokendata.name";

@Inject
private DataSourceRegistry dataSourceRegistry;

@Inject
private TransactionContext transactionContext;

@Inject(required = false)
private AccessTokenDataStatements statements;

@Inject
private Clock clock;

@Inject
private TypeManager typeManager;

@Inject
private QueryExecutor queryExecutor;

@Override
public String name() {
return NAME;
}

@Provider
public AccessTokenDataStore dataPlaneStore(ServiceExtensionContext context) {
return new SqlAccessTokenDataStore(dataSourceRegistry, getDataSourceName(context), transactionContext,
getStatementImpl(), typeManager.getMapper(), queryExecutor);
}

/**
* returns an externally-provided sql statement dialect, or postgres as a default
*/
private AccessTokenDataStatements getStatementImpl() {
return statements != null ? statements : new PostgresAccessTokenDataStatements();
}

private String getDataSourceName(ServiceExtensionContext context) {
return context.getConfig().getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2020 - 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.store.sql.schema;

import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.statement.SqlStatements;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

/**
* Sql Statements for DataPlane Store
*/
public interface AccessTokenDataStatements extends SqlStatements {

default String getIdColumn() {
return "id";
}

default String getTableName() {
return "edc_accesstokendata";
}

default String getClaimTokenColumn() {
return "claim_token";
}

default String getDataAddressColumn() {
return "data_address";
}

String getInsertTemplate();

String getSelectTemplate();

String getDeleteTemplate();

SqlQueryStatement createQuery(QuerySpec querySpec);

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2020 - 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.store.sql.schema;

import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.AccessTokenDataMapping;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.translation.SqlOperatorTranslator;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

public class BaseSqlAccessTokenStatements implements AccessTokenDataStatements {

protected final SqlOperatorTranslator operatorTranslator;

public BaseSqlAccessTokenStatements(SqlOperatorTranslator operatorTranslator) {
this.operatorTranslator = operatorTranslator;
}

@Override
public String getInsertTemplate() {
return executeStatement()
.column(getIdColumn())
.jsonColumn(getClaimTokenColumn())
.jsonColumn(getDataAddressColumn())
.insertInto(getTableName());
}

@Override
public String getSelectTemplate() {
return "SELECT * FROM %s".formatted(getTableName());
}

@Override
public String getDeleteTemplate() {
return executeStatement()
.delete(getTableName(), getIdColumn());
}

@Override
public SqlQueryStatement createQuery(QuerySpec querySpec) {
return new SqlQueryStatement(getSelectTemplate(), querySpec, new AccessTokenDataMapping(this), operatorTranslator);
}
}

0 comments on commit 040acd0

Please sign in to comment.