From 4f9556ca8d9c8c136be721de778e7a277c7d7b59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Galder=20Zamarren=CC=83o?= Date: Fri, 23 Oct 2015 13:48:49 +0200 Subject: [PATCH] ISPN-5876 Invalidate keys on commit if 2-phase * Invalidate keys on commit if transaction is 2-phase, otherwise invalidate keys on prepare. --- .../interceptors/InvalidationInterceptor.java | 45 ++++++++++++++----- .../java/org/infinispan/util/logging/Log.java | 3 ++ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java b/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java index 8eda47df59a3..4baca53c013b 100644 --- a/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java @@ -34,6 +34,7 @@ import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.control.LockControlCommand; +import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.InvalidateCommand; @@ -134,18 +135,39 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t @Override public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { - Object retval = invokeNextInterceptor(ctx, command); - log.tracef("Entering InvalidationInterceptor's prepare phase. Ctx flags are empty"); - // fetch the modifications before the transaction is committed (and thus removed from the txTable) - if (shouldInvokeRemoteTxCommand(ctx)) { - List mods = Arrays.asList(command.getModifications()); - Transaction runningTransaction = ctx.getTransaction(); - if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction"); - broadcastInvalidateForPrepare(mods, runningTransaction, ctx); - } else { - log.tracef("Nothing to invalidate - no modifications in the transaction."); + if (command.isOnePhaseCommit()) { + Object retval = invokeNextInterceptor(ctx, command); + log.tracef("Entering InvalidationInterceptor's prepare phase. Ctx flags are empty"); + // fetch the modifications before the transaction is committed (and thus removed from the txTable) + if (shouldInvokeRemoteTxCommand(ctx)) { + List mods = Arrays.asList(command.getModifications()); + Transaction runningTransaction = ctx.getTransaction(); + if (runningTransaction == null) + throw new IllegalStateException("we must have an associated transaction"); + broadcastInvalidateForPrepare(mods, runningTransaction, ctx); + } else { + log.tracef("Nothing to invalidate - no modifications in the transaction."); + } + return retval; + } + + return super.visitPrepareCommand(ctx, command); + } + + @Override + public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable { + Object retval = super.visitCommitCommand(ctx, command); + Set affectedKeys = ctx.getAffectedKeys(); + try { + log.tracef("On commit, send invalidate for keys: %s", affectedKeys); + invalidateAcrossCluster(defaultSynchronous, affectedKeys.toArray()); + return retval; + } catch (Throwable t) { + if (t instanceof RuntimeException) + throw t; + else + throw log.unableToBroadcastInvalidation(t); } - return retval; } @Override @@ -232,7 +254,6 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t } } - protected void invalidateAcrossCluster(boolean synchronous, Object[] keys) throws Throwable { // increment invalidations counter if statistics maintained incrementInvalidations(); diff --git a/core/src/main/java/org/infinispan/util/logging/Log.java b/core/src/main/java/org/infinispan/util/logging/Log.java index bf262bf498eb..6d16ae88947b 100644 --- a/core/src/main/java/org/infinispan/util/logging/Log.java +++ b/core/src/main/java/org/infinispan/util/logging/Log.java @@ -850,5 +850,8 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout, @LogMessage(level = ERROR) @Message(value = "Failed to start rebalance for cache %s", id = 230) void rebalanceStartError(String cacheName, @Cause Throwable cause); + + @Message(value = "Unable to broadcast invalidation messages", id = 231) + RuntimeException unableToBroadcastInvalidation(@Cause Throwable e); }