Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static java.util.stream.Collectors.toList;

import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,6 +28,7 @@
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
Expand Down Expand Up @@ -84,52 +86,80 @@ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, HybridTimes

private List<FullTableSchema> tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, int toCatalogVersion) {
return tableVersionsBetween(tableId, fromCatalogVersion, toCatalogVersion)
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
.collect(toList());
}

// It's ok to use Stream as the results of the methods that call this are cached.
private Stream<CatalogTableDescriptor> tableVersionsBetween(
int tableId,
int fromCatalogVersionIncluding,
int toCatalogVersionIncluding
) {
return IntStream.rangeClosed(fromCatalogVersionIncluding, toCatalogVersionIncluding)
.mapToObj(catalogVersion -> catalogService.catalog(catalogVersion).table(tableId))
.takeWhile(Objects::nonNull)
.map(entry -> fullSchemaFromCatalog(entry.getKey(), entry.getValue()))
.filter(new Predicate<>() {
int prevVersion = Integer.MIN_VALUE;
FullTableSchema prevSchema = null;

@Override
public boolean test(CatalogTableDescriptor tableDescriptor) {
if (tableDescriptor.latestSchemaVersion() == prevVersion) {
public boolean test(FullTableSchema tableSchema) {
if (prevSchema != null && !tableSchema.hasValidatableChangeFrom(prevSchema)) {
return false;
}

assert prevVersion == Integer.MIN_VALUE || tableDescriptor.latestSchemaVersion() == prevVersion + 1
: String.format("Table version is expected to be prevVersion+1, but version is %d and prevVersion is %d",
tableDescriptor.latestSchemaVersion(), prevVersion);

prevVersion = tableDescriptor.latestSchemaVersion();
prevSchema = tableSchema;

return true;
}
});
})
.collect(toList());
}

private List<FullTableSchema> tableSchemaVersionsBetweenCatalogAndTableVersions(
int tableId,
int fromCatalogVersion,
int toTableVersion
) {
Predicate<CatalogTableDescriptor> tableDescriptorFilter = new Predicate<>() {
int prevVersion = Integer.MIN_VALUE;

@Override
public boolean test(CatalogTableDescriptor table) {
if (table.latestSchemaVersion() == prevVersion) {
return false;
}

assert prevVersion == Integer.MIN_VALUE || table.latestSchemaVersion() == prevVersion + 1
: String.format("Table version is expected to be prevVersion+1, but version is %d and prevVersion is %d",
table.latestSchemaVersion(), prevVersion);

prevVersion = table.latestSchemaVersion();

return true;
}
};

return tableVersionsBetween(tableId, fromCatalogVersion, catalogService.latestCatalogVersion())
.takeWhile(tableDescriptor -> tableDescriptor.latestSchemaVersion() <= toTableVersion)
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
.filter(entry -> tableDescriptorFilter.test(entry.getValue()))
.takeWhile(entry -> entry.getValue().latestSchemaVersion() <= toTableVersion)
.map(entry -> fullSchemaFromCatalog(entry.getKey(), entry.getValue()))
.collect(toList());
}

private static FullTableSchema fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
// It's ok to use Stream as the results of the methods that call this are cached.
private Stream<SimpleEntry<Catalog, CatalogTableDescriptor>> tableVersionsBetween(
int tableId,
int fromCatalogVersionIncluding,
int toCatalogVersionIncluding
) {
return IntStream.rangeClosed(fromCatalogVersionIncluding, toCatalogVersionIncluding)
.mapToObj(ver -> {
Catalog catalog = catalogService.catalog(ver);
CatalogTableDescriptor descriptor = catalog.table(tableId);

if (descriptor == null) {
return null;
}

return new SimpleEntry<>(catalog, descriptor);
})
.takeWhile(Objects::nonNull);
}

