Skip to content

Commit

Permalink
[split] finagle-kestrel: java grabbyhands example
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusae committed Aug 5, 2011
1 parent 7fa129d commit d03ad93
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
@@ -1,5 +1,8 @@
package com.twitter.finagle.service

import java.{util => ju}
import scala.collection.JavaConversions._

import com.twitter.util.{
Future, Promise, Try, Return, Throw,
Timer, TimerTask, Time, Duration}
Expand Down Expand Up @@ -102,4 +105,13 @@ object Backoff {

def const(start: Duration) =
Backoff(start)(Function.const(start))

/**
* Convert a {{Stream[Duration]}} into a Java-friendly representation.
*/
def toJava(backoffs: Stream[Duration]): ju.concurrent.Callable[ju.Iterator[Duration]] = {
new ju.concurrent.Callable[ju.Iterator[Duration]] {
def call() = backoffs.toIterator
}
}
}
@@ -0,0 +1,71 @@
package com.twitter.finagle.example.java.kestrel;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.twitter.util.JavaTimer;
import com.twitter.util.Duration;

import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.ServiceFactory;
import com.twitter.finagle.service.Backoff;

import com.twitter.finagle.kestrel.java.Client;
import com.twitter.finagle.kestrel.MultiReader;
import com.twitter.finagle.kestrel.ReadHandle;
import com.twitter.finagle.kestrel.ReadMessage;
import com.twitter.finagle.kestrel.protocol.Command;
import com.twitter.finagle.kestrel.protocol.Response;
import com.twitter.finagle.kestrel.protocol.Kestrel;

/**
* Demonstrates the use of {{com.twitter.finagel.kestrel.MultiReader}}
* in Java.
*/
public class GrabbyHands {
public static void main(String args[]) {
if (args.length < 2) {
System.err.println("usage: java… QUEUE HOST1 [HOST2 HOST3…]");
System.exit(1);
}

String queueName = args[0];

JavaTimer timer = new JavaTimer();
Callable<Iterator<Duration>> backoffs = Backoff.toJava(
Backoff.exponential(
// 100ms initial backoff
Duration.fromTimeUnit(100, TimeUnit.MILLISECONDS),
// multiplier
2)
// fail after 10 tries
.take(10));

ArrayList<ReadHandle> handles = new ArrayList<ReadHandle>();
for (int i = 1; i < args.length; i++) {
ServiceFactory<Command, Response> factory =
ClientBuilder.safeBuildFactory(ClientBuilder.get()
.codec(Kestrel.get())
.hosts(args[i])
.hostConnectionLimit(1));
System.out.println("k " + args[i]);

Client client = Client.newInstance(factory);
handles.add(client.readReliably(queueName, timer, backoffs));
}

ReadHandle handle = MultiReader.apply(handles.iterator());

while (true) {
ReadMessage m = handle.messages().syncWait();
System.out.println(m.bytes().toString(CharsetUtil.UTF_8));
System.out.println(ChannelBuffers.hexDump(m.bytes()));
m.ack().sync();
}
}
}
@@ -1,5 +1,8 @@
package com.twitter.finagle.kestrel

import scala.collection.JavaConversions._
import _root_.java.{util => ju}

import com.twitter.concurrent.{Offer, Broker}

object AllHandlesDiedException extends Exception
Expand All @@ -15,6 +18,13 @@ object MultiReader {
def apply(clients: Seq[Client], queueName: String): ReadHandle =
apply(clients map { _.readReliably(queueName) })

/**
* A java friendly interface: we use scala's implicit conversions to
* feed in a {{java.util.Iterator<ReadHandle>}}
*/
def apply(handles: ju.Iterator[ReadHandle]): ReadHandle =
apply(handles.toSeq)

def apply(handles: Seq[ReadHandle]): ReadHandle = {
val error = new Broker[Throwable]
val messages = new Broker[ReadMessage]
Expand Down

0 comments on commit d03ad93

Please sign in to comment.