Skip to content

Commit

Permalink
IGNITE-21801 Add missing Schema Sync integration tests (#3740)
Browse files Browse the repository at this point in the history
  • Loading branch information
rpuch committed May 13, 2024
1 parent db94974 commit 39556b6
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.schemasync;

import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.junit.jupiter.api.Test;

@SuppressWarnings("resource")
class ItSchemaSyncConfigTest extends ClusterPerClassIntegrationTest {
@Override
protected int initialNodes() {
return 1;
}

@Test
void delayDurationIsImmutable() {
IgniteImpl ignite = CLUSTER.aliveNode();

SchemaSynchronizationConfiguration config = ignite.clusterConfiguration()
.getConfiguration(SchemaSynchronizationConfiguration.KEY);

ConfigurationChangeException ex = assertWillThrowFast(
config.delayDuration().update(config.delayDuration().value() + 100),
ConfigurationChangeException.class
);

assertThat(ex.getCause(), is(instanceOf(ConfigurationValidationException.class)));
assertThat(
ex.getCause().getMessage(),
containsString("Validation did not pass for keys: [schemaSync.delayDuration, 'schemaSync.delayDuration' "
+ "configuration value is immutable and cannot be updated")
);
}

@Test
void maxClockSkewIsImmutable() {
IgniteImpl ignite = CLUSTER.aliveNode();

SchemaSynchronizationConfiguration config = ignite.clusterConfiguration()
.getConfiguration(SchemaSynchronizationConfiguration.KEY);

ConfigurationChangeException ex = assertWillThrowFast(
config.maxClockSkew().update(config.maxClockSkew().value() + 100),
ConfigurationChangeException.class
);

assertThat(ex.getCause(), is(instanceOf(ConfigurationValidationException.class)));
assertThat(
ex.getCause().getMessage(),
containsString("Validation did not pass for keys: [schemaSync.maxClockSkew, 'schemaSync.maxClockSkew' "
+ "configuration value is immutable and cannot be updated")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.ignite.internal.schemasync;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -31,15 +33,21 @@
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.Cursor;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Tests about basic Schema Synchronization properties that can be tested using just one Ignite node.
Expand All @@ -52,6 +60,8 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {

private static final int KEY = 1;

private static final int NON_EXISTENT_KEY = Integer.MAX_VALUE;

private IgniteImpl node;

@Override
Expand Down Expand Up @@ -132,12 +142,12 @@ private static void putPreExistingValueTo(Table table) {
}

private void enlistTableInTransaction(Table table, Transaction tx) {
executeRwReadOn(table, tx, cluster);
executeRwReadOn(table, tx, NON_EXISTENT_KEY, cluster);
}

private static void executeRwReadOn(Table table, Transaction tx, Cluster cluster) {
private static void executeRwReadOn(Table table, Transaction tx, int key, Cluster cluster) {
cluster.doInSession(0, session -> {
executeUpdate("SELECT * FROM " + table.name(), session, tx);
executeUpdate("SELECT * FROM " + table.name() + " WHERE id = " + key, session, tx);
});
}

Expand Down Expand Up @@ -180,7 +190,7 @@ boolean sql() {
SQL_READ {
@Override
void execute(Table table, Transaction tx, Cluster cluster) {
executeRwReadOn(table, tx, cluster);
executeRwReadOn(table, tx, KEY, cluster);
}

@Override
Expand Down Expand Up @@ -262,4 +272,105 @@ private void dropTable(String tableName) {
executeUpdate("DROP TABLE " + tableName, session);
});
}

@ParameterizedTest
@EnumSource(CommitOperation.class)
void commitAfterDroppingTargetTableIsRejected(CommitOperation operation) {
createTable();

Table table = node.tables().table(TABLE_NAME);

InternalTransaction tx = (InternalTransaction) node.transactions().begin();

enlistTableInTransaction(table, tx);

dropTable(TABLE_NAME);

int tableId = unwrapTableViewInternal(table).tableId();

Throwable ex = assertThrows(Throwable.class, () -> operation.executeOn(tx));
ex = ExceptionUtils.unwrapCause(ex);

assertThat(ex, is(instanceOf(TransactionException.class)));
assertThat(
ex.getMessage(),
containsString(String.format("Commit failed because a table was already dropped [tableId=%s]", tableId))
);

assertThat(((TransactionException) ex).code(), is(Transactions.TX_UNEXPECTED_STATE_ERR));

assertThat(tx.state(), is(TxState.ABORTED));
}

private enum CommitOperation {
COMMIT(tx -> tx.commit()),
COMMIT_ASYNC_GET(tx -> tx.commitAsync().get(10, SECONDS))
;

private final ConsumerX<Transaction> commitOp;

CommitOperation(ConsumerX<Transaction> commitOp) {
this.commitOp = commitOp;
}

void executeOn(Transaction tx) throws Exception {
commitOp.accept(tx);
}
}

/**
* The scenario is like the following.
*
* <ol>
* <li>tx1 starts and enlists table's partition</li>
* <li>table's schema gets changed</li>
* <li>tx2 starts, adds/updates a tuple for key k1 and commits (in this test tx2 is implicit)</li>
* <li>tx1 tries to read k1 via get/scan: this must fail</li>
* </ol>
*/
@ParameterizedTest
@ValueSource(booleans = {false, true})
void readingDataInFutureVersionsFails(boolean scan) {
createTable();

Table table = node.tables().table(TABLE_NAME);
KeyValueView<Tuple, Tuple> kvView = table.keyValueView();

InternalTransaction tx1 = (InternalTransaction) node.transactions().begin();

enlistTableInTransaction(table, tx1);

alterTable(TABLE_NAME);

Tuple keyTuple = Tuple.create().set("id", KEY);
kvView.put(null, keyTuple, Tuple.create().set("val", "put-in-tx2"));

int tableId = unwrapTableViewInternal(table).tableId();

Executable task = scan ? () -> consumeCursor(kvView.query(tx1, null)) : () -> kvView.get(tx1, keyTuple);

IgniteException ex = assertThrows(IgniteException.class, task);

assertThat(
ex.getMessage(),
containsString(String.format(
"Operation failed because it tried to access a row with newer schema version than transaction's [table=%s, "
+ "txSchemaVersion=1, rowSchemaVersion=2]",
tableId
))
);

assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
}

private static void consumeCursor(Cursor<?> cursor) {
try (Cursor<?> c = cursor) {
c.forEachRemaining(obj -> {});
}
}

@FunctionalInterface
private interface ConsumerX<T> {
void accept(T obj) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1013,22 +1013,27 @@ private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
for (int i = 0; i < rows.size(); i++) {
BinaryRow row = rows.get(i);

futs[i] = schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
.thenCompose(validationResult -> {
if (validationResult.isSuccessful()) {
return completedFuture(row);
} else {
throw new IncompatibleSchemaException("Operation failed because schema "
+ validationResult.fromSchemaVersion() + " is not backward-compatible with "
+ validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
}
});
futs[i] = validateBackwardCompatibility(row, txId)
.thenApply(unused -> row);
}

return allOf(futs).thenApply((unused) -> rows);
});
}

private CompletableFuture<Void> validateBackwardCompatibility(BinaryRow row, UUID txId) {
return schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
.thenAccept(validationResult -> {
if (!validationResult.isSuccessful()) {
throw new IncompatibleSchemaException(String.format(
"Operation failed because it tried to access a row with newer schema version than transaction's [table=%d, "
+ "txSchemaVersion=%d, rowSchemaVersion=%d]",
validationResult.failedTableId(), validationResult.fromSchemaVersion(), validationResult.toSchemaVersion()
));
}
});
}

/**
* Processes single entry request for read only transaction.
*
Expand Down Expand Up @@ -1569,16 +1574,8 @@ private CompletableFuture<Void> continueIndexLookup(
return nullCompletedFuture();
}

return schemaCompatValidator.validateBackwards(row.binaryRow().schemaVersion(), tableId(), txId)
.thenApply(validationResult -> {
if (validationResult.isSuccessful()) {
return row;
} else {
throw new IncompatibleSchemaException("Operation failed because schema "
+ validationResult.fromSchemaVersion() + " is not backward-compatible with "
+ validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
}
});
return validateBackwardCompatibility(row.binaryRow(), txId)
.thenApply(unused -> row);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,10 @@ private static void assertFailureDueToBackwardIncompatibleSchemaChange(
IncompatibleSchemaException ex = assertWillThrowFast(future,
IncompatibleSchemaException.class);
assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
assertThat(ex.getMessage(), containsString("Operation failed because schema 1 is not backward-compatible with 2"));
assertThat(ex.getMessage(), containsString(
"Operation failed because it tried to access a row with newer schema version than transaction's [table=1, "
+ "txSchemaVersion=1, rowSchemaVersion=2]"
));

// Tx should not be finished.
assertThat(committed.get(), is(nullValue()));
Expand Down

0 comments on commit 39556b6

Please sign in to comment.