Skip to content

Commit

Permalink
[OMID-72] This commit adds support for fences over a table.
Browse files Browse the repository at this point in the history
 These fences guarantee that every transaction started before the fence and
 writes to the table the fence was created for will be aborted.  These fences
 are needed for creating a secondary index in Apache Phoenix. The scenario was
 discussed in TEPHRA-157 and PHOENIX-2478. Augmenting Omid with this kind of
 transactions was also discussed in OMID-56.
  • Loading branch information
Ohad Shacham committed Jul 10, 2017
1 parent 60c9e71 commit 0cae2ff
Show file tree
Hide file tree
Showing 24 changed files with 507 additions and 53 deletions.
12 changes: 12 additions & 0 deletions common/src/main/proto/TSOProto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message Request {
optional TimestampRequest timestampRequest = 1;
optional CommitRequest commitRequest = 2;
optional HandshakeRequest handshakeRequest = 3;
optional FenceRequest fenceRequest = 4;
}

message TimestampRequest {
Expand All @@ -33,18 +34,29 @@ message CommitRequest {
optional int64 startTimestamp = 1;
optional bool isRetry = 2 [default = false];
repeated int64 cellId = 3;
repeated int64 TableId = 4;
}

message FenceRequest {
optional int64 TableId = 1;
}

message Response {
optional TimestampResponse timestampResponse = 1;
optional CommitResponse commitResponse = 2;
optional HandshakeResponse handshakeResponse = 3;
optional FenceResponse fenceResponse = 4;
}

message TimestampResponse {
optional int64 startTimestamp = 1;
}

message FenceResponse {
optional int64 TableId = 1;
optional int64 FenceId = 2;
}

message CommitResponse {
optional bool aborted = 1;
optional int64 startTimestamp = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ public String toString() {
+ ":" + timestamp;
}

private Hasher getHasher() {
return Hashing.murmur3_128().newHasher();
}

@Override
public long getCellId() {
return getHasher()
Expand All @@ -83,11 +79,22 @@ public long getCellId() {
.hash().asLong();
}

@Override
public long getTableId() {
return getHasher()
.putBytes(table.getTableName())
.hash().asLong();
}

@Override
public long getRowId() {
return getHasher()
.putBytes(table.getTableName())
.putBytes(row)
.hash().asLong();
}

public static Hasher getHasher() {
return Hashing.murmur3_128().newHasher();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.committable.hbase.HBaseCommitTable;
Expand Down Expand Up @@ -195,6 +197,11 @@ public void preRollback(AbstractTransaction<? extends CellId> transaction) throw
}
}

@Override
public long getHashForTable(byte[] tableName) {
return HBaseCellId.getHasher().putBytes(tableName).hash().asLong();
}

// ----------------------------------------------------------------------------------------------------------------
// HBaseTransactionClient method implementations
// ----------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures;

import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.Counter;
Expand Down Expand Up @@ -70,6 +72,7 @@ public interface TransactionFactory<T extends CellId> {
// Metrics
private final Timer startTimestampTimer;
private final Timer commitTimer;
private final Timer fenceTimer;
private final Counter committedTxsCounter;
private final Counter rolledbackTxsCounter;
private final Counter errorTxsCounter;
Expand Down Expand Up @@ -104,6 +107,7 @@ public AbstractTransactionManager(MetricsRegistry metrics,
// Metrics configuration
this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
this.fenceTimer = metrics.timer(name("omid", "tm", "hbase", "fence", "latency"));
this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
Expand Down Expand Up @@ -159,6 +163,40 @@ public final Transaction begin() throws TransactionException {
}
}

/**
* Generates hash ID for table name, this hash is later-on sent to the TSO and used for fencing
* @param tableName - the table name
* @return
*/
abstract public long getHashForTable(byte[] tableName);

/**
* @see org.apache.omid.transaction.TransactionManager#fence()
*/
@Override
public final Transaction fence(byte[] tableName) throws TransactionException {
long fenceTimestamp;
long tableID = getHashForTable(tableName); Hashing.murmur3_128().newHasher().putBytes(tableName).hash().asLong();

try {
fenceTimer.start();
try {
fenceTimestamp = tsoClient.getFence(tableID).get();
} finally {
fenceTimer.stop();
}

AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(fenceTimestamp, fenceTimestamp, this);

return tx;
} catch (ExecutionException e) {
throw new TransactionException("Could not get fence", e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new TransactionException("Interrupted creating a fence", ie);
}
}

/**
* Allows transaction manager developers to perform actions after having started a transaction.
* @param transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,15 @@ public interface TransactionManager extends Closeable {
*/
void rollback(Transaction tx) throws TransactionException;

/**
* Creates a fence
*
* Creates a fence and returns a {@link Transaction} interface implementation that contains the fence information.
*
* @param tableName name of the table that requires a fence
* @return transaction representation contains the fence timestamp as the TransactionId.
* @throws TransactionException in case of any issues
*/
Transaction fence(byte[] tableName) throws TransactionException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
public interface CellId {

long getCellId();
long getRowId();

long getTableId();

long getRowId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
package org.apache.omid.tso.client;

import com.google.common.util.concurrent.SettableFuture;

import org.apache.omid.committable.CommitTable;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;

class MockTSOClient implements TSOProtocol {
private final AtomicLong timestampGenerator = new AtomicLong();
private static final int CONFLICT_MAP_SIZE = 1_000_000;
private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
private final Map<Long, Long> fenceMap = new HashMap<Long, Long>();
private final AtomicLong lwm = new AtomicLong();

private final CommitTable.Writer commitTable;
Expand All @@ -45,6 +50,58 @@ public TSOFuture<Long> getNewStartTimestamp() {
}
}

@Override
public TSOFuture<Long> getFence(long tableId) {
synchronized (conflictMap) {
SettableFuture<Long> f = SettableFuture.create();
long fenceTimestamp = timestampGenerator.incrementAndGet();
f.set(fenceTimestamp);
fenceMap.put(tableId, fenceTimestamp);
try {
// Persist the fence by using the fence identifier as both the start and commit timestamp.
commitTable.addCommittedTransaction(fenceTimestamp, fenceTimestamp);
commitTable.flush();
} catch (IOException ioe) {
f.setException(ioe);
}
return new ForwardingTSOFuture<>(f);
}
}

// Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
private boolean hasConflictsWithFences(long transactionId, Set<? extends CellId> cells) {
Set<Long> tableIDs = new HashSet<Long>();
for (CellId c : cells) {
tableIDs.add(c.getTableId());
}

if (! fenceMap.isEmpty()) {
for (long tableId : tableIDs) {
Long fence = fenceMap.get(tableId);
if (fence != null && transactionId < fence) {
return true;
}
if (fence != null && fence < lwm.get()) { // GC
fenceMap.remove(tableId);
}
}
}

return false;
}

// Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
private boolean hasConflictsWithCommittedTransactions(long transactionId, Set<? extends CellId> cells) {
for (CellId c : cells) {
int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
if (conflictMap[index] >= transactionId) {
return true;
}
}

return false;
}

@Override
public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
synchronized (conflictMap) {
Expand All @@ -54,16 +111,9 @@ public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
return new ForwardingTSOFuture<>(f);
}

boolean canCommit = true;
for (CellId c : cells) {
int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
if (conflictMap[index] >= transactionId) {
canCommit = false;
break;
}
}
if (!hasConflictsWithFences(transactionId, cells) &&
!hasConflictsWithCommittedTransactions(transactionId, cells)) {

if (canCommit) {
long commitTimestamp = timestampGenerator.incrementAndGet();
for (CellId c : cells) {
int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
Expand Down

0 comments on commit 0cae2ff

Please sign in to comment.