Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20457 Verify commitTimestamp against enlisted partitions expiration timestamps #2658

Merged
merged 28 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1db9bb5
IGNITE-20427 Basic implementation.
Oct 3, 2023
6facb42
IGNITE-20427 TxManager creation moved to an a proper place
Oct 4, 2023
6f8d367
IGNITE-20427 Compilation issues
Oct 4, 2023
78620b7
IGNITE-20427 Incorrect cast updated
Oct 4, 2023
2dd5434
IGNITE-20427 Minors fixed.
Oct 4, 2023
b650ba0
IGNITE-20427 Concurrency issue fixed.
Oct 4, 2023
c67f6b2
IGNITE-20427 Checkstyle issues
Oct 4, 2023
ce22ef4
IGNITE-20427 Move update observable time stamp under if(commit) clause
Oct 4, 2023
1878cc1
IGNITE-20427 Tests added.
Oct 6, 2023
35d2689
IGNITE-20427 Cleanup.
sanpwc Oct 6, 2023
2b809ef
IGNITE-20427 Unused import removed.
sanpwc Oct 6, 2023
4d2ad0e
IGNITE-20427 Unused import removed.
sanpwc Oct 6, 2023
bf5ece8
IGNITE-20427 More checkstyle.
sanpwc Oct 6, 2023
384247b
Merge branch 'main' of https://github.com/apache/ignite-3 into ignite…
sanpwc Oct 6, 2023
b08e182
IGNITE-20427 PMD and checkstyle
sanpwc Oct 6, 2023
a18c3a5
Merge branch 'main' of https://github.com/apache/ignite-3 into ignite…
sanpwc Oct 10, 2023
a2c9dfa
IGNITE-20457 Checkstyle
sanpwc Oct 10, 2023
f85f872
IGNITE-20457 Changes after code review
sanpwc Oct 11, 2023
b35258e
IGNITE-20457 Changes after code review
sanpwc Oct 11, 2023
94def68
IGNITE-20457 Checkstyle
sanpwc Oct 11, 2023
403c598
Merge branch 'main' of https://github.com/apache/ignite-3 into ignite…
sanpwc Oct 12, 2023
cb70112
IGNITE-20457 Changes after code review
sanpwc Oct 12, 2023
7fb32e8
Merge branch 'main' of https://github.com/apache/ignite-3 into ignite…
sanpwc Oct 12, 2023
33df07e
IGNITE-20457 Changes after code review + checkstyle
sanpwc Oct 13, 2023
fb07026
IGNITE-20457 Changes after code review
sanpwc Oct 13, 2023
a808e0a
IGNITE-20457 Changes after code review
sanpwc Oct 13, 2023
4e3295a
IGNITE-20457 Revert CLOCK_SKEW addition to timestamp in awaitPrimaryR…
sanpwc Oct 13, 2023
35380a2
IGNITE-20457 thenCompose(Function.Identity()) restored
sanpwc Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ public static class Transactions {

/** Failure due to an abandoned transaction. */
public static final int TX_ABANDONED_ERR = TX_ERR_GROUP.registerErrorCode((short) 13);

/** Failure due to primary replica expiration. */
public static final int TX_PRIMARY_REPLICA_EXPIRED_ERR = TX_ERR_GROUP.registerErrorCode((short) 14);
}

