Skip to content

Commit

Permalink
Style updates org.corfudb.runtime.object.transactions (#695) (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjames88 authored and no2chem committed Jun 21, 2017
1 parent 5fd9c45 commit 4516f5f
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 284 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,48 +1,52 @@
package org.corfudb.runtime.object.transactions;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.corfudb.protocols.logprotocol.ISMRConsumable;
import org.corfudb.protocols.logprotocol.SMREntry;
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.TxResolutionInfo;
import org.corfudb.runtime.exceptions.AbortCause;
import org.corfudb.runtime.exceptions.NetworkException;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.object.ICorfuSMRAccess;
import org.corfudb.runtime.object.ICorfuSMRProxyInternal;
import org.corfudb.runtime.object.VersionLockedObject;
import org.corfudb.runtime.view.Address;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.corfudb.runtime.view.ObjectsView.TRANSACTION_STREAM_ID;

/** A Corfu optimistic transaction context.
*
* Optimistic transactions in Corfu provide the following isolation guarantees:
* <p>Optimistic transactions in Corfu provide the following isolation guarantees:
*
* (1) Read-your-own Writes:
* <p>(1) Read-your-own Writes:
* Reads in a transaction are guaranteed to observe a write in the same
* transaction, if a write happens before
* the read.
*
* (2) Opacity:
* <p>(2) Opacity:
* Read in a transaction observe the state of the system ("snapshot") as of the time of the
* first read which occurs in the transaction ("first read
* timestamp"), except in case (1) above where they observe the own tranasction's writes.
*
* (3) Atomicity:
* <p>(3) Atomicity:
* Writes in a transaction are guaranteed to commit atomically,
* and commit if and only if none of the objects which were
* read (the "read set") were modified between the first read
* ("first read timestamp") and the time of commit.
*
* Created by mwei on 4/4/16.
* <p>Created by mwei on 4/4/16.
*/
@Slf4j
public class OptimisticTransactionalContext extends AbstractTransactionalContext {
Expand All @@ -61,11 +65,11 @@ public class OptimisticTransactionalContext extends AbstractTransactionalContext
* Access within optimistic transactional context is implemented
* in via proxy.access() as follows:
*
* 1. First, we try to grab a read-lock on the proxy, and hope to "catch" the proxy in the
* <p>1. First, we try to grab a read-lock on the proxy, and hope to "catch" the proxy in the
* snapshot version. If this succeeds, we invoke the corfu-object access method, and
* un-grab the read-lock.
*
* 2. Otherwise, we grab a write-lock on the proxy and bring it to the correct
* <p>2. Otherwise, we grab a write-lock on the proxy and bring it to the correct
* version
* - Inside proxy.setAsOptimisticStream, if there are currently optimistic
* updates on the proxy, we roll them back. Then, we set this
Expand All @@ -90,45 +94,48 @@ public <R, T> R access(ICorfuSMRProxyInternal<T> proxy,
.getUnderlyingObject()
.access(o -> (
getWriteSetEntrySize(proxy.getStreamID()) == 0 && // No updates
o.getVersionUnsafe() == getSnapshotTimestamp() && // And at the correct timestamp
(o.getOptimisticStreamUnsafe() == null ||
o.getOptimisticStreamUnsafe()
.isStreamCurrentContextThreadCurrentContext() )
// And at the correct timestamp
o.getVersionUnsafe() == getSnapshotTimestamp()
&& (o.getOptimisticStreamUnsafe() == null
|| o.getOptimisticStreamUnsafe()
.isStreamCurrentContextThreadCurrentContext())
),
o -> {
// Swap ourselves to be the active optimistic stream.
// Inside setAsOptimisticStream, if there are
// currently optimistic updates on the object, we
// roll them back. Then, we set this context as the
// object's new optimistic context.
setAsOptimisticStream(o);

// inside syncObjectUnsafe, depending on the object
// version, we may need to undo or redo
// committed changes, or apply forward committed changes.
try {
o.syncObjectUnsafe(getSnapshotTimestamp());
} catch (TrimmedException te) {
// If a trim is encountered, we must reset the object
o.resetUnsafe();
// and abort the transaction
TransactionAbortedException tae =
new TransactionAbortedException(
new TxResolutionInfo(getTransactionID(),
getSnapshotTimestamp()), null, AbortCause.TRIM);
abortTransaction(tae);
throw tae;
}
},
o -> accessFunction.access(o)
o -> {
// Swap ourselves to be the active optimistic stream.
// Inside setAsOptimisticStream, if there are
// currently optimistic updates on the object, we
// roll them back. Then, we set this context as the
// object's new optimistic context.
setAsOptimisticStream(o);

// inside syncObjectUnsafe, depending on the object
// version, we may need to undo or redo
// committed changes, or apply forward committed changes.
try {
o.syncObjectUnsafe(getSnapshotTimestamp());
} catch (TrimmedException te) {
// If a trim is encountered, we must reset the object
o.resetUnsafe();
// and abort the transaction
TransactionAbortedException tae =
new TransactionAbortedException(
new TxResolutionInfo(getTransactionID(),
getSnapshotTimestamp()), null,
AbortCause.TRIM);
abortTransaction(tae);
throw tae;
}
},
o -> accessFunction.access(o)
);
}

/**
* if a Corfu object's method is an Accessor-Mutator, then although the mutation is delayed,
* it needs to obtain the result by invoking getUpcallResult() on the optimistic stream.
*
* This is similar to the second stage of access(), accept working on the optimistic stream instead of the
* <p>This is similar to the second stage of access(), accept working
* on the optimistic stream instead of the
* underlying stream.- grabs the write-lock on the proxy.
* - uses proxy.setAsOptimisticStream in order to set itself as the proxy optimistic context,
* including rolling-back current optimistic changes, if any.
Expand All @@ -146,7 +153,7 @@ public <T> Object getUpcallResult(ICorfuSMRProxyInternal<T> proxy,

// if we have a result, return it.
SMREntry wrapper = getWriteSetEntryList(proxy.getStreamID()).get((int)timestamp);
if (wrapper != null && wrapper.isHaveUpcallResult()){
if (wrapper != null && wrapper.isHaveUpcallResult()) {
return wrapper.getUpcallResult();
}
// Otherwise, we need to sync the object
Expand All @@ -155,13 +162,13 @@ public <T> Object getUpcallResult(ICorfuSMRProxyInternal<T> proxy,
log.trace("Upcall[{}] {} Sync'd", this, timestamp);
o.syncObjectUnsafe(getSnapshotTimestamp());
SMREntry wrapper2 = getWriteSetEntryList(proxy.getStreamID()).get((int)timestamp);
if (wrapper2 != null && wrapper2.isHaveUpcallResult()){
if (wrapper2 != null && wrapper2.isHaveUpcallResult()) {
return wrapper2.getUpcallResult();
}
// If we still don't have the upcall, this must be a bug.
throw new RuntimeException("Tried to get upcall during a transaction but" +
" we don't have it even after an optimistic sync (asked for " + timestamp +
" we have 0-" + (getWriteSetEntryList(proxy.getStreamID()).size() - 1) + ")");
throw new RuntimeException("Tried to get upcall during a transaction but"
+ " we don't have it even after an optimistic sync (asked for " + timestamp
+ " we have 0-" + (getWriteSetEntryList(proxy.getStreamID()).size() - 1) + ")");
});
}

Expand All @@ -174,26 +181,27 @@ public <T> Object getUpcallResult(ICorfuSMRProxyInternal<T> proxy,
* @param <T> Type of the underlying object
*/
<T> void setAsOptimisticStream(VersionLockedObject<T> object) {
if (object.getOptimisticStreamUnsafe() == null ||
!object.getOptimisticStreamUnsafe()
if (object.getOptimisticStreamUnsafe() == null
|| !object.getOptimisticStreamUnsafe()
.isStreamCurrentContextThreadCurrentContext()) {

// We are setting the current context to the root context of nested transactions.
// Upon sync forward
// the stream will replay every entries from all parent transactional context.
WriteSetSMRStream newSMRStream =
WriteSetSMRStream newSmrStream =
new WriteSetSMRStream(TransactionalContext.getTransactionStackAsList(),
object.getID());

newSMRStream.currentContext = 0;
object.setOptimisticStreamUnsafe(newSMRStream);
newSmrStream.currentContext = 0;
object.setOptimisticStreamUnsafe(newSmrStream);
}
}

/** Logs an update. In the case of an optimistic transaction, this update
* is logged to the write set for the transaction.
*
* Return the "address" of the update; used for retrieving results from operations via getUpcallRestult.
* <p>Return the "address" of the update; used for retrieving results
* from operations via getUpcallRestult.
*
* @param proxy The proxy making the request.
* @param updateEntry The timestamp of the request.
Expand Down Expand Up @@ -245,6 +253,12 @@ public long commitTransaction() throws TransactionAbortedException {
return getConflictSetAndCommit(() -> getReadSetInfo().getReadSetConflicts());
}

/**
* Commit with a given conflict set and return the address.
*
* @param computeConflictSet conflict set used to check whether transaction can commit
* @return the commit address
*/
public long getConflictSetAndCommit(Supplier<Map<UUID, Set<Integer>>>
computeConflictSet) {

Expand All @@ -263,7 +277,8 @@ public long getConflictSetAndCommit(Supplier<Map<UUID, Set<Integer>>>
}

// Write to the transaction stream if transaction logging is enabled
Set<UUID> affectedStreams = new HashSet<>(getWriteSetInfo().getWriteSet().getEntryMap().keySet());
Set<UUID> affectedStreams = new HashSet<>(getWriteSetInfo().getWriteSet()
.getEntryMap().keySet());
if (this.builder.runtime.getObjectsView().isTransactionLogging()) {
affectedStreams.add(TRANSACTION_STREAM_ID);
}
Expand Down Expand Up @@ -318,7 +333,7 @@ protected void tryCommitAllProxies() {
// If some other client updated this object, sync
// it forward to grab those updates
x.getUnderlyingObject().syncObjectUnsafe(
commitAddress-1);
commitAddress - 1);
// Also, be nice and transfer the undo
// log from the optimistic updates
// for this to work the write sets better
Expand All @@ -329,8 +344,8 @@ protected void tryCommitAllProxies() {
((ISMRConsumable) committedEntry
.getPayload(this.getBuilder().runtime))
.getSMRUpdates(x.getStreamID());
if (committedWrites.size() ==
entryWrites.size()) {
if (committedWrites.size()
== entryWrites.size()) {
IntStream.range(0, committedWrites.size())
.forEach(i -> {
if (committedWrites.get(i)
Expand Down Expand Up @@ -374,7 +389,8 @@ protected void updateAllProxies(Consumer<ICorfuSMRProxyInternal> function) {
private OptimisticTransactionalContext getRootContext() {
AbstractTransactionalContext atc = TransactionalContext.getRootContext();
if (atc != null && !(atc instanceof OptimisticTransactionalContext)) {
throw new RuntimeException("Attempted to nest two different transactional context types");
throw new RuntimeException("Attempted to nest two different "
+ "transactional context types");
}
return (OptimisticTransactionalContext)atc;
}
Expand All @@ -395,7 +411,8 @@ public synchronized long obtainSnapshotTimestamp() {
// Otherwise, fetch a read token from the sequencer the linearize
// ourselves against.
long currentTail = builder.runtime
.getSequencerView().nextToken(Collections.emptySet(), 0).getToken().getTokenValue();
.getSequencerView().nextToken(Collections.emptySet(),
0).getToken().getTokenValue();
log.trace("SnapshotTimestamp[{}] {}", this, currentTail);
return currentTail;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.corfudb.runtime.object.transactions;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import lombok.Getter;

/**
* This class captures information about objects accessed (read) during speculative
* transaction execution.
*/
@Getter
class ReadSetInfo {
// fine-grained conflict information regarding accessed-objects;
// captures values passed using @conflict annotations in @corfuObject
Map<UUID, Set<Integer>> readSetConflicts = new HashMap<>();

public void mergeInto(ReadSetInfo other) {
other.getReadSetConflicts().forEach((streamId, cset) -> {
getConflictSet(streamId).addAll(cset);
});
}

public void addToReadSet(UUID streamId, Object[] conflictObjects) {
if (conflictObjects == null) {
return;
}

Set<Integer> streamConflicts = getConflictSet(streamId);
Arrays.asList(conflictObjects).stream()
.forEach(V -> streamConflicts.add(Integer.valueOf(V.hashCode())));
}

public Set<Integer> getConflictSet(UUID streamId) {
return getReadSetConflicts().computeIfAbsent(streamId, u -> {
return new HashSet<>();
});
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.corfudb.runtime.object.transactions;

import com.google.common.collect.ImmutableSet;

import java.util.Set;

import lombok.Getter;

import org.corfudb.protocols.logprotocol.SMREntry;
import org.corfudb.protocols.wireprotocol.TxResolutionInfo;
import org.corfudb.runtime.exceptions.AbortCause;
Expand All @@ -10,16 +14,14 @@
import org.corfudb.runtime.object.ICorfuSMRAccess;
import org.corfudb.runtime.object.ICorfuSMRProxyInternal;

import java.util.*;

/**
* A snapshot transactional context.
*
* Given the snapshot (log address) given by the TransactionBuilder,
* <p>Given the snapshot (log address) given by the TransactionBuilder,
* access all objects within the same snapshot during the course of
* this transactional context.
*
* Created by mwei on 11/22/16.
* <p>Created by mwei on 11/22/16.
*/
public class SnapshotTransactionalContext extends AbstractTransactionalContext {

Expand All @@ -44,7 +46,8 @@ public <R, T> R access(ICorfuSMRProxyInternal<T> proxy,
// In snapshot transactions, there are no conflicts.
// Hence, we do not need to add this access to a conflict set
// do not add: addToReadSet(proxy, conflictObject);
return proxy.getUnderlyingObject().access(o -> o.getVersionUnsafe() == getSnapshotTimestamp()
return proxy.getUnderlyingObject().access(o -> o.getVersionUnsafe()
== getSnapshotTimestamp()
&& !o.isOptimisticallyModifiedUnsafe(),
o -> {
try {
Expand Down Expand Up @@ -89,7 +92,8 @@ public <T> Object getUpcallResult(ICorfuSMRProxyInternal<T> proxy,
public <T> long logUpdate(ICorfuSMRProxyInternal<T> proxy,
SMREntry updateEntry,
Object[] conflictObject) {
throw new UnsupportedOperationException("Can't modify object during a read-only transaction!");
throw new UnsupportedOperationException(
"Can't modify object during a read-only transaction!");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

/** Helper class to build transactional contexts.
*
* Created by mwei on 11/21/16.
* <p>Created by mwei on 11/21/16.
*/
@Accessors(chain = true)
@Setter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.corfudb.runtime.object.transactions;

import lombok.RequiredArgsConstructor;

import java.util.function.Function;

import lombok.RequiredArgsConstructor;

/**
* Created by mwei on 11/21/16.
*/
Expand Down

0 comments on commit 4516f5f

Please sign in to comment.