Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Commit

Permalink
S4-7 Guaranteed, ordered, robust TCP protocol - send() queues the mes…
Browse files Browse the repository at this point in the history
…sages - an asynchronous thread pool sends it using Netty - on success, the delivered message is notified - added close() method to Emitter and Listener - added tests to check the above-mentioned properties
  • Loading branch information
Karthik Kambatla authored and matthieumorel committed Jan 31, 2012
1 parent 5e38aa2 commit 8e2427b
Show file tree
Hide file tree
Showing 27 changed files with 865 additions and 582 deletions.
18 changes: 11 additions & 7 deletions subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -1,13 +1,17 @@
package org.apache.s4.base;

public interface Emitter {

/*
* @param partitionId - destination partition
* @param message - message payload that needs to be sent
* @return - true - if message is sent across successfully
* - false - if send fails
*/

/*
* @param partitionId - destination partition
*
* @param message - message payload that needs to be sent
*
* @return - true - if message is sent across successfully - false - if send fails
*/
boolean send(int partitionId, byte[] message);

int getPartitionCount();

void close();
}
20 changes: 10 additions & 10 deletions subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
Expand Up @@ -7,15 +7,15 @@
*/
public interface Listener {

/*
* Perform blocking receive on the appropriate communication channel
*
* @return
* <ul><li> byte[] message returned by the channel </li>
* <li> null if the associated blocking thread is interrupted </li>
* </ul>
*/
byte[] recv();
/*
* Perform blocking receive on the appropriate communication channel
*
* @return <ul><li> byte[] message returned by the channel </li> <li> null if the associated blocking thread is
* interrupted </li> </ul>
*/
byte[] recv();

public int getPartitionId();
public int getPartitionId();

void close();
}
Expand Up @@ -9,92 +9,95 @@
import com.google.inject.name.Named;

public class QueueingEmitter implements Emitter, Runnable {
private Emitter emitter;
private BlockingQueue<MessageHolder> queue;
private long dropCount = 0;
private volatile Thread thread;

@Inject
public QueueingEmitter(@Named("ll") Emitter emitter,
@Named("comm.queue_emmiter_size") int queueSize) {
this.emitter = emitter;
queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
}

public long getDropCount() {
return dropCount;
}

public void start() {
if (thread != null) {
throw new IllegalStateException(
"QueueingEmitter is already started");
}
thread = new Thread(this, "QueueingEmitter");
thread.start();
}

public void stop() {
if (thread == null) {
throw new IllegalStateException(
"QueueingEmitter is already stopped");
}
thread.interrupt();
thread = null;
}

@Override
public boolean send(int partitionId, byte[] message) {
MessageHolder mh = new MessageHolder(partitionId, message);
if (!queue.offer(mh)) {
dropCount++;
return true;
} else {
return false;
}
}

public void run() {
while (!Thread.interrupted()) {
try {
MessageHolder mh = queue.take();
// System.out.println("QueueingEmitter: Sending message on low-level emitter");
emitter.send(mh.getPartitionId(), mh.getMessage());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}

public int getPartitionCount() {
return emitter.getPartitionCount();
}

class MessageHolder {
private int partitionId;
private byte[] message;

public int getPartitionId() {
return partitionId;
}

public void setPartitionId(int partitionId) {
this.partitionId = partitionId;
}

public byte[] getMessage() {
return message;
}

public void setMessage(byte[] message) {
this.message = message;
}

public MessageHolder(int partitionId, byte[] message) {
super();
this.partitionId = partitionId;
this.message = message;
}
}
private Emitter emitter;
private BlockingQueue<MessageHolder> queue;
private long dropCount = 0;
private volatile Thread thread;

@Inject
public QueueingEmitter(@Named("ll") Emitter emitter, @Named("comm.queue_emmiter_size") int queueSize) {
this.emitter = emitter;
queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
}

public long getDropCount() {
return dropCount;
}

public void start() {
if (thread != null) {
throw new IllegalStateException("QueueingEmitter is already started");
}
thread = new Thread(this, "QueueingEmitter");
thread.start();
}

public void stop() {
if (thread == null) {
throw new IllegalStateException("QueueingEmitter is already stopped");
}
thread.interrupt();
thread = null;
}

@Override
public boolean send(int partitionId, byte[] message) {
MessageHolder mh = new MessageHolder(partitionId, message);
if (!queue.offer(mh)) {
dropCount++;
return true;
} else {
return false;
}
}

public void run() {
while (!Thread.interrupted()) {
try {
MessageHolder mh = queue.take();
// System.out.println("QueueingEmitter: Sending message on low-level emitter");
emitter.send(mh.getPartitionId(), mh.getMessage());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}

public int getPartitionCount() {
return emitter.getPartitionCount();
}

class MessageHolder {
private int partitionId;
private byte[] message;

public int getPartitionId() {
return partitionId;
}

public void setPartitionId(int partitionId) {
this.partitionId = partitionId;
}

public byte[] getMessage() {
return message;
}

public void setMessage(byte[] message) {
this.message = message;
}

public MessageHolder(int partitionId, byte[] message) {
super();
this.partitionId = partitionId;
this.message = message;
}
}

@Override
public void close() {
// TODO Auto-generated method stub

}

}
@@ -1,6 +1,5 @@
package org.apache.s4.comm;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -16,8 +15,7 @@ public class QueueingListener implements Listener, Runnable {
private volatile Thread thread;

@Inject
public QueueingListener(@Named("ll") Listener listener,
@Named("comm.queue_listener_size") int queueSize) {
public QueueingListener(@Named("ll") Listener listener, @Named("comm.queue_listener_size") int queueSize) {
this.listener = listener;
queue = new LinkedBlockingQueue<byte[]>(queueSize);
}
Expand All @@ -28,17 +26,15 @@ public long getDropCount() {

public void start() {
if (thread != null) {
throw new IllegalStateException(
"QueueingListener is already started");
throw new IllegalStateException("QueueingListener is already started");
}
thread = new Thread(this, "QueueingListener");
thread.start();
}

public void stop() {
if (thread == null) {
throw new IllegalStateException(
"QueueingListener is already stopped");
throw new IllegalStateException("QueueingListener is already stopped");
}
thread.interrupt();
thread = null;
Expand Down Expand Up @@ -69,4 +65,10 @@ public void run() {
}
}
}

@Override
public void close() {
// TODO Auto-generated method stub

}
}
Expand Up @@ -4,11 +4,11 @@

public class LoopBackEmitter implements Emitter {
private LoopBackListener listener;

public LoopBackEmitter(LoopBackListener listener) {
this.listener = listener;
}

@Override
public boolean send(int partitionId, byte[] message) {
listener.put(message);
Expand All @@ -20,4 +20,10 @@ public int getPartitionCount() {
return 1;
}

@Override
public void close() {
// TODO Auto-generated method stub

}

}
@@ -1,19 +1,18 @@
package org.apache.s4.comm.loopback;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

import org.apache.s4.base.Listener;

public class LoopBackListener implements Listener {

private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();

@Override
public byte[] recv() {
try {
//System.out.println("LoopBackListener: Taking message from handoff queue");
// System.out.println("LoopBackListener: Taking message from handoff queue");
return handoffQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand All @@ -24,14 +23,20 @@ public byte[] recv() {
public int getPartitionId() {
return 0;
}

public void put(byte[] message) {
try {
//System.out.println("LoopBackListener: putting message into handoffqueue");
// System.out.println("LoopBackListener: putting message into handoffqueue");
handoffQueue.put(message);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}

@Override
public void close() {
// TODO Auto-generated method stub

}

}

0 comments on commit 8e2427b

Please sign in to comment.