From 50945a93fe5e7dedb2aaed5c49e5a3b80e887d00 Mon Sep 17 00:00:00 2001 From: anew Date: Thu, 6 Oct 2016 15:19:39 -0700 Subject: [PATCH] (TEPHRA-188) Allow to configure a limit for the transaction timeout --- .../java/org/apache/tephra/TransactionManager.java | 8 +++++++- .../src/main/java/org/apache/tephra/TxConstants.java | 9 +++++++++ .../apache/tephra/ThriftTransactionSystemTest.java | 3 +++ .../org/apache/tephra/TransactionManagerTest.java | 1 + .../org/apache/tephra/TransactionSystemTest.java | 12 ++++++++++++ 5 files changed, 32 insertions(+), 1 deletion(-) diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index e3cd2f82..7faf63ae 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -142,6 +142,7 @@ public class TransactionManager extends AbstractService { private final int cleanupInterval; private final int defaultTimeout; private final int defaultLongTimeout; + private final int maxTimeout; private DaemonThreadExecutor cleanupThread = null; private volatile TransactionLog currentLog; @@ -174,6 +175,8 @@ public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage p this.persistor = persistor; cleanupInterval = conf.getInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, TxConstants.Manager.DEFAULT_TX_CLEANUP_INTERVAL); + maxTimeout = conf.getInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, + TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT); defaultTimeout = conf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT, TxConstants.Manager.DEFAULT_TX_TIMEOUT); defaultLongTimeout = conf.getInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, @@ -722,7 +725,10 @@ public Transaction startShort() { * @param timeoutInSeconds the time out period in seconds. */ public Transaction startShort(int timeoutInSeconds) { - Preconditions.checkArgument(timeoutInSeconds > 0, "timeout must be positive but is %s", timeoutInSeconds); + Preconditions.checkArgument(timeoutInSeconds > 0, + "timeout must be positive but is %s seconds", timeoutInSeconds); + Preconditions.checkArgument(timeoutInSeconds <= maxTimeout, + "timeout must not exceed %s seconds but is %s seconds", maxTimeout, timeoutInSeconds); txMetricsCollector.rate("start.short"); Stopwatch timer = new Stopwatch().start(); long expiration = getTxExpiration(timeoutInSeconds); diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 0fd36452..b9a7929a 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -149,6 +149,15 @@ public static final class Manager { public static final String CFG_TX_LONG_TIMEOUT = "data.tx.long.timeout"; /** Default value for long running transaction timeout, in seconds. */ public static final int DEFAULT_TX_LONG_TIMEOUT = (int) TimeUnit.DAYS.toSeconds(1); + /** + * The limit for the allowed transaction timeout, in seconds. Attempts to start a transaction with a longer + * timeout will fail. + */ + public static final String CFG_TX_MAX_TIMEOUT = "data.tx.max.timeout"; + /** + * The default value for the transaction timeout limit, in seconds: unlimited. + */ + public static final int DEFAULT_TX_MAX_TIMEOUT = Integer.MAX_VALUE; /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */ public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval"; /** Default value for frequency of periodic snapshots of transaction state. */ diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java index ef9b9c24..3f7e88c1 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -43,6 +43,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class ThriftTransactionSystemTest extends TransactionSystemTest { private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); @@ -65,6 +67,7 @@ public static void start() throws Exception { conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit Injector injector = Guice.createInjector( new ConfigModule(conf), diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java index f74e2094..32692412 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java @@ -56,6 +56,7 @@ protected TransactionStateStorage getStateStorage() { @Before public void before() { conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread + conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage txStateStorage = new InMemoryTransactionStateStorage(); txManager = new TransactionManager diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java index 797c08a6..77e92320 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java @@ -42,6 +42,18 @@ public abstract class TransactionSystemTest { protected abstract TransactionStateStorage getStateStorage() throws Exception; + // Unfortunately, in-memory mode and thrift mode throw different exceptions here + @Test(expected = Exception.class) + public void testNegativeTimeout() throws Exception { + getClient().startShort(-1); + } + + // Unfortunately, in-memory mode and thrift mode throw different exceptions here + @Test(expected = Exception.class) + public void testExcessiveTimeout() throws Exception { + getClient().startShort((int) TimeUnit.DAYS.toSeconds(10)); + } + @Test public void testCommitRaceHandling() throws Exception { TransactionSystemClient client1 = getClient();