Skip to content

Commit

Permalink
Merge pull request #8 from dgomezferro/master
Browse files Browse the repository at this point in the history
Early stop & aborts snapshot
  • Loading branch information
dgomezferro committed May 29, 2012
2 parents 1d7c6e8 + 1ac32f4 commit 9865866
Show file tree
Hide file tree
Showing 24 changed files with 371 additions and 248 deletions.
41 changes: 31 additions & 10 deletions src/main/java/com/yahoo/omid/client/TSOClient.java
Expand Up @@ -32,8 +32,6 @@
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
Expand All @@ -48,6 +46,8 @@
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler; import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.yahoo.omid.tso.Committed; import com.yahoo.omid.tso.Committed;
import com.yahoo.omid.tso.RowKey; import com.yahoo.omid.tso.RowKey;
Expand All @@ -67,7 +67,7 @@
import com.yahoo.omid.tso.serialization.TSOEncoder; import com.yahoo.omid.tso.serialization.TSOEncoder;


public class TSOClient extends SimpleChannelHandler { public class TSOClient extends SimpleChannelHandler {
private static final Log LOG = LogFactory.getLog(TSOClient.class); private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);


public static long askedTSO = 0; public static long askedTSO = 0;


Expand Down Expand Up @@ -333,8 +333,8 @@ public TSOClient(Configuration conf) throws IOException {


String host = conf.get("tso.host"); String host = conf.get("tso.host");
int port = conf.getInt("tso.port", 1234); int port = conf.getInt("tso.port", 1234);
max_retries = conf.getInt("tso.max_retries", 10); max_retries = conf.getInt("tso.max_retries", 100);
retry_delay_ms = conf.getInt("tso.retry_delay_ms", 3000); retry_delay_ms = conf.getInt("tso.retry_delay_ms", 1000);


if (host == null) { if (host == null) {
throw new IOException("tso.host missing from configuration"); throw new IOException("tso.host missing from configuration");
Expand All @@ -359,7 +359,12 @@ private State connectIfNeeded() throws IOException {
throw e; throw e;
} }
retries++; retries++;
bootstrap.connect(addr); bootstrap.connect(addr).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
LOG.debug("Connection completed. Success: " + future.isSuccess());
}
});
state = State.CONNECTING; state = State.CONNECTING;
return state; return state;
} }
Expand Down Expand Up @@ -422,6 +427,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
retries = 0; retries = 0;
} }
clearState(); clearState();
LOG.debug("Channel connected");
Op o = queuedOps.poll();; Op o = queuedOps.poll();;
while (o != null && state == State.CONNECTED) { while (o != null && state == State.CONNECTED) {
o.execute(channel); o.execute(channel);
Expand All @@ -442,8 +448,24 @@ private void clearState() {
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
synchronized(state) { synchronized(state) {
LOG.debug("Channel disconnected");
channel = null; channel = null;
state = State.DISCONNECTED; state = State.DISCONNECTED;
for (CreateCallback cb : createCallbacks) {
cb.error(new IOException("Channel Disconnected"));
}
for (CommitCallback cb : commitCallbacks.values()) {
cb.error(new IOException("Channel Disconnected"));
}
for (List<CommitQueryCallback> lcqb : isCommittedCallbacks.values()) {
for (CommitQueryCallback cqb : lcqb) {
cqb.error(new IOException("Channel Disconnected"));
}
}
createCallbacks.clear();
commitCallbacks.clear();
isCommittedCallbacks.clear();
connectIfNeeded();
} }
} }


Expand Down Expand Up @@ -545,15 +567,14 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e) ExceptionEvent e)
throws Exception { throws Exception {
System.out.println("Unexpected exception " + e.getCause()); LOG.error("Unexpected exception", e.getCause());
e.getCause().printStackTrace();


synchronized(state) { synchronized(state) {


if (state == State.CONNECTING) { if (state == State.CONNECTING) {
state = State.RETRY_CONNECT_WAIT; state = State.RETRY_CONNECT_WAIT;
if (LOG.isTraceEnabled()) { if (LOG.isDebugEnabled()) {
LOG.trace("Retrying connect in " + retry_delay_ms + "ms " + retries); LOG.debug("Retrying connect in " + retry_delay_ms + "ms " + retries);
} }
try { try {
retryTimer.schedule(new TimerTask() { retryTimer.schedule(new TimerTask() {
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/com/yahoo/omid/tso/AbortedTransaction.java
@@ -0,0 +1,51 @@
package com.yahoo.omid.tso;

public class AbortedTransaction {
private long startTimestamp;
private long snapshot;

public AbortedTransaction(long startTimestamp, long snapshot) {
super();
this.startTimestamp = startTimestamp;
this.snapshot = snapshot;
}

public long getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(long startTimestamp) {
this.startTimestamp = startTimestamp;
}

public long getSnapshot() {
return snapshot;
}

public void setSnapshot(long snapshot) {
this.snapshot = snapshot;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AbortedTransaction other = (AbortedTransaction) obj;
if (startTimestamp != other.startTimestamp)
return false;
return true;
}

}
2 changes: 1 addition & 1 deletion src/main/java/com/yahoo/omid/tso/Bucket.java
Expand Up @@ -59,7 +59,7 @@ public synchronized Set<Long> abortUncommited(long id) {
return aborted; return aborted;
} }


LOG.debug("Performing scanning..."); LOG.trace("Performing scanning...");


for (int i = transactions.nextClearBit(firstUncommited); i >= 0 for (int i = transactions.nextClearBit(firstUncommited); i >= 0
&& i <= lastCommited; i = transactions.nextClearBit(i + 1)) { && i <= lastCommited; i = transactions.nextClearBit(i + 1)) {
Expand Down
21 changes: 1 addition & 20 deletions src/main/java/com/yahoo/omid/tso/ClientHandler.java
Expand Up @@ -19,7 +19,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
Expand All @@ -37,8 +36,6 @@
import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;


import com.yahoo.omid.client.SyncAbortCompleteCallback; import com.yahoo.omid.client.SyncAbortCompleteCallback;
import com.yahoo.omid.client.SyncCommitCallback; import com.yahoo.omid.client.SyncCommitCallback;
Expand Down Expand Up @@ -161,13 +158,9 @@ public ClientHandler(Configuration conf, int nbMessage, int inflight, boolean pa
@Override @Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
super.channelConnected(ctx, e); super.channelConnected(ctx, e);
try {
Thread.sleep(15000);
} catch (InterruptedException e1) {
//ignore
}
startDate = new Date(); startDate = new Date();
channel = e.getChannel(); channel = e.getChannel();
outstandingTransactions = 0;
startTransaction(); startTransaction();
} }


Expand Down Expand Up @@ -269,18 +262,6 @@ private long getSizeAborted() {
return aborted.size() * 8 * 8; return aborted.size() * 8 * 8;
} }


@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
if (e.getCause() instanceof IOException) {
LOG.warn("IOException from downstream.", e.getCause());
} else {
LOG.warn("Unexpected exception from downstream.", e.getCause());
}
// Offer default object
answer.offer(false);
Channels.close(e.getChannel());
}

private java.util.Random rnd; private java.util.Random rnd;


private boolean pauseClient; private boolean pauseClient;
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/com/yahoo/omid/tso/CommitHashMap.java
Expand Up @@ -16,6 +16,12 @@


package com.yahoo.omid.tso; package com.yahoo.omid.tso;


import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.jboss.netty.util.internal.ConcurrentHashMap;

/** /**
* A hash map that uses byte[] for the key rather than longs. * A hash map that uses byte[] for the key rather than longs.
* *
Expand Down Expand Up @@ -138,21 +144,27 @@ public CommitHashMap(int initialCapacity, float loadFactor) {


// set of half aborted transactions // set of half aborted transactions
// TODO: set the initial capacity in a smarter way // TODO: set the initial capacity in a smarter way
java.util.HashSet<Long> halfAborted = new java.util.HashSet<Long>(10000); Set<AbortedTransaction> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(10000));

private AtomicLong abortedSnapshot = new AtomicLong();

long getAndIncrementAbortedSnapshot() {
return abortedSnapshot.getAndIncrement();
}


// add a new half aborted transaction // add a new half aborted transaction
void setHalfAborted(long startTimestamp) { void setHalfAborted(long startTimestamp) {
halfAborted.add(startTimestamp); halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot.get()));
} }


// call when a half aborted transaction is fully aborted // call when a half aborted transaction is fully aborted
void setFullAborted(long startTimestamp) { void setFullAborted(long startTimestamp) {
halfAborted.remove(startTimestamp); halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
} }


// query to see if a transaction is half aborted // query to see if a transaction is half aborted
boolean isHalfAborted(long startTimestamp) { boolean isHalfAborted(long startTimestamp) {
return halfAborted.contains(startTimestamp); return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
} }
} }


Expand Down

0 comments on commit 9865866

Please sign in to comment.