Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'cassandra-1.2' into trunk

Conflicts:
	tools/stress/src/org/apache/cassandra/stress/Session.java
  • Loading branch information...
commit 1aa798f0c186c87c094797aacf35577ab11169c4 2 parents 2575275 + 06699d4
@pcmanus pcmanus authored
View
1  CHANGES.txt
@@ -30,6 +30,7 @@
* cli: Warn about missing CQL3 tables in schema descriptions (CASSANDRA-5309)
* Re-enable unknown option in replication/compaction strategies option for
backward compatibility (CASSANDRA-4795)
+ * Add binary protocol support to stress (CASSANDRA-4993)
Merged from 1.1:
* nodetool: ability to repair specific range (CASSANDRA-5280)
* Fix possible assertion triggered in SliceFromReadCommand (CASSANDRA-5284)
View
2  bin/cassandra
@@ -129,7 +129,7 @@ launch_service()
if [ "x$pidpath" != "x" ]; then
cassandra_parms="$cassandra_parms -Dcassandra-pidfile=$pidpath"
fi
-
+
# The cassandra-foreground option will tell CassandraDaemon not
# to close stdout/stderr, but it's up to us not to background.
if [ "x$foreground" != "x" ]; then
View
5 src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -93,6 +93,11 @@ public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOpt
this.encryptionOptions = encryptionOptions;
}
+ public SimpleClient(String host, int port)
+ {
+ this(host, port, new ClientEncryptionOptions());
+ }
+
public void connect(boolean useCompression) throws IOException
{
establishConnection();
View
26 tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -38,6 +38,7 @@
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.util.CassandraClient;
+import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.thrift.*;
import org.apache.commons.lang.StringUtils;
@@ -95,6 +96,7 @@
availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1");
availableOptions.addOption("L", "enable-cql", false, "Perform queries using CQL2 (Cassandra Query Language v 2.0.0)");
availableOptions.addOption("L3", "enable-cql3", false, "Perform queries using CQL3 (Cassandra Query Language v 3.0.0)");
+ availableOptions.addOption("b", "enable-native-protocol", false, "Use the binary native protocol (only work along with -L3)");
availableOptions.addOption("P", "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)");
@@ -140,6 +142,7 @@
private boolean use_prepared = false;
private boolean trace = false;
private boolean captureStatistics = true;
+ public boolean use_native_protocol = false;
private final String outFileName;
@@ -302,6 +305,12 @@ else if (replicationStrategy.endsWith("SimpleStrategy"))
cqlVersion = "3.0.0";
}
+ if (cmd.hasOption("b"))
+ {
+ if (!(enable_cql && cqlVersion.startsWith("3")))
+ throw new IllegalArgumentException("Cannot use binary protocol without -L3");
+ use_native_protocol = true;
+ }
if (cmd.hasOption("P"))
{
@@ -703,6 +712,7 @@ public CassandraClient getClient()
{
return getClient(true);
}
+
/**
* Thrift client connection
* @param setKeyspace - should we set keyspace for client or not
@@ -742,6 +752,22 @@ public CassandraClient getClient(boolean setKeyspace)
return client;
}
+ public SimpleClient getNativeClient()
+ {
+ try
+ {
+ String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
+ SimpleClient client = new SimpleClient(currentNode, 9042);
+ client.connect(false);
+ client.execute("USE \"Keyspace1\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
+ return client;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
public static InetAddress getLocalAddress()
{
if (localInetAddress == null)
View
62 tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -25,6 +25,7 @@
import org.apache.cassandra.stress.operations.*;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
public class StressAction extends Thread
{
@@ -231,29 +232,60 @@ public Consumer(int toConsume)
public void run()
{
- CassandraClient connection = client.getClient();
-
- for (int i = 0; i < items; i++)
+ if (client.use_native_protocol)
{
- if (stop)
- break;
+ SimpleClient connection = client.getNativeClient();
- try
+ for (int i = 0; i < items; i++)
{
- operations.take().run(connection); // running job
+ if (stop)
+ break;
+
+ try
+ {
+ operations.take().run(connection); // running job
+ }
+ catch (Exception e)
+ {
+ if (output == null)
+ {
+ System.err.println(e.getMessage());
+ returnCode = StressAction.FAILURE;
+ System.exit(-1);
+ }
+
+ output.println(e.getMessage());
+ returnCode = StressAction.FAILURE;
+ break;
+ }
}
- catch (Exception e)
+ }
+ else
+ {
+ CassandraClient connection = client.getClient();
+
+ for (int i = 0; i < items; i++)
{
- if (output == null)
+ if (stop)
+ break;
+
+ try
{
- System.err.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- System.exit(-1);
+ operations.take().run(connection); // running job
}
+ catch (Exception e)
+ {
+ if (output == null)
+ {
+ System.err.println(e.getMessage());
+ returnCode = StressAction.FAILURE;
+ System.exit(-1);
+ }
- output.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- break;
+ output.println(e.getMessage());
+ returnCode = StressAction.FAILURE;
+ break;
+ }
}
}
}
View
96 tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
+import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+
+public abstract class CQLOperation extends Operation
+{
+ public CQLOperation(Session client, int idx)
+ {
+ super(client, idx);
+ }
+
+ protected abstract void run(CQLQueryExecutor executor) throws IOException;
+
+ protected abstract boolean validateThriftResult(CqlResult result);
+
+ protected abstract boolean validateNativeResult(ResultMessage result);
+
+ public void run(final CassandraClient client) throws IOException
+ {
+ run(new CQLQueryExecutor()
+ {
+ public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
+ {
+ CqlResult result = null;
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ if (session.cqlVersion.startsWith("3"))
+ result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
+ else
+ result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
+ }
+ else
+ {
+ String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+ if (session.cqlVersion.startsWith("3"))
+ result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
+ else
+ result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+ }
+ return validateThriftResult(result);
+ }
+ });
+ }
+
+ public void run(final SimpleClient client) throws IOException
+ {
+ run(new CQLQueryExecutor()
+ {
+ public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
+ {
+ ResultMessage result = null;
+ if (session.usePreparedStatements())
+ {
+ byte[] stmntId = getPreparedStatement(client, cqlQuery);
+ result = client.executePrepared(stmntId, queryParamsAsByteBuffer(queryParams), ThriftConversion.fromThrift(session.getConsistencyLevel()));
+ }
+ else
+ {
+ String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+ result = client.execute(formattedQuery, ThriftConversion.fromThrift(session.getConsistencyLevel()));
+ }
+ return validateNativeResult(result);
+ }
+ });
+ }
+}
View
39 tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -24,16 +24,19 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import com.yammer.metrics.core.TimerContext;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class CqlCounterAdder extends Operation
+public class CqlCounterAdder extends CQLOperation
{
private static String cqlQuery = null;
@@ -42,7 +45,7 @@ public CqlCounterAdder(Session client, int idx)
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -70,7 +73,7 @@ public void run(CassandraClient client) throws IOException
}
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- String formattedQuery = null;
+ List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
TimerContext context = session.latency.time();
@@ -84,25 +87,7 @@ public void run(CassandraClient client) throws IOException
try
{
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel());
- else
- client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
- if (session.cqlVersion.startsWith("3"))
- client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- success = true;
+ success = executor.execute(cqlQuery, queryParams);
}
catch (Exception e)
{
@@ -124,4 +109,14 @@ public void run(CassandraClient client) throws IOException
session.keys.getAndIncrement();
context.stop();
}
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ return true;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ return true;
+ }
}
View
43 tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -24,18 +24,20 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import com.yammer.metrics.core.TimerContext;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlResultType;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class CqlCounterGetter extends Operation
+public class CqlCounterGetter extends CQLOperation
{
private static String cqlQuery = null;
@@ -44,7 +46,7 @@ public CqlCounterGetter(Session client, int idx)
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -69,7 +71,7 @@ public void run(CassandraClient client) throws IOException
}
byte[] key = generateKey();
- String formattedQuery = null;
+ List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
TimerContext context = session.latency.time();
@@ -83,30 +85,7 @@ public void run(CassandraClient client) throws IOException
try
{
- CqlResult result = null;
-
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key)), session.getConsistencyLevel());
- else
- result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key)));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
-
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type";
- assert result.rows.size() == 0 : "expected exactly one row";
- success = (result.rows.get(0).columns.size() != 0);
+ success = executor.execute(cqlQuery, queryParams);
}
catch (Exception e)
{
@@ -128,4 +107,14 @@ public void run(CassandraClient client) throws IOException
session.keys.getAndIncrement();
context.stop();
}
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ return result.rows.get(0).columns.size() != 0;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+ }
}
View
75 tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -27,26 +27,31 @@
import java.util.List;
import com.yammer.metrics.core.TimerContext;
+import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class CqlIndexedRangeSlicer extends Operation
+public class CqlIndexedRangeSlicer extends CQLOperation
{
private static List<ByteBuffer> values = null;
private static String cqlQuery = null;
+ private int lastQueryResultSize;
+ private int lastMaxKey;
+
public CqlIndexedRangeSlicer(Session client, int idx)
{
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -56,8 +61,14 @@ public void run(CassandraClient client) throws IOException
if (cqlQuery == null)
{
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
- .append(" ''..'' FROM Standard1");
+ StringBuilder query = new StringBuilder("SELECT ");
+
+ if (session.cqlVersion.startsWith("2"))
+ query.append(session.getColumnsPerKey()).append(" ''..''");
+ else
+ query.append("*");
+
+ query.append(" FROM Standard1");
if (session.cqlVersion.startsWith("2"))
query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
@@ -79,7 +90,6 @@ public void run(CassandraClient client) throws IOException
boolean success = false;
String exceptionMessage = null;
- CqlResult results = null;
String formattedQuery = null;
List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset, session.cqlVersion.startsWith("3")));
@@ -90,25 +100,7 @@ public void run(CassandraClient client) throws IOException
try
{
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- results = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel());
- else
- results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, queryParms);
- if (session.cqlVersion.startsWith("3"))
- results = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- success = (results.rows.size() != 0);
+ success = executor.execute(cqlQuery, queryParms);
}
catch (Exception e)
{
@@ -126,13 +118,13 @@ public void run(CassandraClient client) throws IOException
(exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
- received += results.rows.size();
+ received += lastQueryResultSize;
// convert max key found back to an integer, and increment it
- startOffset = String.format(format, (1 + getMaxKey(results.rows)));
+ startOffset = String.format(format, (1 + lastMaxKey));
session.operations.getAndIncrement();
- session.keys.getAndAdd(results.rows.size());
+ session.keys.getAndAdd(lastQueryResultSize);
context.stop();
}
}
@@ -155,4 +147,33 @@ private int getMaxKey(List<CqlRow> rows)
return maxKey;
}
+
+ private int getMaxKey(ResultSet rs)
+ {
+ int maxKey = ByteBufferUtil.toInt(rs.rows.get(0).get(0));
+
+ for (List<ByteBuffer> row : rs.rows)
+ {
+ int currentKey = ByteBufferUtil.toInt(row.get(0));
+ if (currentKey > maxKey)
+ maxKey = currentKey;
+ }
+
+ return maxKey;
+ }
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ lastQueryResultSize = result.rows.size();
+ lastMaxKey = getMaxKey(result.rows);
+ return lastQueryResultSize != 0;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ assert result instanceof ResultMessage.Rows;
+ lastQueryResultSize = ((ResultMessage.Rows)result).result.size();
+ lastMaxKey = getMaxKey(((ResultMessage.Rows)result).result);
+ return lastQueryResultSize != 0;
+ }
}
View
39 tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -31,10 +31,13 @@
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.utils.UUIDGen;
-public class CqlInserter extends Operation
+public class CqlInserter extends CQLOperation
{
private static List<ByteBuffer> values;
private static String cqlQuery = null;
@@ -44,7 +47,7 @@ public CqlInserter(Session client, int idx)
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -95,8 +98,6 @@ public void run(CassandraClient client) throws IOException
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
queryParms.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
- String formattedQuery = null;
-
TimerContext context = session.latency.time();
boolean success = false;
@@ -109,25 +110,7 @@ public void run(CassandraClient client) throws IOException
try
{
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel());
- else
- client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, queryParms);
- if (session.cqlVersion.startsWith("3"))
- client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- success = true;
+ success = executor.execute(cqlQuery, queryParms);
}
catch (Exception e)
{
@@ -150,4 +133,14 @@ public void run(CassandraClient client) throws IOException
session.keys.getAndIncrement();
context.stop();
}
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ return true;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ return true;
+ }
}
View
6 tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -26,6 +26,7 @@
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
public class CqlMultiGetter extends Operation
{
@@ -38,4 +39,9 @@ public void run(CassandraClient client) throws IOException
{
throw new RuntimeException("Multiget is not implemented for CQL");
}
+
+ public void run(SimpleClient client) throws IOException
+ {
+ throw new RuntimeException("Multiget is not implemented for CQL");
+ }
}
View
49 tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import com.yammer.metrics.core.TimerContext;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -32,19 +33,22 @@
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.transport.SimpleClient;
-public class CqlRangeSlicer extends Operation
+public class CqlRangeSlicer extends CQLOperation
{
private static String cqlQuery = null;
+ private int lastRowCount;
public CqlRangeSlicer(Session client, int idx)
{
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -61,13 +65,12 @@ public void run(CassandraClient client) throws IOException
}
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- String formattedQuery = null;
+ List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
TimerContext context = session.latency.time();
boolean success = false;
String exceptionMessage = null;
- int rowCount = 0;
for (int t = 0; t < session.getRetryTimes(); t++)
{
@@ -76,28 +79,7 @@ public void run(CassandraClient client) throws IOException
try
{
- CqlResult result = null;
-
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel());
- else
- result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- rowCount = result.rows.size();
- success = (rowCount != 0);
+ success = executor.execute(cqlQuery, queryParams);
}
catch (Exception e)
{
@@ -117,7 +99,20 @@ public void run(CassandraClient client) throws IOException
}
session.operations.getAndIncrement();
- session.keys.getAndAdd(rowCount);
+ session.keys.getAndAdd(lastRowCount);
context.stop();
}
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ lastRowCount = result.rows.size();
+ return lastRowCount != 0;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ assert result instanceof ResultMessage.Rows;
+ lastRowCount = ((ResultMessage.Rows)result).result.size();
+ return lastRowCount != 0;
+ }
}
View
42 tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -31,10 +31,13 @@
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
-public class CqlReader extends Operation
+public class CqlReader extends CQLOperation
{
private static String cqlQuery = null;
@@ -43,7 +46,7 @@ public CqlReader(Session client, int idx)
super(client, idx);
}
- public void run(CassandraClient client) throws IOException
+ protected void run(CQLQueryExecutor executor) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -85,8 +88,6 @@ public void run(CassandraClient client) throws IOException
byte[] key = generateKey();
queryParams.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
- String formattedQuery = null;
-
TimerContext context = session.latency.time();
boolean success = false;
@@ -99,31 +100,10 @@ public void run(CassandraClient client) throws IOException
try
{
- CqlResult result = null;
-
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
- else
- result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
- }
- else
- {
- if (formattedQuery == null)
- formattedQuery = formatCqlQuery(cqlQuery, queryParams);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
-
- success = (result.rows.get(0).columns.size() != 0);
+ success = executor.execute(cqlQuery, queryParams);
}
catch (Exception e)
{
-
exceptionMessage = getExceptionMessage(e);
success = false;
}
@@ -143,4 +123,14 @@ public void run(CassandraClient client) throws IOException
session.keys.getAndIncrement();
context.stop();
}
+
+ protected boolean validateThriftResult(CqlResult result)
+ {
+ return result.rows.get(0).columns.size() != 0;
+ }
+
+ protected boolean validateNativeResult(ResultMessage result)
+ {
+ return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+ }
}
View
29 tools/stress/src/org/apache/cassandra/stress/util/Operation.java
@@ -26,6 +26,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.Map;
+import java.util.HashMap;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -33,6 +35,7 @@
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.Stress;
+import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlPreparedResult;
import org.apache.cassandra.thrift.InvalidRequestException;
@@ -67,6 +70,8 @@ public Operation(Session client, int idx)
*/
public abstract void run(CassandraClient client) throws IOException;
+ public void run(SimpleClient client) throws IOException {}
+
// Utility methods
protected List<ByteBuffer> generateValues()
@@ -287,12 +292,14 @@ protected static String formatCqlQuery(String query, List<String> parms)
return result.toString();
}
- protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
+ protected Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
{
Integer statementId = client.preparedStatements.get(cqlQuery.hashCode());
if (statementId == null)
{
- CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
+ CqlPreparedResult response = session.cqlVersion.startsWith("3")
+ ? client.prepare_cql3_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE)
+ : client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
statementId = response.itemId;
client.preparedStatements.put(cqlQuery.hashCode(), statementId);
}
@@ -300,10 +307,28 @@ protected static Integer getPreparedStatement(CassandraClient client, String cql
return statementId;
}
+ private static final Map<Integer, byte[]> preparedStatementsNative = new HashMap<Integer, byte[]>();
+
+ protected static byte[] getPreparedStatement(SimpleClient client, String cqlQuery) throws Exception
+ {
+ byte[] statementId = preparedStatementsNative.get(cqlQuery.hashCode());
+ if (statementId == null)
+ {
+ statementId = client.prepare(cqlQuery).statementId.bytes;
+ preparedStatementsNative.put(cqlQuery.hashCode(), statementId);
+ }
+ return statementId;
+ }
+
protected String wrapInQuotesIfRequired(String string)
{
return session.cqlVersion.startsWith("3")
? "\"" + string + "\""
: string;
}
+
+ public interface CQLQueryExecutor
+ {
+ public boolean execute(String query, List<String> queryParameters) throws Exception;
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.