Skip to content

Commit

Permalink
[OMID-71] Omid's transaction manager detects conflicts at the
Browse files Browse the repository at this point in the history
 cell level. Meaning that two concurrent transactions write to different
 columns at the same row do not conflict. This semantics is not suitable for
 Apache Phoenix which requires conflict detection at the row level. This
 commit augments Omid with row level conflict analysis.
  • Loading branch information
Ohad Shacham committed Jun 28, 2017
1 parent 0c37136 commit 0005b36
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.omid.transaction;

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.apache.omid.tso.client.CellId;
import org.apache.hadoop.hbase.client.HTableInterface;

Expand Down Expand Up @@ -67,14 +69,25 @@ public String toString() {
+ ":" + timestamp;
}

private Hasher getHasher() {
return Hashing.murmur3_128().newHasher();
}

@Override
public long getCellId() {
return Hashing.murmur3_128().newHasher()
return getHasher()
.putBytes(table.getTableName())
.putBytes(row)
.putBytes(family)
.putBytes(qualifier)
.hash().asLong();
}

@Override
public long getRowId() {
return getHasher()
.putBytes(table.getTableName())
.putBytes(row)
.hash().asLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.name.Named;

import org.apache.omid.YAMLUtils;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tools.hbase.SecureHBaseConfig;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.tso.client.OmidClientConfiguration.PostCommitMode;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setPostCommitMode(PostCommitMode postCommitMode) {
omidClientConfiguration.setPostCommitMode(postCommitMode);
}

public ConflictDetectionLevel getConflictAnalysisLevel() {
return omidClientConfiguration.getConflictAnalysisLevel();
}

public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
omidClientConfiguration.setConflictAnalysisLevel(conflictAnalysisLevel);
}

public String getCommitTableName() {
return commitTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
public interface CellId {

long getCellId();
long getRowId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum ConnType {DIRECT, HA}

public enum PostCommitMode {SYNC, ASYNC}

public enum ConflictDetectionLevel {CELL, ROW}

// Basic connection related params

private ConnType connectionType = ConnType.DIRECT;
Expand All @@ -51,6 +53,7 @@ public enum PostCommitMode {SYNC, ASYNC}
// Transaction Manager related params

private PostCommitMode postCommitMode = PostCommitMode.SYNC;
private ConflictDetectionLevel conflictAnalysisLevel = ConflictDetectionLevel.CELL;

// ----------------------------------------------------------------------------------------------------------------
// Instantiation
Expand Down Expand Up @@ -174,4 +177,13 @@ public void setPostCommitMode(PostCommitMode postCommitMode) {
this.postCommitMode = postCommitMode;
}

public ConflictDetectionLevel getConflictAnalysisLevel() {
return conflictAnalysisLevel;
}

@Inject(optional = true)
@Named("omid.tm.conflictAnalysisLevel")
public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
this.conflictAnalysisLevel = conflictAnalysisLevel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.omid.proto.TSOProto;
import org.apache.omid.transaction.TransactionException;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.zk.ZKUtils;
import org.apache.statemachine.StateMachine;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -54,6 +57,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
Expand All @@ -63,6 +67,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
* Describes the abstract methods to communicate to the TSO server
*/
Expand Down Expand Up @@ -92,6 +97,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
private InetSocketAddress tsoAddr;
private String zkCurrentTsoPath;

// Conflict detection level of the entire system. Can either be Row or Cell level.
private ConflictDetectionLevel conflictDetectionLevel;
private Set<Long> rowLevelWriteSet;

// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -159,6 +168,9 @@ private TSOClient(OmidClientConfiguration omidConf) throws IOException {
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 100);

conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
rowLevelWriteSet = new HashSet<Long>();
}

// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -186,8 +198,29 @@ public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
commitbuilder.setStartTimestamp(transactionId);

rowLevelWriteSet.clear();
for (CellId cell : cells) {
commitbuilder.addCellId(cell.getCellId());
long id;

switch (conflictDetectionLevel) {
case ROW:
id = cell.getRowId();
if (rowLevelWriteSet.contains(id)) {
continue;
} else {
rowLevelWriteSet.add(id);
}
break;
case CELL:
id = cell.getCellId();
break;
default:
id = 0;
assert (false);
}

commitbuilder.addCellId(id);
}
builder.setCommitRequest(commitbuilder.build());
RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Set;

import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;

/**
* Defines the protocol used on the client side to abstract communication to the TSO server
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,24 @@
public class DummyCellIdImpl implements CellId {

private final long cellId;
private final long rowId;

public DummyCellIdImpl(long cellId) {
this(cellId, cellId);
}

public DummyCellIdImpl(long cellId, long rowId) {
this.cellId = cellId;
this.rowId = rowId;
}

@Override
public long getCellId() {
return cellId;
}

@Override
public long getRowId() {
return rowId;
}
}
6 changes: 5 additions & 1 deletion transaction-client/src/main/resources/omid-client-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ executorThreads: 3

# Configure whether the TM performs the post-commit actions for a tx (update shadow cells and clean commit table entry)
# before returning to the control to the client (SYNC) or in parallel (ASYNC)
postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC

# Conflict analysis level
# Can either be cell level or row level. Default is cell level
conflictDetectionLevel: !!org.apache.omid.tso.client.OmidClientConfiguration$ConflictDetectionLevel CELL

0 comments on commit 0005b36

Please sign in to comment.