Skip to content

Commit

Permalink
NOP/REVU cleanup but regardless TESTED
Browse files Browse the repository at this point in the history
  • Loading branch information
Joubin Houshyar committed Jan 26, 2012
1 parent 8837af8 commit f99b380
Showing 1 changed file with 30 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,10 @@ public class ChunkedPipelineConnection
private Thread respHandlerThread;

/** */
// BlockingQueue<PendingRequest> pendingResponseQueue;
// BlockingQueue<PendingRequest> pendingResponseQueue;
BlockingQueue<PendingCPRequest[]> pendingResponseQueue;


/** synchronization object used to serialize request queuing */
// private Object serviceLock = new Object();
private Lock requestlock;

/**
Expand Down Expand Up @@ -141,7 +138,6 @@ public class ChunkedPipelineConnection
/** Chunk Queue of requests in Chunk buffer */
private PendingCPRequest[] chunkqueue;

// private int seqnum;
// ------------------------------------------------------------------------
// Constructor(s)
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -362,6 +358,7 @@ public final Future<Response> queueRequest (Command cmd, byte[]... args)
ctl_word = (idx << 16) | (off | 0x0);

/* ==END=== CRITICAL BLOCK ====== */
/* REVU: both of these exceptions require setting fault on the cached pendings */
} catch (IOException e) {
Log.error("IOException cmd:%s isConnected:%b", cmd.code, isConnected());
this.onConnectionFault(String.format("IOFault (cmd: %s)", cmd.code), true);
Expand Down Expand Up @@ -448,9 +445,6 @@ private static final class Node<E> {
(Node.class, Object.class, "item");

private Node(E x, Node<E> n) { item = x; next = n; }
// boolean casItem (E expected, E update) {
// return itemUpdater.compareAndSet(this, expected, update);
// }

private final E getItem() { return item; }
private final void setItem(E update) { itemUpdater.set(this, update); }
Expand All @@ -463,13 +457,8 @@ private static final class Node<E> {
// --------------------------------------------------------------
private transient volatile Node<E> head = new Node<E>(null, null);
private transient volatile Node<E> tail = head;
// private final transient Lock Lh;
// private final transient Lock Lt;

public Concurrent2LockQueue () {
// Lh = new ReentrantLock(false);
// Lt = new ReentrantLock(false);
}
public Concurrent2LockQueue () {}

// --------------------------------------------------------------
// INTERFACE: BlockingQueue<E>
Expand All @@ -479,13 +468,11 @@ public Concurrent2LockQueue () {
public final boolean offer(E item) {
if(null == item) throw new NullPointerException("item");
Node<E> n = new Node<E>(item, null);
// Lt.lock();
// try{
Node<E> t = tail;

Node<E> t = tail;
t.setNext(n);
tail = n;
// }
// finally { Lt.unlock(); }

return true;
}

Expand All @@ -498,8 +485,7 @@ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
/* (non-Javadoc) @see java.util.Queue#poll() */
public final E poll () {
E hi = null;
// Lh.lock();
// try{

Node<E> h = head;
Node<E> newhead = h.getNext();
if(newhead != null) {
Expand All @@ -508,24 +494,21 @@ public final E poll () {
newhead.setItem(null);
h.setNext(null);
}
// }
// finally { Lh.unlock(); }

return hi;
}

/* (non-Javadoc) @see java.util.Queue#peek() */
@Override final
public E peek () {
E item = null;
// Lh.lock();
// try{

Node<E> h = head;
Node<E> f = h.getNext();
if(f != null) {
item = f.getItem();
}
// }
// finally { Lh.unlock(); }

return item;
}

Expand Down Expand Up @@ -690,18 +673,13 @@ static public byte[] writeRequestToBuffer(final Buffer buffer, final Command cmd
byte[] cmdLenBytes = Convert.toBytes(cmd.bytes.length);
byte[] lineCntBytes = Convert.toBytes(args.length+1);

// calculate the buffer size
// int bsize = calcReqBuffSize(cmd, args);
//
// buffer = new Buffer(bsize);
//
buffer.write(COUNT_BYTE); // 1
buffer.write(lineCntBytes); // lineCntBytes.length()
buffer.write(CRLF); // CRLF.lengt
buffer.write(SIZE_BYTE); // 1
buffer.write(cmdLenBytes); // length
buffer.write(CRLF); // CRLF_LEN
buffer.write(cmd.bytes); //
buffer.write(COUNT_BYTE);
buffer.write(lineCntBytes);
buffer.write(CRLF);
buffer.write(SIZE_BYTE);
buffer.write(cmdLenBytes);
buffer.write(CRLF);
buffer.write(cmd.bytes);
buffer.write(CRLF);

switch (cmd.requestType) {
Expand Down Expand Up @@ -824,6 +802,7 @@ public final class ResponseHandler implements Runnable, Connection.Listener {
// Constructor
// ------------------------------------------------------------------------

// REVU: this is a complete mess -- TODO:
/**
* Adds self to the listeners of the enclosing {@link Connection} instance.
*/
Expand Down Expand Up @@ -866,16 +845,11 @@ public void run () {
try {
PendingCPRequest[] items = null;
items = pendingResponseQueue.take();
// Log.log("dequeued %s len:%d", items, items.length);
for(PendingCPRequest item : items) {
if(item == null) { break; }
pending = item;
// Log.log("PendingResponse for %s", pending.cmd.code);
// pending = pendingResponseQueue.take();
// System.out.format("Got it %s\n", pending.cmd.code);
try {
// Log.log("Waiting for %s", pending.cmd.code);
// TODO: here -- simplify
// TODO: here -- simplify REVU: ?
response = protocol.createResponse(pending.cmd);
response.read(getInputStream());
pending.response = response;
Expand All @@ -886,13 +860,7 @@ public void run () {

}

// this exception handling as of now is basically broken and fairly useless
// really, what we want is making a distinction between bugs and runtime problems
// and in case of connection issues, signal the retry mechanism.
// in the interim, all incoming requests must be rejected (e.g. PipelineReconnecting ...)
// and all remaining pending responses must be set to error.
// major TODO

// TODO: REVU: this in context of both connection and (general) errors.
catch (ProviderException bug){
Log.bug ("ProviderException: " + bug.getMessage());
onResponseHandlerError(bug, pending);
Expand All @@ -909,26 +877,28 @@ public void run () {
break;
}

// redis (1.00) simply shutsdown connection even if pending responses
// are expected, so quit is NOT sent. we simply close connection on this
// end.
// REVU: this should be noted on the API spec : TODO:
/* QUITs are not sent in pipelines to Redis as it immediately will
* drop the connection and there goes the pending stuff. */
if(pending.cmd == Command.QUIT) {
ChunkedPipelineConnection.this.disconnect();
break;
}
}
}
// REVU: why the general catch -- related to coherence issues of the volatile flags
// TODO: review and clean this shit up
// catch (InterruptedException e1) {
catch (Throwable e1) {
Log.log("Pipeline thread interrupted.");
break;
//e1.printStackTrace();
}
}
Log.log("Pipeline <%s> thread for <%s> stopped.", Thread.currentThread().getName(), ChunkedPipelineConnection.this);
alive_flag.compareAndSet(true, false);
}

// REVU: TODO:
final private void stopHandler() {
Log.log("%s stopping handler thread", this);
// work_flag.set(false);
Expand Down Expand Up @@ -957,17 +927,16 @@ final private void shutdownHandler() {
*/
// ------------------------------------------------------------------------

// REVU: clean this shit up. TODO:
/**
* Needs to be hooked up.
* TODO: zood tond foree saree!
*
* @see org.jredis.connector.Connection.Listener#onEvent(org.jredis.connector.Connection.Event)
*/
@Override
public void onEvent (Event event) {
if(event.getSource() != ChunkedPipelineConnection.this) {
Log.bug("event source [%s] is not this pipeline [%s]", event.getSource(), ChunkedPipelineConnection.this);
// BUG: what to do about it?
String msg = String.format("event source [%s] is not this pipeline [%s]", event.getSource(), ChunkedPipelineConnection.this);
Log.bug(msg);
throw new ProviderException(msg);
}
// (new Exception()).printStackTrace();
Log.log("Pipeline.ResponseHandler: onEvent %s source: %s", event.getType().name(), event.getSource());
Expand All @@ -977,8 +946,7 @@ public void onEvent (Event event) {
break;
case DISCONNECTED:
// should be stopped now
//
// break;
break;
case CONNECTING:
// no op
break;
Expand Down

0 comments on commit f99b380

Please sign in to comment.