From a966cce2f410789cf35fdfa8c4b966f13a7b2f06 Mon Sep 17 00:00:00 2001 From: Roman Puchkovskiy Date: Tue, 14 Apr 2026 11:05:43 +0400 Subject: [PATCH 1/2] IGNITE-28537 Remove duplicates from PartitionReplicaListener --- .../replicator/PartitionReplicaListener.java | 120 +++++++----------- 1 file changed, 44 insertions(+), 76 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 0f816b845b0..be73d06118a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -122,6 +122,7 @@ import org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder; import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand; import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase; import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder; import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage; import org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage; @@ -2622,48 +2623,22 @@ private CompletableFuture applyUpdateCommand( ); if (!cmd.full()) { - if (skipDelayedAck) { - if (!SKIP_UPDATES) { - storageUpdateHandler.handleUpdate( - cmd.txId(), - cmd.rowUuid(), - cmd.commitPartitionId().asReplicationGroupId(), - cmd.rowToUpdate(), - true, - null, - null, - null, - indexIdsAtRwTxBeginTs(txId) - ); - } - - return applyCmdWithExceptionHandling(cmd).thenApply(res -> null); - } else { - if (!SKIP_UPDATES) { - // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. - storageUpdateHandler.handleUpdate( - cmd.txId(), - cmd.rowUuid(), - cmd.commitPartitionId().asReplicationGroupId(), - cmd.rowToUpdate(), - true, - null, - null, - null, - indexIdsAtRwTxBeginTs(txId) - ); - } - - CompletableFuture repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> { - if (e != null) { - throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager); - } - - return cmd.txId(); - }); - - return completedFuture(new CommandApplicationResult(null, repFut)); + if (!SKIP_UPDATES) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. + storageUpdateHandler.handleUpdate( + cmd.txId(), + cmd.rowUuid(), + cmd.commitPartitionId().asReplicationGroupId(), + cmd.rowToUpdate(), + true, + null, + null, + null, + indexIdsAtRwTxBeginTs(txId) + ); } + + return applyCmdRespectingDelayedAck(cmd, skipDelayedAck); } else { return applyCmdWithExceptionHandling(cmd).thenCompose(res -> { UpdateCommandResult updateCommandResult = (UpdateCommandResult) res; @@ -2737,6 +2712,22 @@ private CompletableFuture applyUpdateCommand( ); } + private CompletableFuture applyCmdRespectingDelayedAck(UpdateCommandBase cmd, boolean skipDelayedAck) { + if (skipDelayedAck) { + return applyCmdWithExceptionHandling(cmd).thenApply(res -> null); + } else { + CompletableFuture repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> { + if (e != null) { + throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager); + } + + return cmd.txId(); + }); + + return completedFuture(new CommandApplicationResult(null, repFut)); + } + } + /** * Executes an UpdateAll command. * @@ -2771,41 +2762,18 @@ private CompletableFuture applyUpdateAllCommand( ); if (!cmd.full()) { - if (skipDelayedAck) { - // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. - storageUpdateHandler.handleUpdateAll( - cmd.txId(), - cmd.rowsToUpdate(), - cmd.commitPartitionId().asReplicationGroupId(), - true, - null, - null, - indexIdsAtRwTxBeginTs(txId) - ); - - return applyCmdWithExceptionHandling(cmd).thenApply(res -> null); - } else { - // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. - storageUpdateHandler.handleUpdateAll( - cmd.txId(), - cmd.rowsToUpdate(), - cmd.commitPartitionId().asReplicationGroupId(), - true, - null, - null, - indexIdsAtRwTxBeginTs(txId) - ); - } - - CompletableFuture repFut = applyCmdWithExceptionHandling(cmd).handle((r, e) -> { - if (e != null) { - throw new DelayedAckException(cmd.txId(), unwrapCause(e), txManager); - } - - return cmd.txId(); - }); - - return completedFuture(new CommandApplicationResult(null, repFut)); + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. + storageUpdateHandler.handleUpdateAll( + cmd.txId(), + cmd.rowsToUpdate(), + cmd.commitPartitionId().asReplicationGroupId(), + true, + null, + null, + indexIdsAtRwTxBeginTs(txId) + ); + + return applyCmdRespectingDelayedAck(cmd, skipDelayedAck); } else { return applyCmdWithExceptionHandling(cmd).thenCompose(res -> { UpdateCommandResult updateCommandResult = (UpdateCommandResult) res; From 150ffc64152448881fdbb5b4de6e4fbc27f8cd33 Mon Sep 17 00:00:00 2001 From: Roman Puchkovskiy Date: Tue, 14 Apr 2026 11:56:58 +0400 Subject: [PATCH 2/2] IGNITE-28537 / add an assertion --- .../table/distributed/replicator/PartitionReplicaListener.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index be73d06118a..f173d9ad228 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -2713,6 +2713,8 @@ private CompletableFuture applyUpdateCommand( } private CompletableFuture applyCmdRespectingDelayedAck(UpdateCommandBase cmd, boolean skipDelayedAck) { + assert !cmd.full() : "Only non-full commands are supported here"; + if (skipDelayedAck) { return applyCmdWithExceptionHandling(cmd).thenApply(res -> null); } else {