Skip to content

Commit

Permalink
Optimisitic TX RPC Call out of the lock (#1247)
Browse files Browse the repository at this point in the history
To avoid waiting for RPC calls to be executed under the VLO lock,
snapshot timestamp can be obtained right before acquiring the lock.
  • Loading branch information
annym authored and no2chem committed Apr 4, 2018
1 parent 4481693 commit d927655
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
Expand Up @@ -196,7 +196,7 @@ public <R> R access(Function<VersionLockedObject<T>, Boolean> directAccessCheckF
}
}
} catch (Exception e) {
// If we have an exception, we didn't get a chance to validate the the lock.
// If we have an exception, we didn't get a chance to validate the lock.
// If it's still valid, then we should re-throw the exception since it was
// on a correct view of the object.
if (lock.validate(ts)) {
Expand Down
Expand Up @@ -93,6 +93,8 @@ public <R, T> R access(ICorfuSMRProxyInternal<T> proxy,
// Next, we sync the object, which will bring the object
// to the correct version, reflecting any optimistic
// updates.
// Get snapshot timestamp in advance so it is not performed under the VLO lock
long ts = getSnapshotTimestamp();
return proxy
.getUnderlyingObject()
.access(o -> {
Expand All @@ -101,7 +103,7 @@ public <R, T> R access(ICorfuSMRProxyInternal<T> proxy,
// Obtain the stream position as when transaction context last
// remembered it.
long streamReadPosition = getKnownStreamPosition()
.getOrDefault(proxy.getStreamID(), getSnapshotTimestamp());
.getOrDefault(proxy.getStreamID(), ts);

return (
(stream == null || stream.isStreamCurrentContextThreadCurrentContext())
Expand Down Expand Up @@ -152,9 +154,11 @@ public <T> Object getUpcallResult(ICorfuSMRProxyInternal<T> proxy,
return wrapper.getUpcallResult();
}
// Otherwise, we need to sync the object
// Get snapshot timestamp in advance so it is not performed under the VLO lock
long ts = getSnapshotTimestamp();
return proxy.getUnderlyingObject().update(o -> {
log.trace("Upcall[{}] {} Sync'd", this, timestamp);
syncWithRetryUnsafe(o, getSnapshotTimestamp(), proxy, this::setAsOptimisticStream);
syncWithRetryUnsafe(o, ts, proxy, this::setAsOptimisticStream);
SMREntry wrapper2 = getWriteSetEntryList(proxy.getStreamID()).get((int)timestamp);
if (wrapper2 != null && wrapper2.isHaveUpcallResult()) {
return wrapper2.getUpcallResult();
Expand Down

0 comments on commit d927655

Please sign in to comment.