From 8ea6aa25e8fb88f64c8b43572eb4e518233db4b1 Mon Sep 17 00:00:00 2001 From: Dylan Millikin Date: Tue, 20 Oct 2015 14:29:44 +0200 Subject: [PATCH 1/5] Made Transaction.onReadWrite() a ThreadLocal setting --- .../upgrade-release-3.1.x-incubating.asciidoc | 19 ++++++++++ .../structure/util/AbstractTransaction.java | 37 ++++++++++--------- .../server/GremlinDriverIntegrateTest.java | 7 +++- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc index a3d4e2ac3f9..e3d81159841 100644 --- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc +++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc @@ -55,6 +55,15 @@ to closing where a user must now explicitly call `commit()` to persist their mut See link:https://issues.apache.org/jira/browse/TINKERPOP3-805[TINKERPOP3-805] for more information. +Transaction.onReadWrite() and Transaction.onClose() are now `ThreadLocal` settings +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +The `Transaction.onReadWrite()` and `Transaction.onClose()` settings now need to be set for each thread (if another behavior than the default is desired). +For gremlin-server users that may be changing these settings via scripts : +If the settings are changed from a sessionless request they will now only apply to that one request. +If the settings are changed from an in-session request they will now only apply to all future requests made in the scope of that session. + + Upgrading for Providers ~~~~~~~~~~~~~~~~~~~~~~~ @@ -73,3 +82,13 @@ previously asserted the opposite (i.e. commit on close). These tests have been If these tests were referenced in an `OptOut`, then their names should be updated. See link:https://issues.apache.org/jira/browse/TINKERPOP3-805[TINKERPOP3-805] for more information. + +Driver Implementers +^^^^^^^^^^^^^^^^^^^ + +Transaction.onReadWrite() and Transaction.onClose() are now `ThreadLocal` settings +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +If your driver configures the `Transaction.onReadWrite()` or `Transaction.onClose()` settings, note that these no longer apply to all future requests. +If the settings are changed from a sessionless request they will only apply to that one request. +If the settings are changed from an in-session request they will only apply to all future requests made in the scope of that session. diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java index 56c6f369d6c..0ad24a61d7c 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java @@ -21,8 +21,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Transaction; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -37,18 +35,23 @@ * @author Stephen Mallette (http://stephen.genoprime.com) */ public abstract class AbstractTransaction implements Transaction { - protected Consumer readWriteConsumer; - protected Consumer closeConsumer; - + protected static final ThreadLocal> readWriteConsumer = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return READ_WRITE_BEHAVIOR.AUTO; + } + }; + + protected static final ThreadLocal> closeConsumer = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return CLOSE_BEHAVIOR.ROLLBACK; + } + }; + private Graph g; public AbstractTransaction(final Graph g) { - // auto transaction behavior - readWriteConsumer = READ_WRITE_BEHAVIOR.AUTO; - - // default is to rollback transactions on close - closeConsumer = CLOSE_BEHAVIOR.ROLLBACK; - this.g = g; } @@ -100,7 +103,7 @@ public void open() { */ @Override public void commit() { - readWriteConsumer.accept(this); + readWriteConsumer.get().accept(this); try { doCommit(); fireOnCommit(); @@ -114,7 +117,7 @@ public void commit() { */ @Override public void rollback() { - readWriteConsumer.accept(this); + readWriteConsumer.get().accept(this); try { doRollback(); fireOnRollback(); @@ -143,7 +146,7 @@ public G createThreadedTx() { */ @Override public void readWrite() { - readWriteConsumer.accept(this); + readWriteConsumer.get().accept(this); } /** @@ -151,7 +154,7 @@ public void readWrite() { */ @Override public void close() { - closeConsumer.accept(this); + closeConsumer.get().accept(this); } /** @@ -159,7 +162,7 @@ public void close() { */ @Override public synchronized Transaction onReadWrite(final Consumer consumer) { - readWriteConsumer = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull); + readWriteConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); return this; } @@ -168,7 +171,7 @@ public synchronized Transaction onReadWrite(final Consumer consumer */ @Override public synchronized Transaction onClose(final Consumer consumer) { - closeConsumer = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull); + closeConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull)); return this; } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 575f96e4bf4..2076cbd48cd 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -589,7 +589,7 @@ public void shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGra final Cluster cluster = Cluster.build().create(); final Client client = cluster.connect(name.getMethodName()); - + final Client sessionlessClient = cluster.connect(); client.submit("graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);null").all().get(); client.submit("graph.tx().open()").all().get(); @@ -604,6 +604,11 @@ public void shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGra client.submit("v.property(\"color\",\"blue\")").all().get(); client.submit("graph.tx().commit()").all().get(); + + // Run a sessionless request to change transaction.readWriteConsumer back to AUTO + // The will make the next in session request fail if consumers aren't ThreadLocal + sessionlessClient.submit("graph.vertices().next()").all().get(); + client.submit("graph.tx().open()").all().get(); final Vertex vertexAfterTx = client.submit("graph.vertices().next()").all().get().get(0).getVertex(); From 285bd28b523da9eacb8e84f270a0ebf906f6ca91 Mon Sep 17 00:00:00 2001 From: Dylan Millikin Date: Wed, 21 Oct 2015 14:34:43 +0200 Subject: [PATCH 2/5] removed typos --- docs/src/upgrade-release-3.1.x-incubating.asciidoc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc index 4dd16735368..d031b2db876 100644 --- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc +++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc @@ -39,6 +39,7 @@ Important Changes * Entire TinkerGraph instances can be serialized over Gryo. * Hadoop1 support has been dropped. Hadoop2 is now supported. Giraph 1.1.0 is now supported and Spark of Hadoop2 YARN. * The implementation and semantics of `GraphTraversal.group()` has changed. The previous model is deprecated and renamed to `groupV3d0()`. +* `Transaction.onReadWrite()` and `Transaction.onClose()` are now `ThreadLocal` settings Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.1.0-incubating/CHANGELOG.asciidoc#XXXXXXXXXXXXXXXXXXXXXXXXXXXX[changelog] for a complete list of all the modifications that are part of this release. @@ -57,7 +58,6 @@ to closing where a user must now explicitly call `commit()` to persist their mut See link:https://issues.apache.org/jira/browse/TINKERPOP3-805[TINKERPOP3-805] for more information. -<<<<<<< HEAD Transaction.onReadWrite() and Transaction.onClose() are now `ThreadLocal` settings ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ @@ -66,7 +66,8 @@ For gremlin-server users that may be changing these settings via scripts : If the settings are changed from a sessionless request they will now only apply to that one request. If the settings are changed from an in-session request they will now only apply to all future requests made in the scope of that session. -======= +See link:https://issues.apache.org/jira/browse/TINKERPOP3-885[TINKERPOP3-885] + Gremlin Process ^^^^^^^^^^^^^^^ @@ -100,7 +101,6 @@ Hadoop-Gremlin Updates * Hadoop1 is no longer supported. Hadoop2 is now the only supported Hadoop version in TinkerPop. * The directory where application jars are stored in HDFS is now `hadoop-gremlin-x.y.z-libs`. ** This versioning is important so that cross-version TinkerPop use does not cause jar conflicts. ->>>>>>> master Upgrading for Providers ~~~~~~~~~~~~~~~~~~~~~~~ @@ -121,7 +121,6 @@ If these tests were referenced in an `OptOut`, then their names should be update See link:https://issues.apache.org/jira/browse/TINKERPOP3-805[TINKERPOP3-805] for more information. -<<<<<<< HEAD Driver Implementers ^^^^^^^^^^^^^^^^^^^ @@ -131,9 +130,8 @@ Transaction.onReadWrite() and Transaction.onClose() are now `ThreadLocal` settin If your driver configures the `Transaction.onReadWrite()` or `Transaction.onClose()` settings, note that these no longer apply to all future requests. If the settings are changed from a sessionless request they will only apply to that one request. If the settings are changed from an in-session request they will only apply to all future requests made in the scope of that session. -======= + Graph Traversal Updates +++++++++++++++++++++++ There were numerous changes to the `GraphTraversal` API. Nearly all changes are backwards compatible with respective "deprecated" annotations. Please review the respective updates specified in the "Graph System Users" section. ->>>>>>> master From 35e97f08e4359c0ae09f431c7680f8c13567549b Mon Sep 17 00:00:00 2001 From: Dylan Millikin Date: Wed, 21 Oct 2015 14:37:19 +0200 Subject: [PATCH 3/5] moved Driver implementor section down in it's correct position --- docs/src/upgrade-release-3.1.x-incubating.asciidoc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc index d031b2db876..e42be2b7a94 100644 --- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc +++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc @@ -121,6 +121,11 @@ If these tests were referenced in an `OptOut`, then their names should be update See link:https://issues.apache.org/jira/browse/TINKERPOP3-805[TINKERPOP3-805] for more information. +Graph Traversal Updates ++++++++++++++++++++++++ + +There were numerous changes to the `GraphTraversal` API. Nearly all changes are backwards compatible with respective "deprecated" annotations. Please review the respective updates specified in the "Graph System Users" section. + Driver Implementers ^^^^^^^^^^^^^^^^^^^ @@ -129,9 +134,4 @@ Transaction.onReadWrite() and Transaction.onClose() are now `ThreadLocal` settin If your driver configures the `Transaction.onReadWrite()` or `Transaction.onClose()` settings, note that these no longer apply to all future requests. If the settings are changed from a sessionless request they will only apply to that one request. -If the settings are changed from an in-session request they will only apply to all future requests made in the scope of that session. - -Graph Traversal Updates -+++++++++++++++++++++++ - -There were numerous changes to the `GraphTraversal` API. Nearly all changes are backwards compatible with respective "deprecated" annotations. Please review the respective updates specified in the "Graph System Users" section. +If the settings are changed from an in-session request they will only apply to all future requests made in the scope of that session. \ No newline at end of file From 3938caed7fd0eb62453a2a2deb716e283c4b79a7 Mon Sep 17 00:00:00 2001 From: Dylan Millikin Date: Sun, 25 Oct 2015 13:45:35 +0100 Subject: [PATCH 4/5] moved logic to AbstractThreadLocalTransaction and added a couple of tests --- .../util/AbstractThreadLocalTransaction.java | 38 +++++++- .../util/AbstractThreadedTransaction.java | 29 +++++- .../structure/util/AbstractTransaction.java | 50 ++++++---- .../gremlin/structure/TransactionTest.java | 94 +++++++++++++++++++ 4 files changed, 189 insertions(+), 22 deletions(-) diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java index b47eb79ff6a..310921527bf 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; /** @@ -36,7 +37,20 @@ * @author Stephen Mallette (http://stephen.genoprime.com) */ public abstract class AbstractThreadLocalTransaction extends AbstractTransaction { - +protected static final ThreadLocal> readWriteConsumerInternal = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return READ_WRITE_BEHAVIOR.AUTO; + } + }; + + protected static final ThreadLocal> closeConsumerInternal = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return CLOSE_BEHAVIOR.ROLLBACK; + } + }; + protected final ThreadLocal>> transactionListeners = new ThreadLocal>>() { @Override protected List> initialValue() { @@ -72,4 +86,26 @@ public void removeTransactionListener(final Consumer listener) { public void clearTransactionListeners() { transactionListeners.get().clear(); } + + @Override + public void doReadWrite() { + readWriteConsumerInternal.get().accept(this); + } + + @Override + public void doClose() { + closeConsumerInternal.get().accept(this); + closeConsumerInternal.remove(); + readWriteConsumerInternal.remove(); + } + + @Override + public void setReadWrite(final Consumer consumer) { + readWriteConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + } + + @Override + public void setClose(final Consumer consumer) { + closeConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + } } diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java index 57f8ec0b7ac..246734a5bb0 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Transaction; import java.util.List; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -38,8 +39,12 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction { protected final List> transactionListeners = new CopyOnWriteArrayList<>(); - - public AbstractThreadedTransaction(final Graph g) { + + protected Consumer readWriteConsumerInternal = READ_WRITE_BEHAVIOR.AUTO; + + protected Consumer closeConsumerInternal = CLOSE_BEHAVIOR.ROLLBACK; + + public AbstractThreadedTransaction(final Graph g) { super(g); } @@ -67,4 +72,24 @@ public void removeTransactionListener(final Consumer listener) { public void clearTransactionListeners() { transactionListeners.clear(); } + + @Override + public void doReadWrite() { + readWriteConsumerInternal.accept(this); + } + + @Override + public void doClose() { + closeConsumerInternal.accept(this); + } + + @Override + public void setReadWrite(final Consumer consumer) { + readWriteConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull); + } + + @Override + public void setClose(final Consumer consumer) { + closeConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull); + } } diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java index 0ad24a61d7c..de7b6ee09a1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java @@ -35,19 +35,6 @@ * @author Stephen Mallette (http://stephen.genoprime.com) */ public abstract class AbstractTransaction implements Transaction { - protected static final ThreadLocal> readWriteConsumer = - new ThreadLocal>() { - @Override protected Consumer initialValue() { - return READ_WRITE_BEHAVIOR.AUTO; - } - }; - - protected static final ThreadLocal> closeConsumer = - new ThreadLocal>() { - @Override protected Consumer initialValue() { - return CLOSE_BEHAVIOR.ROLLBACK; - } - }; private Graph g; @@ -86,6 +73,31 @@ public AbstractTransaction(final Graph g) { * {@link #addTransactionListener(Consumer)}. */ protected abstract void fireOnRollback(); + + + /** + * Called {@link #readWrite}. + * Implementers should run their readWrite consumer here. + */ + protected abstract void doReadWrite(); + + /** + * Called {@link #close}. + * Implementers should run their readWrite consumer here. + */ + protected abstract void doClose(); + + /** + * Called {@link #onReadWrite}. + * Implementers should set their readWrite consumer here. + */ + protected abstract void setReadWrite(final Consumer consumer); + + /** + * Called {@link #onClose}. + * Implementers should set their close consumer here. + */ + protected abstract void setClose(final Consumer consumer); /** * {@inheritDoc} @@ -103,7 +115,7 @@ public void open() { */ @Override public void commit() { - readWriteConsumer.get().accept(this); + readWrite(); try { doCommit(); fireOnCommit(); @@ -117,7 +129,7 @@ public void commit() { */ @Override public void rollback() { - readWriteConsumer.get().accept(this); + readWrite(); try { doRollback(); fireOnRollback(); @@ -146,7 +158,7 @@ public G createThreadedTx() { */ @Override public void readWrite() { - readWriteConsumer.get().accept(this); + doReadWrite(); } /** @@ -154,7 +166,7 @@ public void readWrite() { */ @Override public void close() { - closeConsumer.get().accept(this); + doClose(); } /** @@ -162,7 +174,7 @@ public void close() { */ @Override public synchronized Transaction onReadWrite(final Consumer consumer) { - readWriteConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + setReadWrite(consumer); return this; } @@ -171,7 +183,7 @@ public synchronized Transaction onReadWrite(final Consumer consumer */ @Override public synchronized Transaction onClose(final Consumer consumer) { - closeConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull)); + setClose(consumer); return this; } diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java index 2395d9a6d5d..0157be2c2ee 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; import static org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgePropertyFeatures; import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS; @@ -1042,4 +1044,96 @@ public void shouldAllowReferenceOfEdgeIdOutsideOfOriginalThreadAuto() throws Exc g.tx().rollback(); } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS) + public void shouldShareTransactionConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean openOccured1 = new AtomicBoolean(false); + final AtomicBoolean openOccured2 = new AtomicBoolean(false); + + final Thread t1 = new Thread(() -> { + try { + g.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL); + + latch.countDown(); + latch.await(); + g.tx().open(); + openOccured1.set(true); + } catch (Exception ex) { + openOccured1.set(false); + } + + }); + + final Thread t2 = new Thread(() -> { + try { + + latch.countDown(); + latch.await(); + g.tx().open(); + openOccured2.set(true); + } catch (Exception ex) { + openOccured2.set(false); + } + + }); + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + + assertTrue( + "Thread t1 transaction should have been set to MANUAL and capable of opening a transaction", + openOccured1.get() + ); + assertTrue( + "Thread t2 transation should have been set to MANUAL and capable of opening a transaction", + openOccured2.get() + ); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS) + public void shouldNotShareTransactionConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean openOccured1 = new AtomicBoolean(false); + final AtomicBoolean openOccured2 = new AtomicBoolean(false); + + final Thread t1 = new Thread(() -> { + try { + g.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL); + + latch.countDown(); + latch.await(); + g.tx().open(); + openOccured1.set(true); + } catch (Exception ex) { + openOccured1.set(false); + } + + }); + + final Thread t2 = new Thread(() -> { + try { + + latch.countDown(); + latch.await(); + g.tx().open(); + openOccured2.set(true); + } catch (Exception ex) { + openOccured2.set(false); + } + + }); + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + + assertTrue("Thread t1 transaction should have been set to MANUAL and capable of opening a transaction", openOccured1.get()); + assertTrue("Thread t2 transaction should have been set to MANUAL and capable of opening a transaction", !openOccured2.get()); + } } From 46f821a846abc4fceb776f11c3b25888f01d60ef Mon Sep 17 00:00:00 2001 From: Dylan Millikin Date: Sun, 25 Oct 2015 13:45:35 +0100 Subject: [PATCH 5/5] moved logic to AbstractThreadLocalTransaction and added a couple of tests --- .../util/AbstractThreadLocalTransaction.java | 38 +++- .../util/AbstractThreadedTransaction.java | 29 ++- .../structure/util/AbstractTransaction.java | 50 +++-- .../gremlin/structure/TransactionTest.java | 174 ++++++++++++++++++ 4 files changed, 269 insertions(+), 22 deletions(-) diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java index b47eb79ff6a..310921527bf 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; /** @@ -36,7 +37,20 @@ * @author Stephen Mallette (http://stephen.genoprime.com) */ public abstract class AbstractThreadLocalTransaction extends AbstractTransaction { - +protected static final ThreadLocal> readWriteConsumerInternal = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return READ_WRITE_BEHAVIOR.AUTO; + } + }; + + protected static final ThreadLocal> closeConsumerInternal = + new ThreadLocal>() { + @Override protected Consumer initialValue() { + return CLOSE_BEHAVIOR.ROLLBACK; + } + }; + protected final ThreadLocal>> transactionListeners = new ThreadLocal>>() { @Override protected List> initialValue() { @@ -72,4 +86,26 @@ public void removeTransactionListener(final Consumer listener) { public void clearTransactionListeners() { transactionListeners.get().clear(); } + + @Override + public void doReadWrite() { + readWriteConsumerInternal.get().accept(this); + } + + @Override + public void doClose() { + closeConsumerInternal.get().accept(this); + closeConsumerInternal.remove(); + readWriteConsumerInternal.remove(); + } + + @Override + public void setReadWrite(final Consumer consumer) { + readWriteConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + } + + @Override + public void setClose(final Consumer consumer) { + closeConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + } } diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java index 57f8ec0b7ac..246734a5bb0 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Transaction; import java.util.List; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -38,8 +39,12 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction { protected final List> transactionListeners = new CopyOnWriteArrayList<>(); - - public AbstractThreadedTransaction(final Graph g) { + + protected Consumer readWriteConsumerInternal = READ_WRITE_BEHAVIOR.AUTO; + + protected Consumer closeConsumerInternal = CLOSE_BEHAVIOR.ROLLBACK; + + public AbstractThreadedTransaction(final Graph g) { super(g); } @@ -67,4 +72,24 @@ public void removeTransactionListener(final Consumer listener) { public void clearTransactionListeners() { transactionListeners.clear(); } + + @Override + public void doReadWrite() { + readWriteConsumerInternal.accept(this); + } + + @Override + public void doClose() { + closeConsumerInternal.accept(this); + } + + @Override + public void setReadWrite(final Consumer consumer) { + readWriteConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull); + } + + @Override + public void setClose(final Consumer consumer) { + closeConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull); + } } diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java index 0ad24a61d7c..de7b6ee09a1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java @@ -35,19 +35,6 @@ * @author Stephen Mallette (http://stephen.genoprime.com) */ public abstract class AbstractTransaction implements Transaction { - protected static final ThreadLocal> readWriteConsumer = - new ThreadLocal>() { - @Override protected Consumer initialValue() { - return READ_WRITE_BEHAVIOR.AUTO; - } - }; - - protected static final ThreadLocal> closeConsumer = - new ThreadLocal>() { - @Override protected Consumer initialValue() { - return CLOSE_BEHAVIOR.ROLLBACK; - } - }; private Graph g; @@ -86,6 +73,31 @@ public AbstractTransaction(final Graph g) { * {@link #addTransactionListener(Consumer)}. */ protected abstract void fireOnRollback(); + + + /** + * Called {@link #readWrite}. + * Implementers should run their readWrite consumer here. + */ + protected abstract void doReadWrite(); + + /** + * Called {@link #close}. + * Implementers should run their readWrite consumer here. + */ + protected abstract void doClose(); + + /** + * Called {@link #onReadWrite}. + * Implementers should set their readWrite consumer here. + */ + protected abstract void setReadWrite(final Consumer consumer); + + /** + * Called {@link #onClose}. + * Implementers should set their close consumer here. + */ + protected abstract void setClose(final Consumer consumer); /** * {@inheritDoc} @@ -103,7 +115,7 @@ public void open() { */ @Override public void commit() { - readWriteConsumer.get().accept(this); + readWrite(); try { doCommit(); fireOnCommit(); @@ -117,7 +129,7 @@ public void commit() { */ @Override public void rollback() { - readWriteConsumer.get().accept(this); + readWrite(); try { doRollback(); fireOnRollback(); @@ -146,7 +158,7 @@ public G createThreadedTx() { */ @Override public void readWrite() { - readWriteConsumer.get().accept(this); + doReadWrite(); } /** @@ -154,7 +166,7 @@ public void readWrite() { */ @Override public void close() { - closeConsumer.get().accept(this); + doClose(); } /** @@ -162,7 +174,7 @@ public void close() { */ @Override public synchronized Transaction onReadWrite(final Consumer consumer) { - readWriteConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull)); + setReadWrite(consumer); return this; } @@ -171,7 +183,7 @@ public synchronized Transaction onReadWrite(final Consumer consumer */ @Override public synchronized Transaction onClose(final Consumer consumer) { - closeConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull)); + setClose(consumer); return this; } diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java index 2395d9a6d5d..eae224fbbe1 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; import static org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgePropertyFeatures; import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS; @@ -1042,4 +1044,176 @@ public void shouldAllowReferenceOfEdgeIdOutsideOfOriginalThreadAuto() throws Exc g.tx().rollback(); } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS) + public void shouldNotShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean commitFailed = new AtomicBoolean(false); + final AtomicBoolean commitOccured = new AtomicBoolean(false); + + final Thread manualThread = new Thread(() -> { + graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL); + try { + latch.await(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + + try{ + graph.tx().commit(); + commitFailed.set(false); + } catch (Exception ex) { + commitFailed.set(true); + } + }); + + manualThread.start(); + + final Thread autoThread = new Thread(() -> { + latch.countDown(); + try { + graph.tx().commit(); + commitOccured.set(true); + } catch (Exception ex) { + commitOccured.set(false); + } + }); + + autoThread.start(); + + manualThread.join(); + autoThread.join(); + + assertTrue( + "manualThread transaction readWrite should be MANUAL and should fail to commit the transaction", + commitFailed.get() + ); + assertTrue( + "autoThread transaction readWrite should be AUTO and should commit the transaction", + commitOccured.get() + ); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS) + public void shouldNotShareTransactionCloseConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + final Thread manualThread = new Thread(() -> { + graph.tx().onClose(Transaction.CLOSE_BEHAVIOR.COMMIT); + try { + latch.await(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + }); + + manualThread.start(); + + final Thread autoThread = new Thread(() -> { + latch.countDown(); + graph.addVertex(); + graph.tx().close(); + }); + + autoThread.start(); + + manualThread.join(); + autoThread.join(); + + assertEquals( + "Graph should be empty. autoThread transaction.onClose() should be ROLLBACK (default)", + 0, + IteratorUtils.count(graph.vertices()) + ); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS) + public void shouldShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean commitFailed = new AtomicBoolean(false); + final AtomicBoolean commitFailedAgain = new AtomicBoolean(false); + + final Thread manualThread = new Thread(() -> { + Transaction tx = graph.tx().createThreadedTx(); + tx.onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL); + try { + latch.await(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + + try{ + tx.commit(); + commitFailed.set(false); + } catch (Exception ex) { + commitFailed.set(true); + } + }); + + manualThread.start(); + + final Thread autoThread = new Thread(() -> { + latch.countDown(); + Transaction tx = graph.tx().createThreadedTx(); + try { + tx.commit(); + commitFailedAgain.set(false); + } catch (Exception ex) { + commitFailedAgain.set(true); + } + }); + + autoThread.start(); + + manualThread.join(); + autoThread.join(); + + assertTrue( + "manualThread transaction readWrite should be MANUAL and should fail to commit the transaction", + commitFailed.get() + ); + assertTrue( + "autoThread transaction readWrite should be AUTO and should commit the transaction", + commitFailedAgain.get() + ); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS) + public void shouldShareTransactionCloseConsumersAccrossThreads() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + final Thread manualThread = new Thread(() -> { + Transaction tx = graph.tx().createThreadedTx(); + tx.onClose(Transaction.CLOSE_BEHAVIOR.COMMIT); + try { + latch.await(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + }); + + manualThread.start(); + + final Thread autoThread = new Thread(() -> { + latch.countDown(); + Transaction tx = graph.tx().createThreadedTx(); + graph.addVertex(); + tx.close(); + }); + + autoThread.start(); + + manualThread.join(); + autoThread.join(); + + assertEquals( + "Graph should contain elements. autoThread.onClose() should be COMMIT.", + 1, + IteratorUtils.count(graph.vertices()) + ); + } }