private static FullTableSchema fullSchemaFromCatalog(Catalog catalog, CatalogTableDescriptor tableDescriptor) {
assert tableDescriptor != null;

return new FullTableSchema(
catalog.version(),
tableDescriptor.latestSchemaVersion(),
tableDescriptor.id(),
tableDescriptor.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
Expand All @@ -36,6 +37,7 @@
* don't affect such compatibility).
*/
public class FullTableSchema {
private final int catalogVersion;
private final int schemaVersion;
private final int tableId;
private final String tableName;
Expand All @@ -45,13 +47,27 @@ public class FullTableSchema {
/**
* Constructor.
*/
public FullTableSchema(int schemaVersion, int tableId, String tableName, List<CatalogTableColumnDescriptor> columns) {
public FullTableSchema(
int catalogVersion,
int schemaVersion,
int tableId,
String tableName,
List<CatalogTableColumnDescriptor> columns
) {
this.catalogVersion = catalogVersion;
this.schemaVersion = schemaVersion;
this.tableId = tableId;
this.tableName = tableName;
this.columns = List.copyOf(columns);
}

/**
* Version of the catalog in which this schema was defined.
*/
public int catalogVersion() {
return catalogVersion;
}

/**
* Returns version of the table definition.
*
Expand Down Expand Up @@ -93,20 +109,30 @@ public List<CatalogTableColumnDescriptor> columns() {
* @return Difference between the schemas.
*/
public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
Map<String, CatalogTableColumnDescriptor> prevColumnsByName = toMapByName(prevSchema.columns, CatalogTableColumnDescriptor::name);
Map<String, CatalogTableColumnDescriptor> thisColumnsByName = toMapByName(this.columns, CatalogTableColumnDescriptor::name);
List<CatalogTableColumnDescriptor> addedColumns = List.of();
List<CatalogTableColumnDescriptor> removedColumns = List.of();
List<ColumnDefinitionDiff> changedColumns = List.of();

if (prevSchema.schemaVersion != schemaVersion) {
Map<String, CatalogTableColumnDescriptor> prevColumnsByName
= toMapByName(prevSchema.columns, CatalogTableColumnDescriptor::name);

Map<String, CatalogTableColumnDescriptor> thisColumnsByName
= toMapByName(this.columns, CatalogTableColumnDescriptor::name);

addedColumns = subtractKeyed(thisColumnsByName, prevColumnsByName);
removedColumns = subtractKeyed(prevColumnsByName, thisColumnsByName);
changedColumns = new ArrayList<>();

List<CatalogTableColumnDescriptor> addedColumns = subtractKeyed(thisColumnsByName, prevColumnsByName);
List<CatalogTableColumnDescriptor> removedColumns = subtractKeyed(prevColumnsByName, thisColumnsByName);
Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());

Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
for (String commonColumnName : intersectionColumnNames) {
CatalogTableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
CatalogTableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
for (String commonColumnName : intersectionColumnNames) {
CatalogTableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
CatalogTableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);

if (columnChanged(prevColumn, thisColumn)) {
changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
if (columnChanged(prevColumn, thisColumn)) {
changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
}
}
}

Expand All @@ -121,6 +147,20 @@ public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
);
}

boolean hasValidatableChangeFrom(FullTableSchema prev) {
if (this == prev) {
return false;
}

// TODO: https://issues.apache.org/jira/browse/IGNITE-19484 Remove the following condition.
if (!Objects.equals(tableName, prev.tableName())) {
return true;
}

// Table column related-changes only differ when the schema version is different
return schemaVersion != prev.schemaVersion();
}

private static <T> Map<String, T> toMapByName(List<T> elements, Function<T, String> nameExtractor) {
return elements.stream().collect(toMap(nameExtractor, identity()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;

import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Result of a schema compatibility validation.
Expand Down Expand Up @@ -157,6 +158,12 @@ public String details() {
return details;
}

@TestOnly
@Nullable
String optionalDetails() {
return details;
}

/**
* Returns error message corresponding to validation failure. Should only be called for a failed validation result, otherwise an
* assertion error may be thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class SchemaCompatibilityValidator {
private final SchemaSyncService schemaSyncService;

// TODO: Remove entries from cache when compacting schemas in SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
private final ConcurrentMap<TableDefinitionDiffKey, TableDefinitionDiff> diffCache = new ConcurrentHashMap<>();
private final ConcurrentMap<TableDefinitionDiffKey, ValidationResult> forwardDiffToResultCache = new ConcurrentHashMap<>();

private static final List<ForwardCompatibilityValidator> FORWARD_COMPATIBILITY_VALIDATORS = List.of(
new RenameTableValidator(),
Expand Down Expand Up @@ -142,7 +142,10 @@ private CompatValidationResult validateForwardSchemaCompatibility(
FullTableSchema oldSchema = tableSchemas.get(i);
FullTableSchema newSchema = tableSchemas.get(i + 1);

ValidationResult validationResult = validateForwardSchemaCompatibility(oldSchema, newSchema);
ValidationResult validationResult = forwardDiffToResultCache.computeIfAbsent(
new TableDefinitionDiffKey(oldSchema.tableId(), oldSchema.catalogVersion(), newSchema.catalogVersion()),
key -> validateForwardSchemaCompatibility(oldSchema, newSchema)
);

if (validationResult.verdict == ValidatorVerdict.INCOMPATIBLE) {
return CompatValidationResult.incompatibleChange(
Expand All @@ -157,11 +160,8 @@ private CompatValidationResult validateForwardSchemaCompatibility(
return CompatValidationResult.success();
}

private ValidationResult validateForwardSchemaCompatibility(FullTableSchema prevSchema, FullTableSchema nextSchema) {
TableDefinitionDiff diff = diffCache.computeIfAbsent(
new TableDefinitionDiffKey(prevSchema.tableId(), prevSchema.schemaVersion(), nextSchema.schemaVersion()),
key -> nextSchema.diffFrom(prevSchema)
);
private static ValidationResult validateForwardSchemaCompatibility(FullTableSchema prevSchema, FullTableSchema nextSchema) {
TableDefinitionDiff diff = nextSchema.diffFrom(prevSchema);

boolean accepted = false;

Expand All @@ -178,8 +178,8 @@ private ValidationResult validateForwardSchemaCompatibility(FullTableSchema prev
}
}

assert accepted : "Table schema changed from " + prevSchema.schemaVersion()
+ " to " + nextSchema.schemaVersion()
assert accepted : "Table schema changed from " + prevSchema.schemaVersion() + " (catalog version " + prevSchema.catalogVersion()
+ ") to " + nextSchema.schemaVersion() + " (catalog version " + nextSchema.catalogVersion() + ")"
+ ", but no schema change validator voted for any change. Some schema validator is missing.";

return ValidationResult.COMPATIBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
*/
class TableDefinitionDiffKey {
private final int tableId;
private final int fromSchemaVersion;
private final int toSchemaVersion;
private final int fromCatalogVersion;
private final int toCatalogVersion;

TableDefinitionDiffKey(int tableId, int fromSchemaVersion, int toSchemaVersion) {
TableDefinitionDiffKey(int tableId, int fromCatalogVersion, int toCatalogVersion) {
this.tableId = tableId;
this.fromSchemaVersion = fromSchemaVersion;
this.toSchemaVersion = toSchemaVersion;
this.fromCatalogVersion = fromCatalogVersion;
this.toCatalogVersion = toCatalogVersion;
}

@Override
Expand All @@ -45,17 +45,17 @@ public boolean equals(Object o) {
if (tableId != that.tableId) {
return false;
}
if (fromSchemaVersion != that.fromSchemaVersion) {
if (fromCatalogVersion != that.fromCatalogVersion) {
return false;
}
return toSchemaVersion == that.toSchemaVersion;
return toCatalogVersion == that.toCatalogVersion;
}

@Override
public int hashCode() {
int result = tableId;
result = 31 * result + fromSchemaVersion;
result = 31 * result + toSchemaVersion;
result = 31 * result + fromCatalogVersion;
result = 31 * result + toCatalogVersion;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ void addedRemovedColumnsAreReflectedInDiff() {
CatalogTableColumnDescriptor column2 = someColumn("b");
CatalogTableColumnDescriptor column3 = someColumn("c");

var schema1 = new FullTableSchema(1, 1, TABLE_NAME1, List.of(column1, column2));
var schema2 = new FullTableSchema(2, 1, TABLE_NAME1, List.of(column2, column3));
var schema1 = tableSchema(1, 1, TABLE_NAME1, List.of(column1, column2));
var schema2 = tableSchema(2, 1, TABLE_NAME1, List.of(column2, column3));

TableDefinitionDiff diff = schema2.diffFrom(schema1);

Expand All @@ -56,8 +56,8 @@ void addedRemovedColumnsAreReflectedInDiff() {
void changedColumnsAreReflectedInDiff() {
CatalogTableColumnDescriptor column1 = someColumn("a");

var schema1 = new FullTableSchema(1, 1, TABLE_NAME1, List.of(column1));
var schema2 = new FullTableSchema(2, 1, TABLE_NAME1,
var schema1 = tableSchema(1, 1, TABLE_NAME1, List.of(column1));
var schema2 = tableSchema(2, 1, TABLE_NAME1,
List.of(new CatalogTableColumnDescriptor("a", ColumnType.STRING, true, 0, 0, 10, DefaultValue.constant(null)))
);

Expand All @@ -71,11 +71,16 @@ void changedColumnsAreReflectedInDiff() {
void changedNameIsReflected() {
CatalogTableColumnDescriptor column = someColumn("a");

var schema1 = new FullTableSchema(1, 1, TABLE_NAME1, List.of(column));
var schema2 = new FullTableSchema(1, 1, TABLE_NAME2, List.of(column));
var schema1 = tableSchema(1, 1, TABLE_NAME1, List.of(column));
var schema2 = tableSchema(1, 1, TABLE_NAME2, List.of(column));

TableDefinitionDiff diff = schema2.diffFrom(schema1);

assertThat(diff.nameDiffers(), is(true));
}

private static FullTableSchema tableSchema(int schemaVersion, int tableId, String tableName,
List<CatalogTableColumnDescriptor> columns) {
return new FullTableSchema(-1, schemaVersion, tableId, tableName, columns);
}
}
Loading