Skip to content
Browse files

Revert "Add ability to use custom TServerFactory implementations"

This reverts commit 8264eb2.
  • Loading branch information...
1 parent 5b151f3 commit 5b5b20a8787ef004751e8c7ee18a31e8b30d1fab @jbellis jbellis committed Oct 8, 2012
View
1 CHANGES.txt
@@ -19,7 +19,6 @@
* Pluggable Thrift transport factories for CLI (CASSANDRA-4609)
* Backport adding AlterKeyspace statement (CASSANDRA-4611)
* (CQL3) Correcty accept upper-case data types (CASSANDRA-4770)
- * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
Merged from 1.0:
* Switch from NBHM to CHM in MessagingService's callback map, which
prevents OOM in long-running instances (CASSANDRA-4708)
View
5 conf/cassandra.yaml
@@ -287,7 +287,7 @@ rpc_port: 9160
# enable or disable keepalive on rpc connections
rpc_keepalive: true
-# Cassandra provides three out-of-the-box options for the RPC Server:
+# Cassandra provides three options for the RPC Server:
#
# sync -> One connection per thread in the rpc pool (see below).
# For a very large number of clients, memory will be your limiting
@@ -305,9 +305,6 @@ rpc_keepalive: true
#
# The default is sync because on Windows hsha is about 30% slower. On Linux,
# sync/hsha performance is about the same, with hsha of course using less memory.
-#
-# Alternatively, can provide your own RPC server by providing the fully-qualified class name
-# of an o.a.c.t.TServerFactory that can create an instance of it.
rpc_server_type: sync
# Uncomment rpc_min|max|thread to set request pool size.
View
2 src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -381,6 +381,8 @@ else if (conf.memtable_flush_writers == null)
if (conf.stream_throughput_outbound_megabits_per_sec == null)
conf.stream_throughput_outbound_megabits_per_sec = 400;
+ if (!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
+ throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
if (conf.rpc_min_threads == null)
conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
? Runtime.getRuntime().availableProcessors() * 4
View
120 src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -20,14 +20,28 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
/**
* This class supports two methods for creating a Cassandra node daemon,
@@ -48,9 +62,10 @@
}
private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
- final static String SYNC = "sync";
- final static String ASYNC = "async";
- final static String HSHA = "hsha";
+ private final static String SYNC = "sync";
+ private final static String ASYNC = "async";
+ private final static String HSHA = "hsha";
+ public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
private ThriftServer server;
protected void startServer()
@@ -102,21 +117,94 @@ public static void main(String[] args)
public ThriftServer(InetAddress listenAddr, int listenPort)
{
// now we start listening for clients
+ final CassandraServer cassandraServer = new CassandraServer();
+ Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
+
+ // Transport
logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
- TServerFactory.Args args = new TServerFactory.Args();
- args.tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
- args.addr = new InetSocketAddress(listenAddr, listenPort);
- args.cassandraServer = new CassandraServer();
- args.processor = new Cassandra.Processor(args.cassandraServer);
- args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();
- args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize();
- args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize();
+ // Protocol factory
+ TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
+
+ // Transport factory
int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
- logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
- args.inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
- args.outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
- serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
+ TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+ {
+ TServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+ // ThreadPool Server and will be invocation per connection basis...
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+ .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+ .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+ .inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+ serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
+ logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
+ }
+ else
+ {
+ TNonblockingServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+ {
+ // This is single threaded hence the invocation will be all
+ // in one thread.
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
+ serverEngine = new CustomTNonBlockingServer(serverArgs);
+ }
+ else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+ {
+ // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
+ ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+ DatabaseDescriptor.getRpcMaxThreads(),
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+ // Check for available processors in the system which will be equal to the IO Threads.
+ serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+ }
+ }
}
public void run()
View
39 src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -22,7 +22,6 @@
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
@@ -31,15 +30,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.SocketSessionManagementService;
import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
@@ -350,36 +343,4 @@ protected void requestSelectInterestChange(FrameBuffer fb)
// thread because the method is not synchronized with the rest of the
// selectors threads.
}
-
- public static class Factory implements TServerFactory
- {
- public TServer buildTServer(Args args)
- {
- final InetSocketAddress addr = args.addr;
- TNonblockingServerTransport serverTransport;
- try
- {
- serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
- }
-
- // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
- ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
- DatabaseDescriptor.getRpcMaxThreads(),
- 60L,
- TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
- TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
- .outputTransportFactory(args.outTransportFactory)
- .inputProtocolFactory(args.tProtocolFactory)
- .outputProtocolFactory(args.tProtocolFactory)
- .processor(args.processor);
- // Check for available processors in the system which will be equal to the IO Threads.
- return new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
- }
- }
}
View
31 src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -21,14 +21,9 @@
*/
-import java.net.InetSocketAddress;
-
import org.apache.cassandra.service.SocketSessionManagementService;
import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
public class CustomTNonBlockingServer extends TNonblockingServer
{
@@ -45,30 +40,4 @@ protected boolean requestInvoke(FrameBuffer frameBuffer)
frameBuffer.invoke();
return true;
}
-
- public static class Factory implements TServerFactory
- {
- public TServer buildTServer(Args args)
- {
- final InetSocketAddress addr = args.addr;
- TNonblockingServerTransport serverTransport;
- try
- {
- serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
- }
-
- // This is single threaded hence the invocation will be all
- // in one thread.
- TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
- .outputTransportFactory(args.outTransportFactory)
- .inputProtocolFactory(args.tProtocolFactory)
- .outputProtocolFactory(args.tProtocolFactory)
- .processor(args.processor);
- return new CustomTNonBlockingServer(serverArgs);
- }
- }
}
View
32 src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -19,22 +19,18 @@
package org.apache.cassandra.thrift;
-import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.AbstractCassandraDaemon;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -223,32 +219,4 @@ public void run()
}
}
}
-
- public static class Factory implements TServerFactory
- {
- public TServer buildTServer(Args args)
- {
- final InetSocketAddress addr = args.addr;
- TServerTransport serverTransport;
- try
- {
- serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
- }
- // ThreadPool Server and will be invocation per connection basis...
- TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
- .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
- .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
- .inputTransportFactory(args.inTransportFactory)
- .outputTransportFactory(args.outTransportFactory)
- .inputProtocolFactory(args.tProtocolFactory)
- .outputProtocolFactory(args.tProtocolFactory)
- .processor(args.processor);
- ExecutorService executorService = new AbstractCassandraDaemon.CleaningThreadPool(args.cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
- return new CustomTThreadPoolServer(serverArgs, executorService);
- }
- }
}
View
75 src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.thrift;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.thrift.server.TServer;
-
-/**
- * Helper implementation to create a thrift TServer based on one of the common types we support (sync, async, hsha),
- * or a custom type by setting the fully qualified java class name in the rpc_server_type setting.
- */
-public class TServerCustomFactory implements TServerFactory
-{
- private static Logger logger = LoggerFactory.getLogger(TServerCustomFactory.class);
- private final String serverType;
-
- public TServerCustomFactory(String serverType)
- {
- assert serverType != null;
- this.serverType = serverType;
- }
-
- public TServer buildTServer(TServerFactory.Args args)
- {
- TServer server;
- if (CassandraDaemon.SYNC.equalsIgnoreCase(serverType))
- {
- server = new CustomTThreadPoolServer.Factory().buildTServer(args);
- logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
- }
- else if(CassandraDaemon.ASYNC.equalsIgnoreCase(serverType))
- {
- server = new CustomTNonBlockingServer.Factory().buildTServer(args);
- logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
- }
- else if(CassandraDaemon.HSHA.equalsIgnoreCase(serverType))
- {
- server = new CustomTHsHaServer.Factory().buildTServer(args);
- logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
- }
- else
- {
- TServerFactory serverFactory;
- try
- {
- serverFactory = (TServerFactory) Class.forName(serverType).newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to instantiate server factory:" + serverType, e);
- }
- server = serverFactory.buildTServer(args);
- logger.info(String.format("Using custom thrift server %s on %s : %s", server.getClass().getName(), args.addr.getHostName(), args.addr.getPort()));
- }
- return server;
- }
-}
View
43 src/java/org/apache/cassandra/thrift/TServerFactory.java
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.thrift;
-
-import java.net.InetSocketAddress;
-
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TTransportFactory;
-
-public interface TServerFactory
-{
- TServer buildTServer(Args args);
-
- public static class Args
- {
- public InetSocketAddress addr;
- public CassandraServer cassandraServer;
- public Cassandra.Processor processor;
- public TProtocolFactory tProtocolFactory;
- public TTransportFactory inTransportFactory;
- public TTransportFactory outTransportFactory;
- public Integer sendBufferSize;
- public Integer recvBufferSize;
- public boolean keepAlive;
- }
-}

0 comments on commit 5b5b20a

Please sign in to comment.
Something went wrong with that request. Please try again.