Skip to content

Commit

Permalink
Take and recover aborted snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
dgomezferro committed Apr 26, 2012
1 parent 3084da1 commit 6488a53
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 25 deletions.
51 changes: 51 additions & 0 deletions src/main/java/com/yahoo/omid/tso/AbortedTransaction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.yahoo.omid.tso;

public class AbortedTransaction {
private long startTimestamp;
private long snapshot;

public AbortedTransaction(long startTimestamp, long snapshot) {
super();
this.startTimestamp = startTimestamp;
this.snapshot = snapshot;
}

public long getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(long startTimestamp) {
this.startTimestamp = startTimestamp;
}

public long getSnapshot() {
return snapshot;
}

public void setSnapshot(long snapshot) {
this.snapshot = snapshot;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AbortedTransaction other = (AbortedTransaction) obj;
if (startTimestamp != other.startTimestamp)
return false;
return true;
}

}
15 changes: 11 additions & 4 deletions src/main/java/com/yahoo/omid/tso/CommitHashMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.jboss.netty.util.internal.ConcurrentHashMap;

Expand Down Expand Up @@ -143,21 +144,27 @@ public CommitHashMap(int initialCapacity, float loadFactor) {

// set of half aborted transactions
// TODO: set the initial capacity in a smarter way
Set<Long> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>(10000));
Set<AbortedTransaction> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(10000));

private AtomicLong abortedSnapshot = new AtomicLong();

// Returns the current aborted-snapshot id and atomically advances the
// counter. Called when a new snapshot of the half-aborted set is written,
// so that aborts recorded afterwards are tagged with the next id.
long getAndIncrementAbortedSnapshot() {
return abortedSnapshot.getAndIncrement();
}

// Registers a new half aborted transaction, tagged with the current
// aborted-snapshot counter so a later snapshot can tell when it was added.
// (Reconstructed post-image: the scraped diff had also kept the obsolete
// pre-image line `halfAborted.add(startTimestamp)`, which no longer
// compiles against Set<AbortedTransaction>.)
void setHalfAborted(long startTimestamp) {
    halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot.get()));
}

// Called when a half aborted transaction has been fully aborted; removes it
// from the set. The snapshot argument (0) is irrelevant for the lookup
// because AbortedTransaction equality is based on the start timestamp only.
void setFullAborted(long startTimestamp) {
    halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
}

// Queries whether a transaction is currently half aborted. The snapshot
// argument (0) is a placeholder: AbortedTransaction equality and hashing
// use the start timestamp only.
boolean isHalfAborted(long startTimestamp) {
    return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
}
}

Expand Down
73 changes: 59 additions & 14 deletions src/main/java/com/yahoo/omid/tso/TSOHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
Expand All @@ -44,7 +43,6 @@
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

