Browse files

Make writes of transactions truly atomic by introducing the

PersistenceTransaction and MultiPersistenceTransaction abstractions.
  • Loading branch information...
1 parent 4b1a910 commit f80db3396af265f19925aa306f49f8aa906917b2 @thomas-kielbus thomas-kielbus committed Jun 11, 2014
View
18 megadesk-core/src/main/java/com/liveramp/megadesk/base/state/BasePersistence.java
@@ -16,7 +16,25 @@
package com.liveramp.megadesk.base.state;
+import com.liveramp.megadesk.core.state.MultiPersistenceTransaction;
import com.liveramp.megadesk.core.state.Persistence;
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
public abstract class BasePersistence<VALUE> implements Persistence<VALUE> {
+
+ @Override
+ public void writeInMultiTransaction(MultiPersistenceTransaction transaction, VALUE value) {
+ PersistenceTransaction persistenceTransaction = transaction.getTransactionFor(transactionCategory());
+ if (persistenceTransaction == null) {
+ persistenceTransaction = newTransaction();
+ transaction.startTransactionFor(transactionCategory(), persistenceTransaction);
+ }
+ writeInTransaction(persistenceTransaction, value);
+ }
+
+ public abstract Object transactionCategory();
+
+ public abstract PersistenceTransaction newTransaction();
+
+ public abstract void writeInTransaction(PersistenceTransaction transaction, VALUE value);
}
View
18 megadesk-core/src/main/java/com/liveramp/megadesk/base/state/InMemoryPersistence.java
@@ -17,8 +17,9 @@
package com.liveramp.megadesk.base.state;
import com.liveramp.megadesk.core.state.Persistence;
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
-public class InMemoryPersistence<VALUE> implements Persistence<VALUE> {
+public class InMemoryPersistence<VALUE> extends BasePersistence<VALUE> implements Persistence<VALUE> {
private VALUE value;
@@ -39,4 +40,19 @@ public VALUE read() {
public void write(VALUE value) {
this.value = value;
}
+
+ @Override
+ public Object transactionCategory() {
+ return InMemoryPersistence.class;
+ }
+
+ @Override
+ public PersistenceTransaction newTransaction() {
+ return new InMemoryPersistenceTransaction();
+ }
+
+ @Override
+ public void writeInTransaction(PersistenceTransaction transaction, VALUE value) {
+ ((InMemoryPersistenceTransaction)transaction).write(this, value);
+ }
}
View
46 ...k-core/src/main/java/com/liveramp/megadesk/base/state/InMemoryPersistenceTransaction.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2014 LiveRamp
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.liveramp.megadesk.base.state;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import com.liveramp.megadesk.core.state.Persistence;
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
+
+public class InMemoryPersistenceTransaction implements PersistenceTransaction {
+
+ private final Map<Persistence, Object> writes;
+
+ public InMemoryPersistenceTransaction() {
+ writes = Maps.newHashMap();
+ }
+
+ public <VALUE> void write(Persistence persistence, VALUE value) {
+ writes.put(persistence, value);
+ }
+
+ @Override
+ public void commit() {
+ for (Map.Entry<Persistence, Object> entry : writes.entrySet()) {
+ Persistence persistence = entry.getKey();
+ Object value = entry.getValue();
+ persistence.write(value);
+ }
+ }
+}
View
8 megadesk-core/src/main/java/com/liveramp/megadesk/base/transaction/BaseAccessor.java
@@ -28,10 +28,12 @@
private final Persistence<VALUE> persistence;
private final DependencyType dependencyType;
+ private boolean written;
public BaseAccessor(VALUE value, DependencyType dependencyType) {
this.dependencyType = dependencyType;
this.persistence = new InMemoryPersistence<VALUE>(value);
+ this.written = false;
}
@Override
@@ -44,6 +46,12 @@ public VALUE read() {
public void write(VALUE value) {
ensureDependencyType(DependencyType.WRITE);
persistence.write(value);
+ written = true;
+ }
+
+ @Override
+ public boolean written() {
+ return written;
}
private void ensureDependencyType(DependencyType... dependencyTypes) {
View
10 megadesk-core/src/main/java/com/liveramp/megadesk/base/transaction/BaseContext.java
@@ -78,6 +78,16 @@ public BaseContext(Dependency dependency) {
}
@Override
+ public <VALUE> boolean written(Reference<VALUE> reference) {
+ return accessor(reference).written();
+ }
+
+ @Override
+ public <VALUE> boolean written(Variable<VALUE> variable) {
+ return written(variable.reference());
+ }
+
+ @Override
public String toString() {
return FormatUtils.formatToString(this, bindings.toString());
}
View
13 ...k-core/src/main/java/com/liveramp/megadesk/base/transaction/BaseTransactionExecution.java
@@ -23,6 +23,7 @@
import com.liveramp.megadesk.base.state.MultiLock;
import com.liveramp.megadesk.core.state.Lock;
+import com.liveramp.megadesk.core.state.MultiPersistenceTransaction;
import com.liveramp.megadesk.core.state.Variable;
import com.liveramp.megadesk.core.transaction.Context;
import com.liveramp.megadesk.core.transaction.Dependency;
@@ -74,11 +75,17 @@ private Context prepare(Dependency dependency) {
@Override
public void commit() {
ensureState(State.RUNNING);
- // Writes
+ // Write in a multi persistence transaction
+ MultiPersistenceTransaction multiPersistenceTransaction = new MultiPersistenceTransaction();
for (Variable variable : dependency.writes()) {
- Object value = context.read(variable);
- variable.driver().persistence().write(value);
+ // Only write variables that have been written to the context
+ if (context.written(variable)) {
+ Object value = context.read(variable);
+ variable.driver().persistence().writeInMultiTransaction(multiPersistenceTransaction, value);
+ }
}
+ // Commit multi persistence transaction
+ multiPersistenceTransaction.commit();
// Release execution locks
lock.unlock();
state = State.COMMITTED;
View
48 ...desk-core/src/main/java/com/liveramp/megadesk/core/state/MultiPersistenceTransaction.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2014 LiveRamp
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.liveramp.megadesk.core.state;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+public class MultiPersistenceTransaction {
+
+ private final Map<Object, PersistenceTransaction> transactions;
+
+ public MultiPersistenceTransaction() {
+ transactions = Maps.newHashMap();
+ }
+
+ public boolean containsTransactionFor(Object object) {
+ return transactions.containsKey(object);
+ }
+
+ public PersistenceTransaction getTransactionFor(Object object) {
+ return transactions.get(object);
+ }
+
+ public void startTransactionFor(Object object, PersistenceTransaction transaction) {
+ transactions.put(object, transaction);
+ }
+
+ public void commit() {
+ for (PersistenceTransaction transaction : transactions.values()) {
+ transaction.commit();
+ }
+ }
+}
View
2 megadesk-core/src/main/java/com/liveramp/megadesk/core/state/Persistence.java
@@ -21,4 +21,6 @@
VALUE read();
void write(VALUE value);
+
+ void writeInMultiTransaction(MultiPersistenceTransaction transaction, VALUE value);
}
View
22 megadesk-core/src/main/java/com/liveramp/megadesk/core/state/PersistenceTransaction.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright 2014 LiveRamp
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.liveramp.megadesk.core.state;
+
+public interface PersistenceTransaction {
+
+ void commit();
+}
View
2 megadesk-core/src/main/java/com/liveramp/megadesk/core/transaction/Accessor.java
@@ -21,4 +21,6 @@
VALUE read();
void write(VALUE value);
+
+ boolean written();
}
View
4 megadesk-core/src/main/java/com/liveramp/megadesk/core/transaction/Context.java
@@ -32,4 +32,8 @@
<VALUE> void write(Reference<VALUE> reference, VALUE value);
<VALUE> void write(Variable<VALUE> variable, VALUE value);
+
+ <VALUE> boolean written(Reference<VALUE> reference);
+
+ <VALUE> boolean written(Variable<VALUE> variable);
}
View
31 megadesk-curator/src/main/java/com/liveramp/megadesk/curator/state/CuratorPersistence.java
@@ -16,10 +16,12 @@
package com.liveramp.megadesk.curator.state;
+import org.apache.curator.framework.CuratorFramework;
+
import com.liveramp.megadesk.core.state.Persistence;
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
import com.liveramp.megadesk.recipes.state.persistence.SerializationHandler;
import com.liveramp.megadesk.recipes.state.persistence.SerializedPersistence;
-import org.apache.curator.framework.CuratorFramework;
public class CuratorPersistence<VALUE> extends SerializedPersistence<VALUE> implements Persistence<VALUE> {
@@ -41,20 +43,39 @@ public CuratorPersistence(CuratorFramework curator, String path, SerializationHa
}
@Override
- protected void writeBytes(byte[] serializedObject) {
+ protected byte[] readBytes() {
try {
- curator.setData().forPath(path, serializedObject);
+ return curator.getData().forPath(path);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
- protected byte[] readBytes() {
+ protected void writeBytes(byte[] serializedValue) {
try {
- return curator.getData().forPath(path);
+ curator.setData().forPath(path, serializedValue);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public Object transactionCategory() {
+ return curator;
+ }
+
+ @Override
+ public PersistenceTransaction newTransaction() {
+ return new CuratorPersistenceTransaction(curator.inTransaction());
+ }
+
+ @Override
+ public void writeInTransaction(PersistenceTransaction transaction, byte[] serializedValue) {
+ try {
+ ((CuratorPersistenceTransaction)transaction).transaction().setData().forPath(path, serializedValue);
+ } catch (Exception e) {
+ throw new RuntimeException(e); // TODO
+ }
+ }
}
View
44 ...ator/src/main/java/com/liveramp/megadesk/curator/state/CuratorPersistenceTransaction.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2014 LiveRamp
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.liveramp.megadesk.curator.state;
+
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
+
+public class CuratorPersistenceTransaction implements PersistenceTransaction {
+
+ private final CuratorTransaction transaction;
+
+ public CuratorPersistenceTransaction(CuratorTransaction transaction) {
+ this.transaction = transaction;
+ }
+
+ @Override
+ public void commit() {
+ try {
+ ((CuratorTransactionFinal)transaction).commit();
+ } catch (Exception e) {
+ throw new RuntimeException(e); // TODO
+ }
+ }
+
+ public CuratorTransaction transaction() {
+ return transaction;
+ }
+}
View
14 .../src/main/java/com/liveramp/megadesk/recipes/state/persistence/SerializedPersistence.java
@@ -20,6 +20,7 @@
import com.liveramp.megadesk.base.state.BasePersistence;
import com.liveramp.megadesk.core.state.Persistence;
+import com.liveramp.megadesk.core.state.PersistenceTransaction;
public abstract class SerializedPersistence<VALUE> extends BasePersistence<VALUE> implements Persistence<VALUE> {
@@ -52,7 +53,18 @@ public void write(VALUE value) {
}
}
- protected abstract void writeBytes(byte[] serializedObject);
+ @Override
+ public void writeInTransaction(PersistenceTransaction transaction, VALUE value) {
+ try {
+ writeInTransaction(transaction, serializationHandler.serialize(value));
+ } catch (IOException e) {
+ throw new RuntimeException(e); // TODO
+ }
+ }
protected abstract byte[] readBytes();
+
+ protected abstract void writeBytes(byte[] serializedValue);
+
+ public abstract void writeInTransaction(PersistenceTransaction transaction, byte[] serializedValue);
}

0 comments on commit f80db33

Please sign in to comment.