Skip to content

Commit

Permalink
Merge branch 'cassandra-1.1' into trunk
Browse files Browse the repository at this point in the history
Conflicts:
	src/java/org/apache/cassandra/tools/BulkLoader.java
  • Loading branch information
yukim committed Sep 28, 2012
2 parents ccd1a3d + 732d82b commit 7a9ca84
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -97,6 +97,7 @@
* (CQL3) Fix validation for IN queries for non-PK cols (CASSANDRA-4709)
* fix re-created keyspace disappering after 1.1.5 upgrade (CASSANDRA-4698)
* (CLI) display elapsed time in 2 fraction digits (CASSANDRA-3460)
* add authentication support to sstableloader (CASSANDRA-4712)
Merged from 1.0:
* Switch from NBHM to CHM in MessagingService's callback map, which
prevents OOM in long-running instances (CASSANDRA-4708)
Expand Down
37 changes: 31 additions & 6 deletions src/java/org/apache/cassandra/tools/BulkLoader.java
Expand Up @@ -23,6 +23,7 @@
import java.net.UnknownHostException;
import java.util.*;

import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
Expand All @@ -36,7 +37,6 @@
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class BulkLoader
{
Expand All @@ -48,6 +48,8 @@ public class BulkLoader
private static final String IGNORE_NODES_OPTION = "ignore";
private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
private static final String RPC_PORT_OPTION = "port";
private static final String USER_OPTION = "username";
private static final String PASSWD_OPTION = "password";
private static final String THROTTLE_MBITS = "throttle";

public static void main(String args[]) throws IOException
Expand All @@ -56,7 +58,7 @@ public static void main(String args[]) throws IOException
try
{
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort), handler);
SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort, options.user, options.passwd), handler);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
SSTableLoader.LoaderFuture future = loader.stream(options.ignores);

Expand Down Expand Up @@ -178,13 +180,17 @@ static class ExternalClient extends SSTableLoader.Client
private final OutputHandler outputHandler;
private final Set<InetAddress> hosts;
private final int rpcPort;
private final String user;
private final String passwd;

public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port)
public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port, String user, String passwd)
{
super();
this.outputHandler = outputHandler;
this.hosts = hosts;
this.rpcPort = port;
this.user = user;
this.passwd = passwd;
}

public void init(String keyspace)
Expand All @@ -197,7 +203,7 @@ public void init(String keyspace)

// Query endpoint to ranges map and schemas from thrift
InetAddress host = hostiter.next();
Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd);
List<TokenRange> tokenRanges = client.describe_ring(keyspace);
List<KsDef> ksDefs = client.describe_keyspaces();

Expand Down Expand Up @@ -236,13 +242,22 @@ public boolean validateColumnFamily(String keyspace, String cfName)
return cfs != null && cfs.contains(cfName);
}

private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception
{
TSocket socket = new TSocket(host, port);
TTransport trans = new TFramedTransport(socket);
trans.open();
TProtocol protocol = new TBinaryProtocol(trans);
return new Cassandra.Client(protocol);
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null)
{
Map<String, String> credentials = new HashMap<String, String>();
credentials.put(IAuthenticator.USERNAME_KEY, user);
credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
client.login(authenticationRequest);
}
return client;
}
}

Expand All @@ -254,6 +269,8 @@ static class LoaderOptions
public boolean verbose;
public boolean noProgress;
public int rpcPort = 9160;
public String user;
public String passwd;
public int throttle = 0;

public final Set<InetAddress> hosts = new HashSet<InetAddress>();
Expand Down Expand Up @@ -314,6 +331,12 @@ public static LoaderOptions parseArgs(String cmdArgs[])
if (cmd.hasOption(RPC_PORT_OPTION))
opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));

if (cmd.hasOption(USER_OPTION))
opts.user = cmd.getOptionValue(USER_OPTION);

if (cmd.hasOption(PASSWD_OPTION))
opts.passwd = cmd.getOptionValue(PASSWD_OPTION);

if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
{
String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
Expand Down Expand Up @@ -379,6 +402,8 @@ private static CmdLineOptions getCmdLineOptions()
options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
return options;
}

Expand Down

0 comments on commit 7a9ca84

Please sign in to comment.