Skip to content
Browse files

upgrade to thrift 0.6.1

  • Loading branch information...
1 parent e4ec639 commit dd77e29b464a43abca0aefe8c095d465f53a28c8 @ewhauser committed May 13, 2011
View
6 3rdparty/BUILD
@@ -85,12 +85,12 @@ jar_library('spy-memcached', jar(org = 'spy', name = 'memcached', rev = '2.4.2')
jar_library('stringtemplate',
jar(org = 'org.antlr', name = 'stringtemplate', rev = '3.2.1').withSources())
-jar_library('thrift-0.5',
- jar(org = 'org.apache.thrift', name = 'libthrift', rev = '0.5.0'),
+jar_library('thrift-0.6.1',
+ jar(org = 'org.apache.thrift', name = 'libthrift', rev = '0.6.1'),
pants(':slf4j-api'),
pants(':slf4j-jdk14'), # need a concrete slf4j backend at runtime
)
-jar_library('thrift', pants(':thrift-0.5'))
+jar_library('thrift', pants(':thrift-0.6.1'))
jar_library('twitter4j',
jar(org = 'org.twitter4j', name = 'twitter4j-core', rev = '2.1.6-patched-1').intransitive())
View
49 src/java/com/twitter/common/thrift/ThriftServer.java
@@ -37,14 +37,9 @@
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.*;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -80,12 +75,6 @@
public static final ExceptionalFunction<ServerSetup, TServer, TTransportException>
THREADPOOL_SERVER = new ExceptionalFunction<ServerSetup, TServer, TTransportException>() {
@Override public TServer apply(ServerSetup setup) throws TTransportException {
- TThreadPoolServer.Options options = new TThreadPoolServer.Options();
- if (setup.getNumThreads() > 0) {
- options.minWorkerThreads = setup.getNumThreads();
- options.maxWorkerThreads = setup.getNumThreads();
- }
-
try {
setup.setSocket(new ServerSocket(setup.getPort()));
} catch (IOException e) {
@@ -111,8 +100,19 @@
}
TServerSocket socket = setup.isMonitored() ? monitoredSocket : unmonitoredSocket;
- return new TThreadPoolServer(processor, socket, transportFactory, transportFactory,
- setup.getProtoFactory(), setup.getProtoFactory(), options);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(socket)
+ .processor(processor)
+ .transportFactory(transportFactory)
+ .inputTransportFactory(transportFactory)
+ .protocolFactory(setup.getProtoFactory())
+ .inputProtocolFactory(setup.getProtoFactory());
+
+ if (setup.getNumThreads() > 0) {
+ options.minWorkerThreads(setup.getNumThreads())
+ .maxWorkerThreads(setup.getNumThreads());
+ }
+
+ return new TThreadPoolServer(options);
}
};
public static final ExceptionalFunction<ServerSetup, TServer, TTransportException>
@@ -126,17 +126,23 @@
setup.setSocket(getServerSocketFor(socket));
// just to grab defaults
- THsHaServer.Options options = new THsHaServer.Options();
+ THsHaServer.Args options = new THsHaServer.Args(socket);
+
if (setup.getNumThreads() > 0) {
- options.workerThreads = setup.getNumThreads();
+ options.workerThreads(setup.getNumThreads());
}
// default queue size to num threads: max response time becomes double avg service time
final BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<Runnable>(setup.getQueueSize() > 0 ? setup.getQueueSize()
- : options.workerThreads);
- final ThreadPoolExecutor invoker = new ThreadPoolExecutor(options.workerThreads,
- options.workerThreads, options.stopTimeoutVal, options.stopTimeoutUnit, queue);
+ : options.getWorkerThreads());
+ final ThreadPoolExecutor invoker = new ThreadPoolExecutor(options.getWorkerThreads(),
+ options.getWorkerThreads(), options.getStopTimeoutVal(), options.getStopTimeoutUnit(), queue);
+
+ options.processorFactory(new TProcessorFactory(setup.getProcessor()))
+ .transportFactory(new TFramedTransport.Factory())
+ .protocolFactory(setup.getProtoFactory())
+ .executorService(invoker);
final String serverName = (setup.getName() != null ? setup.getName() : "no-name");
Stats.export(new StatImpl<Integer>(serverName + "_thrift_server_active_threads") {
@@ -146,10 +152,7 @@
@Override public Integer read() { return queue.size(); }
});
- return new THsHaServer(new TProcessorFactory(setup.getProcessor()), socket,
- new TFramedTransport.Factory(),
- setup.getProtoFactory(), setup.getProtoFactory(), invoker,
- new TNonblockingServer.Options());
+ return new THsHaServer(options);
}
};
View
14 src/java/com/twitter/common/thrift/callers/Caller.java
@@ -40,10 +40,10 @@
* @param callback The callback to use if the method is asynchronous.
* @param connectTimeoutOverride Optional override for the default connection timeout.
* @return The return value from invoking the method.
- * @throws Throwable Exception, as prescribed by the method's contract.
+ * @throws Exception Exception, as prescribed by the method's contract.
*/
public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable;
+ @Nullable Amount<Long, Time> connectTimeoutOverride) throws Exception;
/**
* Captures the result of a request, whether synchronous or asynchronous. It should be expected
@@ -58,11 +58,11 @@ public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback c
/**
* Called when the request failed.
*
- * @param t Throwable that was caught. Must never be null.
+ * @param e Exception that was caught. Must never be null.
* @return {@code true} if a wrapped callback should be notified of the failure,
* {@code false} otherwise.
*/
- boolean fail(Throwable t);
+ boolean fail(Exception e);
}
/**
@@ -92,9 +92,9 @@ private void callbackTriggered() {
callbackTriggered();
}
- @Override public void onError(Throwable t) {
- if (capture.fail(t)) {
- wrapped.onError(t);
+ @Override public void onError(Exception e) {
+ if (capture.fail(e)) {
+ wrapped.onError(e);
callbackTriggered();
}
}
View
10 src/java/com/twitter/common/thrift/callers/CallerDecorator.java
@@ -51,7 +51,7 @@
*/
protected final Object invoke(Method method, Object[] args,
@Nullable AsyncMethodCallback callback, @Nullable final ResultCapture capture,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable Amount<Long, Time> connectTimeoutOverride) throws Exception {
// Swap the wrapped callback out for ours.
if (callback != null) {
@@ -63,14 +63,14 @@ protected final Object invoke(Method method, Object[] args,
if (callback == null && capture != null) capture.success();
return result;
- } catch (Throwable t) {
+ } catch (Exception e) {
// We allow this one to go to both sync and async captures.
if (callback != null) {
- callback.onError(t);
+ callback.onError(e);
return null;
} else {
- if (capture != null) capture.fail(t);
- throw t;
+ if (capture != null) capture.fail(e);
+ throw e;
}
}
}
View
4 src/java/com/twitter/common/thrift/callers/DeadlineCaller.java
@@ -63,7 +63,7 @@ public DeadlineCaller(Caller decoratedCaller, boolean async, ExecutorService exe
@Override
public Object call(final Method method, final Object[] args,
@Nullable final AsyncMethodCallback callback,
- @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Exception {
try {
Future<Object> result = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -87,7 +87,7 @@ public Object call(final Method method, final Object[] args,
} catch (RejectedExecutionException e) {
throw new TResourceExhaustedException(e);
} catch (InvocationTargetException e) {
- throw e.getCause();
+ throw Throwables.throwCause(e, false);
}
}
}
View
12 src/java/com/twitter/common/thrift/callers/DebugCaller.java
@@ -17,13 +17,11 @@
package com.twitter.common.thrift.callers;
import com.google.common.base.Joiner;
-import com.google.common.base.Throwables;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import org.apache.thrift.async.AsyncMethodCallback;
import javax.annotation.Nullable;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.logging.Logger;
@@ -49,13 +47,13 @@ public DebugCaller(Caller decoratedCaller, boolean async) {
@Override
public Object call(final Method method, final Object[] args,
@Nullable AsyncMethodCallback callback, @Nullable Amount<Long, Time> connectTimeoutOverride)
- throws Throwable {
+ throws Exception {
ResultCapture capture = new ResultCapture() {
@Override public void success() {
// No-op.
}
- @Override public boolean fail(Throwable t) {
+ @Override public boolean fail(Exception e) {
StringBuilder message = new StringBuilder("Thrift call failed: ");
message.append(method.getName()).append("(");
ARG_JOINER.appendTo(message, args);
@@ -68,9 +66,9 @@ public Object call(final Method method, final Object[] args,
try {
return invoke(method, args, callback, capture, connectTimeoutOverride);
- } catch (Throwable t) {
- capture.fail(t);
- throw t;
+ } catch (Exception e) {
+ capture.fail(e);
+ throw e;
}
}
}
View
30 src/java/com/twitter/common/thrift/callers/RetryingCaller.java
@@ -90,44 +90,44 @@ public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsPr
@Override public Object call(final Method method, final Object[] args,
@Nullable final AsyncMethodCallback callback,
- @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Exception {
final AtomicLong retryCounter = stats.get(method);
final AtomicInteger attempts = new AtomicInteger();
- final List<Throwable> exceptions = Lists.newArrayList();
+ final List<Exception> exceptions = Lists.newArrayList();
final ResultCapture capture = new ResultCapture() {
@Override public void success() {
// No-op.
}
- @Override public boolean fail(Throwable t) {
- if (!isRetryable(t)) {
+ @Override public boolean fail(Exception e) {
+ if (!isRetryable(e)) {
if (debug) {
LOG.warning(String.format(
"Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s",
- t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions)));
+ e.getClass().getName(), e.getMessage(), combineStackTraces(exceptions)));
}
return true;
} else if (attempts.get() >= retries) {
- exceptions.add(t);
+ exceptions.add(e);
if (debug) {
LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s",
- attempts.get(), t, combineStackTraces(exceptions)));
+ attempts.get(), e, combineStackTraces(exceptions)));
}
return true;
} else {
- exceptions.add(t);
+ exceptions.add(e);
if (isAsync() && attempts.incrementAndGet() <= retries) {
try {
retryCounter.incrementAndGet();
// override connect timeout in ThriftCaller to prevent blocking for a connection
// for async retries (since this is within the callback in the selector thread)
invoke(method, args, callback, this, NONBLOCKING_TIMEOUT);
- } catch (Throwable throwable) {
+ } catch (Exception throwable) {
return fail(throwable);
}
}
@@ -142,9 +142,9 @@ public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsPr
try {
// If this is an async call, the looping will be handled within the capture.
return invoke(method, args, callback, capture, connectTimeoutOverride);
- } catch (Throwable t) {
+ } catch (Exception t) {
if (!isRetryable(t)) {
- Throwable propagated = t;
+ Exception propagated = t;
if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) {
// If we've been trucking along through retries that have had remote call failures
@@ -166,7 +166,7 @@ public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsPr
if (continueLoop) retryCounter.incrementAndGet();
} while (continueLoop);
- Throwable lastRetriedException = Iterables.getLast(exceptions);
+ Exception lastRetriedException = Iterables.getLast(exceptions);
if (debug) {
if (!exceptions.isEmpty()) {
LOG.warning(
@@ -206,14 +206,14 @@ private boolean isRetryable(final Class<? extends Throwable> exceptionClass) {
private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
- private static String combineStackTraces(List<Throwable> exceptions) {
+ private static String combineStackTraces(List<Exception> exceptions) {
if (exceptions.isEmpty()) {
return "none";
} else {
return STACK_TRACE_JOINER.join(Iterables.transform(exceptions,
- new Function<Throwable, String>() {
+ new Function<Exception, String>() {
private int index = 1;
- @Override public String apply(Throwable exception) {
+ @Override public String apply(Exception exception) {
return String.format("[%d] %s",
index++, Throwables.getStackTraceAsString(exception));
}
View
8 src/java/com/twitter/common/thrift/callers/StatTrackingCaller.java
@@ -67,7 +67,7 @@ public StatTrackingCaller(Caller decoratedCaller, boolean async, StatsProvider s
@Override
public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable Amount<Long, Time> connectTimeoutOverride) throws Exception {
final StatsProvider.RequestTimer requestStats = stats.get(method);
final long startTime = System.nanoTime();
@@ -77,17 +77,17 @@ public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback c
System.nanoTime() - startTime));
}
- @Override public boolean fail(Throwable t) {
+ @Override public boolean fail(Exception e) {
// TODO(John Sirois): the ruby client reconnects for timeouts too - this provides a natural
// backoff mechanism - consider how to plumb something similar.
- if (t instanceof TTimeoutException || t instanceof TimeoutException) {
+ if (e instanceof TTimeoutException || e instanceof TimeoutException) {
requestStats.incTimeouts();
return true;
}
// TODO(John Sirois): consider ditching reconnects since its nearly redundant with errors as
// it stands.
- if (!(t instanceof TResourceExhaustedException)) {
+ if (!(e instanceof TResourceExhaustedException)) {
requestStats.incReconnects();
}
// TODO(John Sirois): provide more detailed stats: track counts for distinct exceptions types,
View
22 src/java/com/twitter/common/thrift/callers/ThriftCaller.java
@@ -17,15 +17,16 @@
package com.twitter.common.thrift.callers;
import com.google.common.base.Function;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.twitter.common.net.loadbalancing.RequestTracker;
import com.twitter.common.net.pool.Connection;
import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.net.pool.ResourceExhaustedException;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
-import com.twitter.common.net.pool.ResourceExhaustedException;
import com.twitter.common.thrift.TResourceExhaustedException;
import com.twitter.common.thrift.TTimeoutException;
-import com.twitter.common.net.loadbalancing.RequestTracker;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.transport.TTransport;
@@ -73,7 +74,7 @@ public ThriftCaller(ObjectPool<Connection<TTransport, InetSocketAddress>> connec
@Override
public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable Amount<Long, Time> connectTimeoutOverride) throws Exception {
final Connection<TTransport, InetSocketAddress> connection = getConnection(connectTimeoutOverride);
final long startNanos = System.nanoTime();
@@ -88,9 +89,9 @@ public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback c
}
}
- @Override public boolean fail(Throwable t) {
+ @Override public boolean fail(Exception e) {
if (debug) {
- LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, t));
+ LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, e));
}
try {
@@ -107,7 +108,7 @@ public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback c
}
private static Object invokeMethod(Object target, Method method, Object[] args,
- AsyncMethodCallback callback, final ResultCapture capture) throws Throwable {
+ AsyncMethodCallback callback, final ResultCapture capture) throws Exception {
// Swap the wrapped callback out for ours.
if (callback != null) {
@@ -123,14 +124,15 @@ private static Object invokeMethod(Object target, Method method, Object[] args,
if (callback == null) capture.success();
return result;
- } catch (InvocationTargetException t) {
+ } catch (InvocationTargetException e) {
// We allow this one to go to both sync and async captures.
+ Exception cause = (Exception)e.getCause();
if (callback != null) {
- callback.onError(t.getCause());
+ callback.onError(cause);
return null;
} else {
- capture.fail(t.getCause());
- throw t.getCause();
+ capture.fail(cause);
+ throw Throwables.throwCause(e, false);
}
}
}
View
4 tests/java/com/twitter/common/thrift/ThriftFactoryTest.java
@@ -214,8 +214,8 @@ public void onComplete(String response) {
}
@Override
- public void onError(Throwable throwable) {
- responseHolder[0] = throwable.toString();
+ public void onError(Exception exception) {
+ responseHolder[0] = exception.toString();
done.countDown();
}
};
View
4 tests/java/com/twitter/common/thrift/ThriftTest.java
@@ -694,7 +694,7 @@ public void testAsyncResourceExhausted() throws Exception {
expectConnectionPoolResourceExhausted(ASYNC_CONNECT_TIMEOUT);
Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
- callback.onError((Throwable) and(anyObject(), isA(TResourceExhaustedException.class)));
+ callback.onError((Exception) and(anyObject(), isA(TResourceExhaustedException.class)));
control.replay();
@@ -713,7 +713,7 @@ public void testAsyncDoesNotRetryResourceExhausted() throws Exception {
Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
- callback.onError((Throwable) and(anyObject(), isA(TResourceExhaustedException.class)));
+ callback.onError((Exception) and(anyObject(), isA(TResourceExhaustedException.class)));
control.replay();
View
2 tests/java/com/twitter/common/thrift/callers/DeadlineCallerTest.java
@@ -60,7 +60,7 @@ private DeadlineCaller makeDeadline(final boolean shouldTimeOut) {
Caller sleepyCaller = new CallerDecorator(caller, false) {
@Override public Object call(Method method, Object[] args,
@Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
+ @Nullable Amount<Long, Time> connectTimeoutOverride) throws Exception {
if (shouldTimeOut) {
try {

0 comments on commit dd77e29

Please sign in to comment.
Something went wrong with that request. Please try again.