import com.yahoo.omid.tso.TSOSharedMessageBuffer.ReadingBuffer;
import com.yahoo.omid.tso.messages.AbortRequest;
Expand Down Expand Up @@ -98,9 +96,11 @@ public class TSOHandler extends SimpleChannelHandler {
private TSOState sharedState;

private FlushThread flushThread;
private ScheduledExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private ScheduledFuture<?> flushFuture;

private ExecutorService executor;

/**
* Constructor
* @param channelGroup
Expand All @@ -109,8 +109,11 @@ public TSOHandler(ChannelGroup channelGroup, TSOState state) {
this.channelGroup = channelGroup;
this.timestampOracle = state.getSO();
this.sharedState = state;
}

public void start() {
this.flushThread = new FlushThread();
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r);
Expand All @@ -119,7 +122,8 @@ public Thread newThread(Runnable r) {
return t;
}
});
this.flushFuture = executor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
this.flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
this.executor = Executors.newSingleThreadExecutor();
}

/**
Expand Down Expand Up @@ -218,8 +222,8 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
}
}
if (bootstrap) {
for (Long halfAborted : sharedState.hashmap.halfAborted) {
channel.write(new AbortedTransactionReport(halfAborted));
for (AbortedTransaction halfAborted : sharedState.hashmap.halfAborted) {
channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
}
}
synchronized (sharedMsgBufLock) {
Expand All @@ -239,6 +243,43 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {

private Object sharedMsgBufLock = new Object();
private Object callbackLock = new Object();
// Callback that discards the result of a WAL addRecord call; used for
// records (e.g. aborted-set snapshots) whose outcome is not reported
// to any client.
private AddRecordCallback noCallback = new AddRecordCallback() {
@Override
public void addRecordComplete(int rc, Object ctx) {
}
};

// Task handed to the executor so the aborted-set snapshot is written off
// the request-handling path. NOTE(review): "Snaphost" is a typo for
// "Snapshot"; kept as-is because call sites reference this field name.
private Runnable createAbortedSnaphostTask = new Runnable() {
@Override
public void run() {
createAbortedSnapshot();
}
};

/**
 * Serializes a snapshot of the half-aborted transaction set and appends it
 * to the write-ahead log via {@code sharedState.addRecord}, using a no-op
 * callback since nothing waits on the result.
 */
public void createAbortedSnapshot() {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream toWAL = new DataOutputStream(baos);

// Take the current snapshot id and advance the counter: aborts recorded
// from now on carry the new id and belong to the next snapshot.
long snapshot = sharedState.hashmap.getAndIncrementAbortedSnapshot();

try {
toWAL.writeByte(LoggerProtocol.SNAPSHOT);
toWAL.writeLong(snapshot);
for (AbortedTransaction aborted : sharedState.hashmap.halfAborted) {
// Only entries from snapshots strictly older than the current one are
// written. NOTE(review): entries tagged with the current id are skipped —
// presumably their individual ABORT records are already in the WAL after
// the previous snapshot point; confirm against the recovery logic.
if (aborted.getSnapshot() < snapshot) {
toWAL.writeByte(LoggerProtocol.ABORT);
toWAL.writeLong(aborted.getStartTimestamp());
}
}
} catch (IOException e) {
// ByteArrayOutputStream performs no real I/O, so this is unreachable.
throw new RuntimeException(e);
}

sharedState.addRecord(baos.toByteArray(), noCallback, null);
}

/**
* Handle the CommitRequest message
Expand Down Expand Up @@ -287,19 +328,19 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
toWAL.writeByte(LoggerProtocol.COMMIT);
toWAL.writeLong(msg.startTimestamp);
toWAL.writeLong(commitTimestamp);
// toWAL.writeByte(msg.rows.length);

long oldLargestDeletedTimestamp = sharedState.largestDeletedTimestamp.get();

for (RowKey r: msg.rows) {
// toWAL.write(r.getRow(), 0, r.getRow().length);
sharedState.largestDeletedTimestamp.set(sharedState.hashmap.put(r.getRow(),
r.getTable(),
commitTimestamp,
r.hashCode(),
sharedState.largestDeletedTimestamp.get()));
oldLargestDeletedTimestamp));
}

sharedState.processCommit(msg.startTimestamp, commitTimestamp);
if (sharedState.largestDeletedTimestamp.get() > sharedState.previousLargestDeletedTimestamp.get()) {
if (sharedState.largestDeletedTimestamp.get() > oldLargestDeletedTimestamp) {
toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
toWAL.writeLong(sharedState.largestDeletedTimestamp.get());
Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp.get());
Expand All @@ -310,6 +351,10 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
}
queueLargestIncrease(sharedState.largestDeletedTimestamp.get());
}
}
if (sharedState.largestDeletedTimestamp.get() > sharedState.previousLargestDeletedTimestamp.get() + TSOState.MAX_COMMITS * 2) {
executor.submit(createAbortedSnaphostTask);
// schedule snapshot
sharedState.previousLargestDeletedTimestamp.set(sharedState.largestDeletedTimestamp.get());
}
synchronized (sharedMsgBufLock) {
Expand Down Expand Up @@ -429,7 +474,7 @@ public void addRecordComplete(int rc, Object ctx) {
sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
sharedState.baos.reset();
if (flushFuture.cancel(false)) {
flushFuture = executor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
Expand All @@ -450,7 +495,7 @@ public void run() {
}
}
}
flushFuture = executor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/yahoo/omid/tso/TSOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void addRecordComplete(int rc, Object ctx) {
System.out.println("PARAM MAX_THREADS: " + maxThreads);

final TSOHandler handler = new TSOHandler(channelGroup, state);
handler.start();

bootstrap.setPipelineFactory(new TSOPipelineFactory(pipelineExecutor, handler));
bootstrap.setOption("tcpNoDelay", false);
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/yahoo/omid/tso/TSOState.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,16 @@ public TSOState(StateLogger logger, TimestampOracle timestampOracle) {
this.timestampOracle = timestampOracle;
this.previousLargestDeletedTimestamp.set(this.timestampOracle.get());
this.largestDeletedTimestamp.set(this.previousLargestDeletedTimestamp.get());
this.uncommited = new Uncommited(largestDeletedTimestamp.get());
this.uncommited = new Uncommited(timestampOracle.first());
this.logger = logger;
}

/**
 * Creates a state with logging disabled; delegates to the two-argument
 * constructor with a null logger.
 */
public TSOState(TimestampOracle timestampOracle) {
this(null, timestampOracle);
}
}