/** Replicator error group. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -170,7 +169,7 @@ public CompletableFuture<Void> finish(
ClusterNode recipientNode,
Long term,
boolean commit,
Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@
/**
* Service that provides an ability to await and retrieve primary replicas for replication groups.
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-20646 Consider using CLOCK_SKEW unaware await/getPrimaryReplica()
public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> {
/**
* Returns a future for the primary replica for the specified replication group whose expiration time (the right border of the
* corresponding lease interval) is greater than or equal to the timestamp passed as a parameter. Please pay attention that there are
* no restriction on the lease start time (left border), it can either be less or greater than or equal to proposed timestamp.
* corresponding lease interval) is greater than or equal to the (timestamp passed as a parameter - CLOCK_SKEW).
* Please pay attention that there are no restriction on the lease start time (left border),
* it can either be less or greater than or equal to proposed timestamp.
* Given method will await for an appropriate primary replica appearance if there's no already existing one.
*
* @param groupId Replication group id.
* @param timestamp Timestamp reference value.
* @param timestamp CLOCK_SKEW aware timestamp reference value.
* @param timeout How long to wait before completing exceptionally with a TimeoutException, in units of unit.
* @param unit A TimeUnit determining how to interpret the timeout parameter.
* @return Primary replica future.
* @throws PrimaryReplicaAwaitTimeoutException If primary replica await timed out.
* @throws PrimaryReplicaAwaitException If primary replica await failed with any other reason except timeout.
*/
CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
ReplicationGroupId groupId,
Expand All @@ -54,7 +58,7 @@ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
* lease isn't found. Generally speaking reasonable here means enough for distribution across cluster nodes.
*
* @param replicationGroupId Replication group id.
* @param timestamp Timestamp reference value.
* @param timestamp CLOCK_SKEW aware timestamp reference value.
* @return Primary replica future.
*/
CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,22 +235,20 @@ public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(

@Override
public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
HybridTimestamp timestampWithClockSkew = timestamp.addPhysicalTime(CLOCK_SKEW);

return inBusyLockAsync(busyLock, () -> {
Lease lease = leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);

if (lease.isAccepted() && lease.getExpirationTime().after(timestampWithClockSkew)) {
if (lease.isAccepted() && lease.getExpirationTime().after(timestamp)) {
return completedFuture(lease);
}

return msManager
.clusterTime()
.waitFor(timestampWithClockSkew)
.waitFor(timestamp.addPhysicalTime(CLOCK_SKEW))
.thenApply(ignored -> inBusyLock(busyLock, () -> {
Lease lease0 = leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);

if (lease0.isAccepted() && lease0.getExpirationTime().after(timestampWithClockSkew)) {
if (lease0.isAccepted() && lease0.getExpirationTime().after(timestamp)) {
return lease0;
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -374,6 +377,39 @@ public void testGetPrimaryReplica() throws Exception {
assertEquals(LEASE_FROM_1_TO_15_000.getExpirationTime(), retrievedPrimaryReplicaTimeLtLeaseExpiration.get().getExpirationTime());
}

/**
* Test steps.
* <ol>
* <li>Await primary replica for time 10.</li>
* <li>Publish primary replica for an interval [1, 15].</li>
* <li>Assert that primary await future will succeed fast.</li>
* <li>Assert that retrieved primary replica for timestamp less than primaryReplica.expirationTimestamp - CLOCK_SKEW
* will return null./li>
* </ol>
*/
@Test
public void testGetPrimaryReplicaWithLessThanClockSkewDiff() {
// Await primary replica for time 10.
CompletableFuture<ReplicaMeta> primaryReplicaFuture = placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());

// Publish primary replica for an interval [1, 15].
publishLease(LEASE_FROM_1_TO_15_000);

// Assert that primary await future will succeed fast.
assertThat(primaryReplicaFuture, willSucceedFast());

// Assert that retrieved primary replica for timestamp less than primaryReplica.expirationTimestamp - CLOCK_SKEW will return null.
assertThat(
placementDriver.getPrimaryReplica(
GROUP_1,
LEASE_FROM_1_TO_15_000.getExpirationTime().addPhysicalTime(-CLOCK_SKEW).addPhysicalTime(1L)
),
willBe(nullValue())
);
}

@Test
void testListenReplicaBecomePrimaryEventCaseNoLeaseBefore() {
CompletableFuture<PrimaryReplicaEventParameters> eventParametersFuture = listenAnyReplicaBecomePrimaryEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,14 @@ private class Node {
hybridClock
);

txManager = new TxManagerImpl(replicaSvc, lockManager, hybridClock, new TransactionIdGenerator(addr.port()),
() -> clusterService.topologyService().localMember().id());
txManager = new TxManagerImpl(
replicaSvc,
lockManager,
hybridClock,
new TransactionIdGenerator(addr.port()),
() -> clusterService.topologyService().localMember().id(),
placementDriver
);

String nodeName = clusterService.nodeName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,14 @@ private PartialNode startPartialNode(

ReplicaService replicaSvc = new ReplicaService(clusterSvc.messagingService(), hybridClock);

var txManager = new TxManagerImpl(replicaService, lockManager, hybridClock, new TransactionIdGenerator(idx),
() -> clusterSvc.topologyService().localMember().id());
var txManager = new TxManagerImpl(
replicaService,
lockManager,
hybridClock,
new TransactionIdGenerator(idx),
() -> clusterSvc.topologyService().localMember().id(),
placementDriver
);

var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,6 @@ public class IgniteImpl implements Ignite {

ReplicaService replicaSvc = new ReplicaService(clusterSvc.messagingService(), clock);

// TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently).
txManager = new TxManagerImpl(
replicaSvc,
lockMgr,
clock,
new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()),
() -> clusterSvc.topologyService().localMember().id()
);

// TODO: IGNITE-16841 - use common RocksDB instance to store cluster state as well.
clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve(CMG_DB_PATH));

Expand Down Expand Up @@ -544,6 +535,16 @@ public class IgniteImpl implements Ignite {
catalogManager
);

// TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently).
txManager = new TxManagerImpl(
replicaSvc,
lockMgr,
clock,
new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()),
() -> clusterSvc.topologyService().localMember().id(),
placementDriverMgr.placementDriver()
);

distributedTblMgr = new TableManager(
name,
registry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
Expand Down Expand Up @@ -114,8 +115,14 @@ public void testScanNodeDataPropagation() throws InterruptedException {

ReplicaService replicaSvc = mock(ReplicaService.class, RETURNS_DEEP_STUBS);

TxManagerImpl txManager = new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef), () -> "local");
TxManagerImpl txManager = new TxManagerImpl(
replicaSvc,
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
() -> "local",
new TestPlacementDriver("local")
);

txManager.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
private static final String NODE_NAME = "node1";

private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new TestPlacementDriver(NODE_NAME);

/** Factory to create RAFT command messages. */
private final TableMessagesFactory msgFactory = new TableMessagesFactory();

Expand Down Expand Up @@ -202,8 +204,14 @@ public void beforeFollowerStop(RaftGroupService service, RaftServer server) thro

for (int i = 0; i < nodes(); i++) {
if (!txManagers.containsKey(i)) {
TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock, new TransactionIdGenerator(i),
() -> "local");
TxManager txManager = new TxManagerImpl(
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(i),
() -> NODE_NAME,
TEST_PLACEMENT_DRIVER
);

txManager.start();

Expand All @@ -212,8 +220,14 @@ public void beforeFollowerStop(RaftGroupService service, RaftServer server) thro
}
}

TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock, new TransactionIdGenerator(-1),
() -> "local");
TxManager txManager = new TxManagerImpl(
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(-1),
() -> NODE_NAME,
TEST_PLACEMENT_DRIVER
);

txManager.start();

Expand All @@ -231,7 +245,7 @@ public void beforeFollowerStop(RaftGroupService service, RaftServer server) thro
replicaService,
hybridClock,
new HybridTimestampTracker(),
new TestPlacementDriver(NODE_NAME)
TEST_PLACEMENT_DRIVER
);

closeables.add(() -> table.close());
Expand Down Expand Up @@ -456,8 +470,14 @@ public RaftGroupListener createListener(ClusterService service, Path path, int i
);

TxManager txManager = txManagers.computeIfAbsent(index, k -> {
TxManager txMgr = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock,
new TransactionIdGenerator(index), () -> "local");
TxManager txMgr = new TxManagerImpl(
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(index),
() -> NODE_NAME,
TEST_PLACEMENT_DRIVER
);
txMgr.start();
closeables.add(txMgr::stop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,21 @@ public void before() throws Exception {
timestampTracker
) {
@Override
protected TxManagerImpl newTxManager(ReplicaService replicaSvc, HybridClock clock, TransactionIdGenerator generator,
ClusterNode node) {
return new TxManagerImpl(replicaSvc, new HeapLockManager(), clock, generator, node::id) {
protected TxManagerImpl newTxManager(
ReplicaService replicaSvc,
HybridClock clock,
TransactionIdGenerator generator,
ClusterNode node,
PlacementDriver placementDriver
) {
return new TxManagerImpl(
replicaSvc,
new HeapLockManager(),
clock,
generator,
node::id,
placementDriver
) {
@Override
public CompletableFuture<Void> executeCleanupAsync(Runnable runnable) {
if (ignoreAsyncCleanup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftGroupService;
Expand Down Expand Up @@ -147,7 +146,8 @@ static void beforeAllTests() {
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
clusterNode::id
clusterNode::id,
new TestPlacementDriver(clusterNode.name())
) {
@Override
public CompletableFuture<Void> finish(
Expand All @@ -156,8 +156,9 @@ public CompletableFuture<Void> finish(
ClusterNode recipientNode,
Long term,
boolean commit,
Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups,
UUID txId) {
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
return completedFuture(null);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1183,23 +1183,21 @@ private CompletableFuture<Void> continueIndexLookup(
*/
// TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest request, String txCoordinatorId) {
List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
.flatMap(List::stream)
.map(IgniteBiTuple::get1)
.collect(toList());
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
Collection<TablePartitionId> enlistedGroups = (Collection<TablePartitionId>) (Collection<?>) request.groups();

UUID txId = request.txId();

if (request.commit()) {
HybridTimestamp commitTimestamp = request.commitTimestamp();

return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, commitTimestamp)
return schemaCompatValidator.validateForward(txId, enlistedGroups, commitTimestamp)
.thenCompose(validationResult ->
finishAndCleanup(aggregatedGroupIds, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
finishAndCleanup(enlistedGroups, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
.thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)));
} else {
// Aborting.
return finishAndCleanup(aggregatedGroupIds, false, null, txId, txCoordinatorId);
return finishAndCleanup(enlistedGroups, false, null, txId, txCoordinatorId);
}
}

Expand Down
Loading