Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,18 @@ private void persistEntity(
Connection connection,
QueryAction queryAction)
throws SQLException {
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
if (originalEntity == null) {
try {
List<Object> values =
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
queryAction.apply(
connection,
QueryGenerator.generateInsertQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, realmId));
ModelEntity.getAllColumnNames(schemaVersion),
ModelEntity.TABLE_NAME,
values,
realmId));
} catch (SQLException e) {
if (datasourceOperations.isConstraintViolation(e)) {
PolarisBaseEntity existingEntity =
Expand Down Expand Up @@ -222,7 +225,10 @@ private void persistEntity(
queryAction.apply(
connection,
QueryGenerator.generateUpdateQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, params));
ModelEntity.getAllColumnNames(schemaVersion),
ModelEntity.TABLE_NAME,
values,
params));
if (rowsUpdated == 0) {
throw new RetryOnConcurrencyException(
"Entity '%s' id '%s' concurrently modified; expected version %s",
Expand Down Expand Up @@ -310,7 +316,7 @@ public void writeEvents(@Nonnull List<PolarisEvent> events) {

@Override
public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
Map<String, Object> params =
Map.of(
"id",
Expand All @@ -322,7 +328,7 @@ public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBa
try {
datasourceOperations.executeUpdate(
QueryGenerator.generateDeleteQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to delete entity due to %s", e.getMessage()), e);
Expand Down Expand Up @@ -370,7 +376,7 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {
datasourceOperations.execute(
connection,
QueryGenerator.generateDeleteQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
datasourceOperations.execute(
connection,
QueryGenerator.generateDeleteQuery(
Expand Down Expand Up @@ -402,7 +408,7 @@ public PolarisBaseEntity lookupEntity(
Map.of("catalog_id", catalogId, "id", entityId, "type_code", typeCode, "realm_id", realmId);
return getPolarisBaseEntity(
QueryGenerator.generateSelectQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
}

@Override
Expand All @@ -426,13 +432,13 @@ public PolarisBaseEntity lookupEntityByName(
realmId);
return getPolarisBaseEntity(
QueryGenerator.generateSelectQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
}

@Nullable
private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery query) {
try {
var results = datasourceOperations.executeSelect(query, new ModelEntity());
var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
if (results.isEmpty()) {
return null;
} else if (results.size() > 1) {
Expand All @@ -454,9 +460,10 @@ private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery quer
public List<PolarisBaseEntity> lookupEntities(
@Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();
PreparedQuery query = QueryGenerator.generateSelectQueryWithEntityIds(realmId, entityIds);
PreparedQuery query =
QueryGenerator.generateSelectQueryWithEntityIds(realmId, schemaVersion, entityIds);
try {
return datasourceOperations.executeSelect(query, new ModelEntity());
return datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
Expand All @@ -472,7 +479,7 @@ public List<PolarisChangeTrackingVersions> lookupEntityVersions(
.collect(
Collectors.toMap(
entry -> new PolarisEntityId(entry.getCatalogId(), entry.getId()),
ModelEntity::fromEntity));
entry -> ModelEntity.fromEntity(entry, schemaVersion)));
return entityIds.stream()
.map(
entityId -> {
Expand Down Expand Up @@ -575,11 +582,16 @@ public <T> Page<T> loadEntities(
try {
PreparedQuery query =
buildEntityQuery(
catalogId, parentId, entityType, entitySubType, pageToken, ModelEntity.ALL_COLUMNS);
catalogId,
parentId,
entityType,
entitySubType,
pageToken,
ModelEntity.getAllColumnNames(schemaVersion));
AtomicReference<Page<T>> results = new AtomicReference<>();
datasourceOperations.executeSelectOverStream(
query,
new ModelEntity(),
new ModelEntity(schemaVersion),
stream -> {
var data = stream.filter(entityFilter);
results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity));
Expand All @@ -600,7 +612,7 @@ public int lookupEntityGrantRecordsVersion(
PolarisBaseEntity b =
getPolarisBaseEntity(
QueryGenerator.generateSelectQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
return b == null ? 0 : b.getGrantRecordsVersion();
}

Expand Down Expand Up @@ -714,8 +726,8 @@ public boolean hasChildren(
var results =
datasourceOperations.executeSelect(
QueryGenerator.generateSelectQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params),
new ModelEntity());
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params),
new ModelEntity(schemaVersion));
return results != null && !results.isEmpty();
} catch (SQLException e) {
throw new RuntimeException(
Expand Down Expand Up @@ -759,9 +771,9 @@ Optional<Optional<String>> hasOverlappingSiblings(

PreparedQuery query =
QueryGenerator.generateOverlapQuery(
realmId, entity.getCatalogId(), entity.getBaseLocation());
realmId, schemaVersion, entity.getCatalogId(), entity.getBaseLocation());
try {
var results = datasourceOperations.executeSelect(query, new ModelEntity());
var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
if (!results.isEmpty()) {
StorageLocation entityLocation = StorageLocation.of(entity.getBaseLocation());
for (PolarisBaseEntity result : results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ public static PreparedQuery generateDeleteQueryForEntityGrantRecords(
* Builds a SELECT query using a list of entity ID pairs (catalog_id, id).
*
* @param realmId Realm to filter by.
* @param schemaVersion The schema version of entities table to query
* @param entityIds List of PolarisEntityId pairs.
* @return SELECT query to retrieve matching entities.
* @throws IllegalArgumentException if entityIds is empty.
*/
public static PreparedQuery generateSelectQueryWithEntityIds(
@Nonnull String realmId, @Nonnull List<PolarisEntityId> entityIds) {
@Nonnull String realmId, int schemaVersion, @Nonnull List<PolarisEntityId> entityIds) {
if (entityIds.isEmpty()) {
throw new IllegalArgumentException("Empty entity ids");
}
Expand All @@ -131,7 +132,9 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
params.add(realmId);
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
return new PreparedQuery(
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
generateSelectQuery(
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, where, null)
.sql(),
params);
}

Expand Down Expand Up @@ -260,13 +263,14 @@ static PreparedQuery generateVersionQuery() {
* This should be combined with a check using `StorageLocation`.
*
* @param realmId A realm to search within
* @param schemaVersion The schema version of entities table to query
* @param catalogId A catalog entity to search within
* @param baseLocation The base location to look for overlap with, with or without a scheme
* @return The list of possibly overlapping entities that meet the criteria
*/
@VisibleForTesting
public static PreparedQuery generateOverlapQuery(
String realmId, long catalogId, String baseLocation) {
String realmId, int schemaVersion, long catalogId, String baseLocation) {
StorageLocation baseStorageLocation = StorageLocation.of(baseLocation);
String locationWithoutScheme = baseStorageLocation.withoutScheme();

Expand Down Expand Up @@ -297,7 +301,11 @@ public static PreparedQuery generateOverlapQuery(

QueryFragment where = new QueryFragment(clause, finalParams);
PreparedQuery query =
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null);
generateSelectQuery(
ModelEntity.getAllColumnNames(schemaVersion),
ModelEntity.TABLE_NAME,
where.sql(),
null);
return new PreparedQuery(query.sql(), where.parameters());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public interface Converter<T> {
T fromResultSet(ResultSet rs) throws SQLException;

/**
* Convert a model into a Map with keys as snake case names, where as values as values of member
* of model obj.
* Convert a model into a Map with keys as snake case names, and values as values of member of
* model obj.
*/
Map<String, Object> toMap(DatabaseType databaseType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,25 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {

public static final String ID_COLUMN = "id";

public static final List<String> ALL_COLUMNS =
private static final List<String> ALL_COLUMNS =
List.of(
"id",
"catalog_id",
"parent_id",
"type_code",
"name",
"entity_version",
"sub_type_code",
"create_timestamp",
"drop_timestamp",
"purge_timestamp",
"to_purge_timestamp",
"last_update_timestamp",
"properties",
"internal_properties",
"grant_records_version");

private static final List<String> ALL_COLUMNS_V2 =
List.of(
"id",
"catalog_id",
Expand All @@ -54,6 +72,14 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {
"grant_records_version",
"location_without_scheme");

public static List<String> getAllColumnNames(int schemaVersion) {
if (schemaVersion < 2) {
return ALL_COLUMNS;
} else {
return ALL_COLUMNS_V2;
}
}

public static final List<String> ENTITY_LOOKUP_COLUMNS =
List.of("id", "catalog_id", "parent_id", "type_code", "name", "sub_type_code");

Expand Down Expand Up @@ -106,6 +132,16 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {
// location for the entity but without a scheme, when applicable
private String locationWithoutScheme;

// schema version of the entity
// NOTE: this field is not stored in the database, but is used to handle schema changes
private int schemaVersion;

public ModelEntity(int schemaVersion) {
this.schemaVersion = schemaVersion;
}

public ModelEntity() {}

public long getId() {
return id;
}
Expand Down Expand Up @@ -170,12 +206,17 @@ public String getLocationWithoutScheme() {
return locationWithoutScheme;
}

public int getSchemaVersion() {
return schemaVersion;
}

public static Builder builder() {
return new Builder();
}

@Override
public PolarisBaseEntity fromResultSet(ResultSet r) throws SQLException {

var modelEntity =
ModelEntity.builder()
.catalogId(r.getObject("catalog_id", Long.class))
Expand All @@ -195,7 +236,8 @@ public PolarisBaseEntity fromResultSet(ResultSet r) throws SQLException {
// JSONB: use getString(), not getObject().
.internalProperties(r.getString("internal_properties"))
.grantRecordsVersion(r.getObject("grant_records_version", Integer.class))
.locationWithoutScheme(r.getString("location_without_scheme"))
.locationWithoutScheme(
this.schemaVersion >= 2 ? r.getString("location_without_scheme") : null)
.build();

return toEntity(modelEntity);
Expand Down Expand Up @@ -224,7 +266,9 @@ public Map<String, Object> toMap(DatabaseType databaseType) {
map.put("internal_properties", this.getInternalProperties());
}
map.put("grant_records_version", this.getGrantRecordsVersion());
map.put("location_without_scheme", this.getLocationWithoutScheme());
if (this.getSchemaVersion() >= 2) {
map.put("location_without_scheme", this.getLocationWithoutScheme());
}
return map;
}

Expand Down Expand Up @@ -315,12 +359,17 @@ public Builder locationWithoutScheme(String location) {
return this;
}

public Builder schemaVersion(int schemaVersion) {
entity.schemaVersion = schemaVersion;
return this;
}

public ModelEntity build() {
return entity;
}
}

public static ModelEntity fromEntity(PolarisBaseEntity entity) {
public static ModelEntity fromEntity(PolarisBaseEntity entity, int schemaVersion) {
var builder =
ModelEntity.builder()
.catalogId(entity.getCatalogId())
Expand Down Expand Up @@ -355,6 +404,8 @@ public static ModelEntity fromEntity(PolarisBaseEntity entity) {
.withoutScheme());
}

builder.schemaVersion(schemaVersion);

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@
import org.h2.jdbcx.JdbcConnectionPool;
import org.mockito.Mockito;

public class AtomicMetastoreManagerWithJdbcBasePersistenceImplTest
public abstract class AtomicMetastoreManagerWithJdbcBasePersistenceImplTest
extends BasePolarisMetaStoreManagerTest {

public static DataSource createH2DataSource() {
return JdbcConnectionPool.create("jdbc:h2:file:./build/test_data/polaris/db", "sa", "");
public DataSource createH2DataSource() {
return JdbcConnectionPool.create(
String.format("jdbc:h2:file:./build/test_data/polaris/db_%s", schemaVersion()), "sa", "");
}

public abstract int schemaVersion();

@Override
protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
int schemaVersion = 2;
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
DatasourceOperations datasourceOperations;
try {
Expand All @@ -52,7 +54,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
ClassLoader classLoader = DatasourceOperations.class.getClassLoader();
InputStream scriptStream =
classLoader.getResourceAsStream(
String.format("%s/schema-v%s.sql", DatabaseType.H2.getDisplayName(), schemaVersion));
String.format(
"%s/schema-v%s.sql", DatabaseType.H2.getDisplayName(), schemaVersion()));
datasourceOperations.executeScript(scriptStream);
} catch (SQLException e) {
throw new RuntimeException(
Expand All @@ -69,7 +72,7 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
RANDOM_SECRETS,
Mockito.mock(),
realmContext.getRealmIdentifier(),
schemaVersion);
schemaVersion());
AtomicOperationMetaStoreManager metaStoreManager =
new AtomicOperationMetaStoreManager(clock, diagServices);
PolarisCallContext callCtx = new PolarisCallContext(realmContext, basePersistence);
Expand Down
Loading