/**
 * (Re)creates the uncommitted-transaction tracker starting from the
 * oracle's first timestamp. Callers invoke this right after construction
 * when no recovery is performed — NOTE(review): confirm it is not needed
 * (or is harmless) on the recovery path as well.
 */
public void initialize() {
this.uncommited = new Uncommited(timestampOracle.first());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static TSOState getState(TSOServerConfig config){
if(!config.isRecoveryEnabled()){
LOG.warn("Logger is disabled");
returnValue = new TSOState(new TimestampOracle());
returnValue.initialize();
} else {
BookKeeperStateBuilder builder = new BookKeeperStateBuilder(config);

Expand Down Expand Up @@ -112,6 +113,7 @@ class Context {
boolean ready = false;
boolean hasState = false;
boolean hasLogger = false;
int pending = 0;
StateLogger logger;

synchronized void setState(TSOState state){
Expand All @@ -137,6 +139,14 @@ synchronized private void validate(){
}
}

// Records that another batch of asynchronous ledger-entry reads has been
// issued; isFinished() stays false until every pending batch completes.
synchronized private void incrementPending(){
pending++;
}

// Records completion of one batch of ledger-entry reads; invoked from the
// read-complete callback.
synchronized private void decrementPending(){
pending--;
}

synchronized void abort() {
this.ready = true;
this.state = null;
Expand All @@ -146,6 +156,10 @@ synchronized void abort() {
// Whether recovery has produced a result — a state was set, or recovery
// was aborted (abort() also sets ready).
synchronized boolean isReady(){
return ready;
}

// Ready AND no ledger-read batches still outstanding; the thread blocked
// in wait() uses this (rather than isReady) so it does not proceed while
// asynchronous reads are still being applied to the state.
synchronized boolean isFinished(){
return ready && pending == 0;
}
}

class LoggerWatcher implements Watcher{
Expand Down Expand Up @@ -221,6 +235,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entri
((BookKeeperStateBuilder.Context) ctx).setState(lp.getState());
}
}
((BookKeeperStateBuilder.Context) ctx).decrementPending();
}
}
}
Expand Down Expand Up @@ -284,7 +299,7 @@ public void loggerInitComplete(int rc, StateLogger sl, Object ctx){

try{
synchronized(ctx){
if(!ctx.isReady()){
if(!ctx.isFinished()){
// TODO make configurable maximum waiting
ctx.wait();
}
Expand Down Expand Up @@ -357,6 +372,7 @@ public void openComplete(int rc, LedgerHandle lh, Object ctx){
break;
}
if (((Context) ctx).isReady()) break;
((Context) ctx).incrementPending();
long nextBatch = Math.max(counter - BKREADBATCHSIZE + 1, 0);
lh.asyncReadEntries(nextBatch, counter, new LoggerExecutor(), ctx);
counter -= BKREADBATCHSIZE;
Expand Down
Loading

0 comments on commit 6488a53

Please sign in to comment.