Browse files

rename get_cf -> readColumnFamily; ReadMessage -> ReadCommand.

[Message message = ReadMessage.readMessage(readMessage) is just plain confusing]
patch by jbellis; reviewed by Eric Evans for #88
  • Loading branch information...
1 parent 550a5ba commit be7a79d123cba06636189ab3ec284d6d6d68b29f Jonathan Ellis committed Apr 20, 2009
View
41 src/org/apache/cassandra/db/ReadMessage.java → src/org/apache/cassandra/db/ReadCommand.java
@@ -28,7 +28,6 @@
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
@@ -38,26 +37,26 @@
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public class ReadMessage implements Serializable
+public class ReadCommand implements Serializable
{
- private static ICompactSerializer<ReadMessage> serializer_;
+ private static ICompactSerializer<ReadCommand> serializer_;
public static final String doRepair_ = "READ-REPAIR";
-
+
static
{
- serializer_ = new ReadMessageSerializer();
+ serializer_ = new ReadCommandSerializer();
}
- static ICompactSerializer<ReadMessage> serializer()
+ static ICompactSerializer<ReadCommand> serializer()
{
return serializer_;
}
- public static Message makeReadMessage(ReadMessage readMessage) throws IOException
+ public static Message makeReadMessage(ReadCommand readCommand) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- ReadMessage.serializer().serialize(readMessage, dos);
+ ReadCommand.serializer().serialize(readCommand, dos);
Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, new Object[]{bos.toByteArray()});
return message;
}
@@ -71,32 +70,32 @@ public static Message makeReadMessage(ReadMessage readMessage) throws IOExceptio
private List<String> columns_ = new ArrayList<String>();
private boolean isDigestQuery_ = false;
- private ReadMessage()
+ private ReadCommand()
{
}
- public ReadMessage(String table, String key)
+ public ReadCommand(String table, String key)
{
table_ = table;
key_ = key;
}
- public ReadMessage(String table, String key, String columnFamily_column)
+ public ReadCommand(String table, String key, String columnFamily_column)
{
table_ = table;
key_ = key;
columnFamily_column_ = columnFamily_column;
}
- public ReadMessage(String table, String key, String columnFamily, List<String> columns)
+ public ReadCommand(String table, String key, String columnFamily, List<String> columns)
{
table_ = table;
key_ = key;
columnFamily_column_ = columnFamily;
columns_ = columns;
}
- public ReadMessage(String table, String key, String columnFamily_column, int start, int count)
+ public ReadCommand(String table, String key, String columnFamily_column, int start, int count)
{
table_ = table;
key_ = key;
@@ -105,7 +104,7 @@ public ReadMessage(String table, String key, String columnFamily_column, int sta
count_ = count;
}
- public ReadMessage(String table, String key, String columnFamily_column, long sinceTimestamp)
+ public ReadCommand(String table, String key, String columnFamily_column, long sinceTimestamp)
{
table_ = table;
key_ = key;
@@ -173,9 +172,9 @@ public String toString()
}
}
-class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
{
- public void serialize(ReadMessage rm, DataOutputStream dos) throws IOException
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
{
dos.writeUTF(rm.table());
dos.writeUTF(rm.key());
@@ -196,7 +195,7 @@ public void serialize(ReadMessage rm, DataOutputStream dos) throws IOException
}
}
- public ReadMessage deserialize(DataInputStream dis) throws IOException
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
{
String table = dis.readUTF();
String key = dis.readUTF();
@@ -214,18 +213,18 @@ public ReadMessage deserialize(DataInputStream dis) throws IOException
dis.readFully(bytes);
columns.add( new String(bytes) );
}
- ReadMessage rm = null;
+ ReadCommand rm = null;
if ( columns.size() > 0 )
{
- rm = new ReadMessage(table, key, columnFamily_column, columns);
+ rm = new ReadCommand(table, key, columnFamily_column, columns);
}
else if( sinceTimestamp > 0 )
{
- rm = new ReadMessage(table, key, columnFamily_column, sinceTimestamp);
+ rm = new ReadCommand(table, key, columnFamily_column, sinceTimestamp);
}
else
{
- rm = new ReadMessage(table, key, columnFamily_column, start, count);
+ rm = new ReadCommand(table, key, columnFamily_column, start, count);
}
rm.setIsDigestQuery(isDigest);
return rm;
View
49 src/org/apache/cassandra/db/ReadVerbHandler.java
@@ -19,12 +19,9 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.EndPoint;
@@ -34,8 +31,6 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -77,38 +72,38 @@ public void doVerb(Message message)
try
{
- ReadMessage readMessage = ReadMessage.serializer().deserialize(readCtx.bufIn_);
- Table table = Table.open(readMessage.table());
+ ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+ Table table = Table.open(readCommand.table());
Row row = null;
long start = System.currentTimeMillis();
- if( readMessage.columnFamily_column() == null )
- row = table.get(readMessage.key());
+ if( readCommand.columnFamily_column() == null )
+ row = table.get(readCommand.key());
else
{
- if(readMessage.getColumnNames().size() == 0)
+ if(readCommand.getColumnNames().size() == 0)
{
- if(readMessage.count() > 0 && readMessage.start() >= 0)
- row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+ if(readCommand.count() > 0 && readCommand.start() >= 0)
+ row = table.getRow(readCommand.key(), readCommand.columnFamily_column(), readCommand.start(), readCommand.count());
else
- row = table.getRow(readMessage.key(), readMessage.columnFamily_column());
+ row = table.getRow(readCommand.key(), readCommand.columnFamily_column());
}
else
{
- row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.getColumnNames());
+ row = table.getRow(readCommand.key(), readCommand.columnFamily_column(), readCommand.getColumnNames());
}
}
logger_.info("getRow() TIME: " + (System.currentTimeMillis() - start) + " ms.");
start = System.currentTimeMillis();
ReadResponseMessage readResponseMessage = null;
- if(readMessage.isDigestQuery())
+ if(readCommand.isDigestQuery())
{
readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
}
else
{
readResponseMessage = new ReadResponseMessage(table.getTableName(), row);
}
- readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+ readResponseMessage.setIsDigestQuery(readCommand.isDigestQuery());
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
@@ -126,9 +121,9 @@ public void doVerb(Message message)
logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
/* Do read repair if header of the message says so */
- String repair = new String( message.getHeader(ReadMessage.doRepair_) );
- if ( repair.equals( ReadMessage.doRepair_ ) )
- doReadRepair(row, readMessage);
+ String repair = new String( message.getHeader(ReadCommand.doRepair_) );
+ if ( repair.equals( ReadCommand.doRepair_ ) )
+ doReadRepair(row, readCommand);
}
catch ( IOException ex)
{
@@ -140,29 +135,29 @@ public void doVerb(Message message)
}
}
- private void doReadRepair(Row row, ReadMessage readMessage)
+ private void doReadRepair(Row row, ReadCommand readCommand)
{
if ( DatabaseDescriptor.getConsistencyCheck() )
{
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key());
/* Remove the local storage endpoint from the list. */
endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if(readMessage.getColumnNames().size() == 0)
+ if(readCommand.getColumnNames().size() == 0)
{
- if( readMessage.start() >= 0 && readMessage.count() < Integer.MAX_VALUE)
+ if( readCommand.start() >= 0 && readCommand.count() < Integer.MAX_VALUE)
{
- StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+ StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(), readCommand.start(), readCommand.count());
}
- if( readMessage.sinceTimestamp() > 0)
+ if( readCommand.sinceTimestamp() > 0)
{
- StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.sinceTimestamp());
+ StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(), readCommand.sinceTimestamp());
}
}
else
{
- StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.getColumnNames());
+ StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(), readCommand.getColumnNames());
}
}
}
View
7 src/org/apache/cassandra/service/CassandraServer.java
@@ -37,7 +37,6 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Column;
import org.apache.cassandra.utils.LogUtil;
import org.apache.thrift.TException;
@@ -81,7 +80,7 @@ private void validateTable(String table) throws CassandraException
}
}
- protected ColumnFamily get_cf(String tablename, String key, String columnFamily, List<String> columNames) throws CassandraException, TException
+ protected ColumnFamily readColumnFamily(String tablename, String key, String columnFamily, List<String> columNames) throws CassandraException, TException
{
ColumnFamily cfamily = null;
try
@@ -205,7 +204,7 @@ protected ColumnFamily get_cf(String tablename, String key, String columnFamily,
try
{
validateTable(tablename);
- ColumnFamily cfamily = get_cf(tablename, key, columnFamily, columnNames);
+ ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, columnNames);
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
@@ -486,7 +485,7 @@ public boolean remove(String tablename, String key, String columnFamily_column,
try
{
validateTable(tablename);
- ColumnFamily cfamily = get_cf(tablename, key, columnFamily, superColumnNames);
+ ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, superColumnNames);
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+" key:" + key
View
30 src/org/apache/cassandra/service/ConsistencyManager.java
@@ -22,13 +22,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.io.DataInputBuffer;
@@ -96,9 +92,9 @@ private void doReadRepair() throws IOException
replicas_.add(StorageService.getLocalStorageEndPoint());
IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
String table = DatabaseDescriptor.getTables().get(0);
- ReadMessage readMessage = constructReadMessage(false);
+ ReadCommand readCommand = constructReadMessage(false);
// ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
- Message message = ReadMessage.makeReadMessage(readMessage);
+ Message message = ReadCommand.makeReadMessage(readCommand);
MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);
}
}
@@ -187,10 +183,10 @@ private void handleResponses()
public void run()
{
logger_.debug(" Run the consistency checks for " + columnFamily_);
- ReadMessage readMessageDigestOnly = constructReadMessage(true);
+ ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
- Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
+ Message messageDigestOnly = ReadCommand.makeReadMessage(readCommandDigestOnly);
IAsyncCallback digestResponseHandler = new DigestResponseHandler();
MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new EndPoint[0]), digestResponseHandler);
}
@@ -200,32 +196,32 @@ public void run()
}
}
- private ReadMessage constructReadMessage(boolean isDigestQuery)
+ private ReadCommand constructReadMessage(boolean isDigestQuery)
{
- ReadMessage readMessage = null;
+ ReadCommand readCommand = null;
String table = DatabaseDescriptor.getTables().get(0);
if(columnNames_.size() == 0)
{
if( start_ >= 0 && count_ < Integer.MAX_VALUE)
{
- readMessage = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+ readCommand = new ReadCommand(table, row_.key(), columnFamily_, start_, count_);
}
else if(sinceTimestamp_ > 0)
{
- readMessage = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+ readCommand = new ReadCommand(table, row_.key(), columnFamily_, sinceTimestamp_);
}
else
{
- readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+ readCommand = new ReadCommand(table, row_.key(), columnFamily_);
}
}
else
{
- readMessage = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+ readCommand = new ReadCommand(table, row_.key(), columnFamily_, columnNames_);
}
- readMessage.setIsDigestQuery(isDigestQuery);
- return readMessage;
+ readCommand.setIsDigestQuery(isDigestQuery);
+ return readCommand;
}
}
View
14 src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
@@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IAsyncCallback;
@@ -47,7 +47,7 @@
private Lock lock_ = new ReentrantLock();
private Condition condition_;
/* This maps the keys to the original data read messages */
- private Map<String, ReadMessage> readMessages_ = new HashMap<String, ReadMessage>();
+ private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
/* This maps the key to its set of replicas */
private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
/* This maps the groupId to the individual callback for the set of messages */
@@ -129,18 +129,18 @@ private void onDigestMismatch(String key) throws IOException
{
if ( DatabaseDescriptor.getConsistencyCheck())
{
- ReadMessage readMessage = readMessages_.get(key);
- readMessage.setIsDigestQuery(false);
- Message messageRepair = ReadMessage.makeReadMessage(readMessage);
- EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get( readMessage.key() );
+ ReadCommand readCommand = readMessages_.get(key);
+ readCommand.setIsDigestQuery(false);
+ Message messageRepair = ReadCommand.makeReadMessage(readCommand);
+ EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get( readCommand.key() );
Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
EndPoint[][] epList = new EndPoint[][]{ endpoints };
MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);
}
}
}
- public MultiQuorumResponseHandler(Map<String, ReadMessage> readMessages, Map<String, EndPoint[]> endpoints)
+ public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String, EndPoint[]> endpoints)
{
condition_ = lock_.newCondition();
readMessages_ = readMessages;
View
130 src/org/apache/cassandra/service/StorageProxy.java
@@ -30,16 +30,14 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TouchMessage;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -145,13 +143,13 @@ public static boolean insertBlocking(RowMutation rm)
}
}
- private static Map<String, Message> constructMessages(Map<String, ReadMessage> readMessages) throws IOException
+ private static Map<String, Message> constructMessages(Map<String, ReadCommand> readMessages) throws IOException
{
Map<String, Message> messages = new HashMap<String, Message>();
Set<String> keys = readMessages.keySet();
for ( String key : keys )
{
- Message message = ReadMessage.makeReadMessage( readMessages.get(key) );
+ Message message = ReadCommand.makeReadMessage( readMessages.get(key) );
messages.put(key, message);
}
return messages;
@@ -182,7 +180,7 @@ private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Ma
* @throws IOException
* @throws TimeoutException
*/
- public static Map<String, Row> doReadProtocol(Map<String, ReadMessage> readMessages) throws IOException,TimeoutException
+ public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> readMessages) throws IOException,TimeoutException
{
Map<String, Row> rows = new HashMap<String, Row>();
Set<String> keys = readMessages.keySet();
@@ -206,14 +204,14 @@ private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Ma
return rows;
}
- public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+ public static Row doReadProtocol(String key, ReadCommand readCommand) throws IOException,TimeoutException
{
Row row = null;
EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);
if(endPoint != null)
{
- Message message = ReadMessage.makeReadMessage(readMessage);
- message.addHeader(ReadMessage.doRepair_, ReadMessage.doRepair_.getBytes());
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ message.addHeader(ReadCommand.doRepair_, ReadCommand.doRepair_.getBytes());
IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
byte[] body = (byte[])result[0];
@@ -310,9 +308,9 @@ public static Row readProtocol(String tablename, String key, String columnFamily
}
if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily, columnNames);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily, columnNames);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -349,9 +347,9 @@ public static Row readProtocol(String tablename, String key, String columnFamily
}
if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -408,9 +406,9 @@ public static Row readProtocol(String tablename, String key, String columnFamily
}
if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
{
- ReadMessage readMessage = null;
- readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
- return doReadProtocol(key, readMessage);
+ ReadCommand readCommand = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+ return doReadProtocol(key, readCommand);
}
else
{
@@ -436,12 +434,12 @@ public static Row strongReadProtocol(String tablename, String key, String column
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
+ ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
- ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, columns);
- readMessageDigestOnly.setIsDigestQuery(true);
+ ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, columns);
+ readCommandDigestOnly.setIsDigestQuery(true);
- Row row = StorageProxy.doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ Row row = StorageProxy.doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
return row;
}
@@ -466,27 +464,27 @@ public static Row strongReadProtocol(String tablename, String key, String column
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = null;
- ReadMessage readMessageDigestOnly = null;
+ ReadCommand readCommand = null;
+ ReadCommand readCommandDigestOnly = null;
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+ readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
}
else
{
- readMessage = new ReadMessage(tablename, key, columnFamily);
+ readCommand = new ReadCommand(tablename, key, columnFamily);
}
- Message message = ReadMessage.makeReadMessage(readMessage);
+ Message message = ReadCommand.makeReadMessage(readCommand);
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, start, count);
+ readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, start, count);
}
else
{
- readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily);
+ readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily);
}
- readMessageDigestOnly.setIsDigestQuery(true);
- Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ readCommandDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
return row;
}
@@ -507,27 +505,27 @@ public static Row strongReadProtocol(String tablename, String key, String column
Map<String, Row> rows = new HashMap<String, Row>();
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- Map<String, ReadMessage[]> readMessages = new HashMap<String, ReadMessage[]>();
+ Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
for (String key : keys )
{
- ReadMessage[] readMessage = new ReadMessage[2];
+ ReadCommand[] readCommand = new ReadCommand[2];
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage[0] = new ReadMessage(tablename, key, columnFamily, start, count);
+ readCommand[0] = new ReadCommand(tablename, key, columnFamily, start, count);
}
else
{
- readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+ readCommand[0] = new ReadCommand(tablename, key, columnFamily);
}
if( start >= 0 && count < Integer.MAX_VALUE)
{
- readMessage[1] = new ReadMessage(tablename, key, columnFamily, start, count);
+ readCommand[1] = new ReadCommand(tablename, key, columnFamily, start, count);
}
else
{
- readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+ readCommand[1] = new ReadCommand(tablename, key, columnFamily);
}
- readMessage[1].setIsDigestQuery(true);
+ readCommand[1].setIsDigestQuery(true);
}
rows = doStrongReadProtocol(readMessages);
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -538,13 +536,13 @@ public static Row strongReadProtocol(String tablename, String key, String column
{
long startTime = System.currentTimeMillis();
// TODO: throw a thrift exception if we do not have N nodes
- ReadMessage readMessage = null;
- ReadMessage readMessageDigestOnly = null;
- readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
- Message message = ReadMessage.makeReadMessage(readMessage);
- readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
- readMessageDigestOnly.setIsDigestQuery(true);
- Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ ReadCommand readCommand = null;
+ ReadCommand readCommandDigestOnly = null;
+ readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+ readCommandDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
return row;
}
@@ -555,11 +553,11 @@ public static Row strongReadProtocol(String tablename, String key, String column
* param @ readMessage - the read message to get the actual data
* param @ readMessageDigest - the read message to get the digest.
*/
- private static Row doStrongReadProtocol(String key, ReadMessage readMessage, ReadMessage readMessageDigest) throws IOException, TimeoutException
+ private static Row doStrongReadProtocol(String key, ReadCommand readCommand, ReadCommand readCommandDigest) throws IOException, TimeoutException
{
Row row = null;
- Message message = ReadMessage.makeReadMessage(readMessage);
- Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigest);
+ Message message = ReadCommand.makeReadMessage(readCommand);
+ Message messageDigestOnly = ReadCommand.makeReadMessage(readCommandDigest);
IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
@@ -607,9 +605,9 @@ private static Row doStrongReadProtocol(String key, ReadMessage readMessage, Rea
QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
DatabaseDescriptor.getReplicationFactor(),
readResponseResolverRepair);
- readMessage.setIsDigestQuery(false);
+ readCommand.setIsDigestQuery(false);
logger_.info("DigestMismatchException: " + key);
- Message messageRepair = ReadMessage.makeReadMessage(readMessage);
+ Message messageRepair = ReadCommand.makeReadMessage(readCommand);
MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
try
{
@@ -628,29 +626,29 @@ private static Row doStrongReadProtocol(String key, ReadMessage readMessage, Rea
return row;
}
- private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadMessage[]> readMessages) throws IOException
+ private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException
{
Map<String, Message[]> messages = new HashMap<String, Message[]>();
Set<String> keys = readMessages.keySet();
for ( String key : keys )
{
Message[] msg = new Message[DatabaseDescriptor.getReplicationFactor()];
- ReadMessage[] readMessage = readMessages.get(key);
- msg[0] = ReadMessage.makeReadMessage( readMessage[0] );
+ ReadCommand[] readCommand = readMessages.get(key);
+ msg[0] = ReadCommand.makeReadMessage( readCommand[0] );
for ( int i = 1; i < msg.length; ++i )
{
- msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+ msg[i] = ReadCommand.makeReadMessage( readCommand[1] );
}
}
return messages;
}
- private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadMessage[]> readMessages, Map<String, Message[]> messages) throws IOException
+ private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadCommand[]> readMessages, Map<String, Message[]> messages) throws IOException
{
Set<String> keys = messages.keySet();
/* This maps the keys to the original data read messages */
- Map<String, ReadMessage> readMessage = new HashMap<String, ReadMessage>();
+ Map<String, ReadCommand> readMessage = new HashMap<String, ReadCommand>();
/* This maps the keys to their respective endpoints/replicas */
Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
/* Groups the messages that need to be sent to the individual keys */
@@ -700,7 +698,7 @@ private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadMessa
* @return map containing key ---> Row
* @throws IOException, TimeoutException
*/
- private static Map<String, Row> doStrongReadProtocol(Map<String, ReadMessage[]> readMessages) throws IOException
+ private static Map<String, Row> doStrongReadProtocol(Map<String, ReadCommand[]> readMessages) throws IOException
{
Map<String, Row> rows = new HashMap<String, Row>();
/* Construct the messages to be sent to the replicas */
@@ -769,11 +767,11 @@ public static Row weakReadProtocol(String tablename, String key, String columnFa
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
@@ -850,11 +848,11 @@ public static Row weakReadProtocol(String tablename, String key, String columnFa
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
@@ -919,11 +917,11 @@ public static Row weakReadProtocol(String tablename, String key, String columnFa
{
Row row = null;
long startTime = System.currentTimeMillis();
- Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
for ( String key : keys )
{
- ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
- readMessages.put(key, readMessage);
+ ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+ readMessages.put(key, readCommand);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
View
6 src/org/apache/cassandra/test/DataImporter.java
@@ -43,7 +43,7 @@
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponseMessage;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
@@ -880,10 +880,10 @@ public void testRead(String filepath) throws Throwable {
key = user + ":1";
}
- ReadMessage readMessage = new ReadMessage(tablename_, key);
+ ReadCommand readCommand = new ReadCommand(tablename_, key);
Message message = new Message(from_, StorageService.readStage_,
StorageService.readVerbHandler_,
- new Object[] { readMessage });
+ new Object[] {readCommand});
IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(
message, to_);
Object[] result = iar.get();
View
17 src/org/apache/cassandra/test/StressTest.java
@@ -19,28 +19,21 @@
package org.apache.cassandra.test;
import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.analytics.AnalyticsContext;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.Cassandra;
@@ -147,15 +140,15 @@ public void applyLoad(RowMutation rm) throws IOException {
}
- public void readLoad(ReadMessage readMessage)
+ public void readLoad(ReadCommand readCommand)
{
IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
1,
readResponseResolver);
Message message = new Message(from_, StorageService.readStage_,
StorageService.readVerbHandler_,
- new Object[] { readMessage });
+ new Object[] {readCommand});
MessagingService.getMessagingInstance().sendOneWay(message, to_);
/*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
try
@@ -187,7 +180,7 @@ public void randomReadColumn (int keys, int columns, int size, int tps)
String stringKey = new Integer(key).toString();
stringKey = stringKey + keyFix_ ;
int j = random.nextInt(columns) + 1;
- ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+ ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
readLoad(rm);
if ( requestsPerSecond_ > 1000)
Thread.sleep(0, 1000000000/requestsPerSecond_);
@@ -257,7 +250,7 @@ public void randomReadSuperColumn(int keys, int superColumns, int columns, int s
stringKey = stringKey + keyFix_ ;
int i = random.nextInt(superColumns) + 1;
int j = random.nextInt(columns) + 1;
- ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+ ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
readLoad(rm);
}
}
View
10 test/org/apache/cassandra/db/ReadMessageTest.java
@@ -16,16 +16,16 @@ public void testMakeReadMessage()
colList.add("col1");
colList.add("col2");
- ReadMessage rm = new ReadMessage("Table1", "row1", "foo", colList);
- ReadMessage rm2 = serializeAndDeserializeReadMessage(rm);
+ ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
+ ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
}
- private ReadMessage serializeAndDeserializeReadMessage(ReadMessage rm)
+ private ReadCommand serializeAndDeserializeReadMessage(ReadCommand rm)
{
- ReadMessage rm2 = null;
- ReadMessageSerializer rms = (ReadMessageSerializer) ReadMessage.serializer();
+ ReadCommand rm2 = null;
+ ReadCommandSerializer rms = (ReadCommandSerializer) ReadCommand.serializer();
DataOutputBuffer dos = new DataOutputBuffer();
DataInputBuffer dis = new DataInputBuffer();

0 comments on commit be7a79d

Please sign in to comment.