Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
Expand Down Expand Up @@ -2622,48 +2623,22 @@ private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
);

if (!cmd.full()) {
if (skipDelayedAck) {
if (!SKIP_UPDATES) {
storageUpdateHandler.handleUpdate(
cmd.txId(),
cmd.rowUuid(),
cmd.commitPartitionId().asReplicationGroupId(),
cmd.rowToUpdate(),
true,
null,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);
}

return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
} else {
if (!SKIP_UPDATES) {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdate(
cmd.txId(),
cmd.rowUuid(),
cmd.commitPartitionId().asReplicationGroupId(),
cmd.rowToUpdate(),
true,
null,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);
}

CompletableFuture<UUID> repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
if (e != null) {
throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager);
}

return cmd.txId();
});

return completedFuture(new CommandApplicationResult(null, repFut));
if (!SKIP_UPDATES) {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdate(
cmd.txId(),
cmd.rowUuid(),
cmd.commitPartitionId().asReplicationGroupId(),
cmd.rowToUpdate(),
true,
null,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);
}

return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
} else {
return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
Expand Down Expand Up @@ -2737,6 +2712,24 @@ private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
);
}

private CompletableFuture<CommandApplicationResult> applyCmdRespectingDelayedAck(UpdateCommandBase cmd, boolean skipDelayedAck) {
Comment thread
ibessonov marked this conversation as resolved.
assert !cmd.full() : "Only non-full commands are supported here";

if (skipDelayedAck) {
return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
} else {
CompletableFuture<UUID> repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
if (e != null) {
throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager);
}

return cmd.txId();
});

return completedFuture(new CommandApplicationResult(null, repFut));
}
}

/**
* Executes an UpdateAll command.
*
Expand Down Expand Up @@ -2771,41 +2764,18 @@ private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
);

if (!cmd.full()) {
if (skipDelayedAck) {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
cmd.commitPartitionId().asReplicationGroupId(),
true,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);

return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
} else {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
cmd.commitPartitionId().asReplicationGroupId(),
true,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);
}

CompletableFuture<UUID> repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
if (e != null) {
throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager);
}

return cmd.txId();
});

return completedFuture(new CommandApplicationResult(null, repFut));
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
cmd.commitPartitionId().asReplicationGroupId(),
true,
null,
null,
indexIdsAtRwTxBeginTs(txId)
);

return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
} else {
return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
Expand Down