Permalink
Browse files

FLUME-1974. Thrift compatibility issue with hbase-0.92.

(Hari Shreedharan via Mike Percy)
  • Loading branch information...
1 parent 6ca6168 commit c72a3b1a5a2548f9157fe7913611c4318ce5e64d @mpercy mpercy committed Apr 8, 2013
@@ -29,15 +29,16 @@
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
-import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadedSelectorServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,6 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class ThriftSource extends AbstractSource implements Configurable,
@@ -72,7 +72,7 @@
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
- private TNonblockingServerSocket serverTransport;
+ private TServerTransport serverTransport;
private ExecutorService servingExecutor;
@Override
@@ -100,29 +100,27 @@ public void configure(Context context) {
@Override
public void start() {
logger.info("Starting thrift source");
- ExecutorService sourceService;
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
- "Flume Thrift IPC Thread %d").build();
- if (maxThreads == 0) {
- sourceService = Executors.newCachedThreadPool(threadFactory);
- } else {
- sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
- }
+ maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads;
try {
- serverTransport = new TNonblockingServerSocket(new InetSocketAddress
+ serverTransport = new TServerSocket(new InetSocketAddress
(bindAddress, port));
} catch (TTransportException e) {
throw new FlumeException("Failed to start Thrift Source.", e);
}
- server = new TThreadedSelectorServer(
- new TThreadedSelectorServer.Args(serverTransport).protocolFactory(
- new TCompactProtocol.Factory()).processor(
- new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
- new ThriftSourceHandler())).executorService(sourceService));
+
+ TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
+ args.protocolFactory(new TCompactProtocol.Factory());
+ args.inputTransportFactory(new TFastFramedTransport.Factory());
+ args.outputTransportFactory(new TFastFramedTransport.Factory());
+ args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
+ new ThriftSourceHandler())).maxWorkerThreads(maxThreads);
+
+ server = new TThreadPoolServer(args);
servingExecutor = Executors.newSingleThreadExecutor(new
ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
.build());
+
/**
* Start serving.
*/
@@ -165,7 +163,6 @@ public void stop() {
"shutdown.");
}
sourceCounter.stop();
- // Thrift will shutdown the executor passed to it.
super.stop();
}
@@ -17,10 +17,9 @@
* under the License.
*/
/**
- * Autogenerated by Thrift Compiler (0.9.0)
+ * Autogenerated by Thrift Compiler (0.7.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
*/
package com.cloudera.flume.handlers.thrift;
@@ -17,10 +17,9 @@
* under the License.
*/
/**
- * Autogenerated by Thrift Compiler (0.9.0)
+ * Autogenerated by Thrift Compiler (0.7.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
*/
package com.cloudera.flume.handlers.thrift;
Oops, something went wrong.

0 comments on commit c72a3b1

Please sign in to comment.