Skip to content

Commit

Permalink
Added an additional message to wait for the primary replica to be rea…
Browse files Browse the repository at this point in the history
…dy before returning the primary replica for the placement driver API.
  • Loading branch information
vldpyatkov committed May 13, 2024
1 parent c663cb2 commit 5a5330a
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;

Expand Down Expand Up @@ -63,6 +64,9 @@ public class Replica {

private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

/** The replica result is used when we have nothing to return. */
public static final ReplicaResult EMPTY_REPLICA_RESULT = new ReplicaResult(null, null);

/** Replica group identity, this id is the same as the considered partition's id. */
private final ReplicationGroupId replicaGrpId;

Expand Down Expand Up @@ -157,17 +161,12 @@ public CompletableFuture<ReplicaResult> processRequest(ReplicaRequest request, S

if (request instanceof WaitReplicaStateMessage) {
if (!waitForActualStateFuture.isDone()) {
return processWaitReplicaStateMessage((WaitReplicaStateMessage) request)
.thenComposeAsync(
v -> sendPrimaryReplicaChangeToReplicationGroup(targetPrimaryReq.enlistmentConsistencyToken()),
executor
)
.thenComposeAsync(
unused -> completedFuture(new ReplicaResult(null, null)),
executor
);
var waitReplicaStateMsg = (WaitReplicaStateMessage) request;

return processWaitReplicaStateMessage(waitReplicaStateMsg)
.thenApply(unused -> EMPTY_REPLICA_RESULT);
} else {
return completedFuture(new ReplicaResult(null, null));
return completedFuture(EMPTY_REPLICA_RESULT);
}
}

Expand Down Expand Up @@ -295,7 +294,25 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
private CompletableFuture<Void> processWaitReplicaStateMessage(WaitReplicaStateMessage msg) {
LOG.info("WaitReplicaStateMessage was received [groupId = {}]", groupId());

return waitForActualState(FastTimestamps.coarseCurrentTimeMillis() + TimeUnit.SECONDS.toMillis(msg.timeout()));
CompletableFuture<Void> result;

if (msg.updateLease()) {
result = placementDriver.addSubgroups(
zonePartitionId,
msg.enlistmentConsistencyToken(),
Set.of(replicaGrpId)
);
} else {
result = nullCompletedFuture();
}

return result
// TODO: https://issues.apache.org/jira/browse/IGNITE-22122
.thenComposeAsync(unused -> waitForActualState(FastTimestamps.coarseCurrentTimeMillis() + msg.timeout()), executor)
.thenComposeAsync(
v -> sendPrimaryReplicaChangeToReplicationGroup(msg.enlistmentConsistencyToken()),
executor
);
}

private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(long leaseStartTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.ignite.internal.replicator;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.EventListener;
Expand All @@ -29,6 +33,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.WaitReplicaStateMessage;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeResolver;

/**
Expand All @@ -49,6 +54,9 @@ public class ReplicaAwareLeaseTracker extends AbstractEventProducer<PrimaryRepli
/** Resolver that resolves a node consistent ID to cluster node. */
private final ClusterNodeResolver clusterNodeResolver;

/** Map for saving waiting for replica state futures for each table partition. */
private final Map<TablePartitionId, CompletableFuture<Void>> waitPrimaryState = new ConcurrentHashMap<>();


/**
* Constructor.
Expand Down Expand Up @@ -91,8 +99,31 @@ public CompletableFuture<ReplicaMeta> awaitPrimaryReplicaForTable(
assert zonePartitionId.tableId() != 0 : "Table id should be defined.";

ZonePartitionId pureZonePartId = zonePartitionId.purify();
TablePartitionId tablePartitionId = new TablePartitionId(zonePartitionId.tableId(), zonePartitionId.partitionId());

return delegate.awaitPrimaryReplicaForTable(pureZonePartId, timestamp, timeout, unit)
.thenCompose(replicaMeta -> {
ClusterNode leaseholderNode = clusterNodeResolver.getById(replicaMeta.getLeaseholderId());

if (replicaMeta.subgroups().contains(tablePartitionId)) {
waitPrimaryState.remove(tablePartitionId);

return completedFuture(replicaMeta);
}

CompletableFuture<Void> waitReplicaStateFut = waitPrimaryState.computeIfAbsent(tablePartitionId, gerpId -> {
WaitReplicaStateMessage awaitReplicaReq = REPLICA_MESSAGES_FACTORY.waitReplicaStateMessage()
.groupId(tablePartitionId)
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
.timeout(unit.toMillis(timeout))
.updateLease(true)
.build();

return replicaService.invoke(leaseholderNode, awaitReplicaReq);
});

return delegate.awaitPrimaryReplicaForTable(pureZonePartId, timestamp, timeout, unit);
return waitReplicaStateFut.thenApply((ignored) -> replicaMeta);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ private void updateTableGroupsInternal() {
.enlistmentConsistencyToken(meta.getStartTime().longValue())
.groupId(partId)
// TODO: https://issues.apache.org/jira/browse/IGNITE-22122
.timeout(10)
.timeout(10_000)
.updateLease(false)
.build();

CompletableFuture<Replica> replicaFut = replicas.get(repGrp);
Expand All @@ -739,7 +740,7 @@ private void updateTableGroupsInternal() {

return allOf(requestToReplicas.toArray(CompletableFuture[]::new));
}, scheduledTableLeaseUpdateExecutor)
.get(500, TimeUnit.MILLISECONDS);
.get(1, TimeUnit.SECONDS);
} catch (Exception ex) {
LOG.error(
"Failed to add new subgroups to the replication group [repGrp={}, subGroups={}].",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
@Transferable(ReplicaMessageGroup.WAIT_REPLICA_STATE)
public interface WaitReplicaStateMessage extends PrimaryReplicaRequest {
long timeout();

boolean updateLease();
}

0 comments on commit 5a5330a

Please sign in to comment.