Browse files

refactor read path: first we make readColumnFamily accept a ReadComma…

…nd, and use that to clean out duplicate code in CassandraServer. Then we clean up the duplicate versions of the read methods in StorageService by making them ReadCommand-based, too. [not touching multiget code for now.]

patch by jbellis; reviewed by Eric Evans for #88
  • Loading branch information...
1 parent ac22da5 commit 4cda99533a2dfd703932bc24cdd2258999cb809c Jonathan Ellis committed Apr 20, 2009
View
5 src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
@@ -28,6 +28,7 @@
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
@@ -99,8 +100,8 @@ public ColumnRangeQueryRSD(CFMetaData cfMetaData, ConstantOperand rowKey, Consta
try
{
String key = (String)(rowKey_.get());
- row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column,
- offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+ ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
+ row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
{
View
5 src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
@@ -28,6 +28,7 @@
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
@@ -68,8 +69,8 @@ public SuperColumnRangeQueryRSD(CFMetaData cfMetaData, OperandDef rowKey, int of
try
{
String key = (String)(rowKey_.get());
- row = StorageProxy.readProtocol(cfMetaData_.tableName, key, cfMetaData_.cfName,
- offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+ ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
+ row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
{
View
5 src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
@@ -29,6 +29,7 @@
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
@@ -84,8 +85,8 @@ public UniqueKeyQueryRSD(CFMetaData cfMetaData, OperandDef rowKey, OperandDef co
try
{
String key = (String)(rowKey_.get());
- row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column, -1,
- Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+ ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, -1, Integer.MAX_VALUE);
+ row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
{
View
20 src/org/apache/cassandra/db/ReadCommand.java
@@ -126,6 +126,26 @@ public ReadCommand copy()
return new ReadCommand(table, key, columnFamilyColumn, start, count, sinceTimestamp, columnNames);
}
+ public Row getRow(Table table) throws IOException, ColumnFamilyNotDefinedException
+ {
+ if (columnNames != EMPTY_COLUMNS)
+ {
+ return table.getRow(key, columnFamilyColumn, columnNames);
+ }
+
+ if (sinceTimestamp > 0)
+ {
+ return table.getRow(key, columnFamilyColumn, sinceTimestamp);
+ }
+
+ if (start > 0 || (count > 0 && count < Integer.MAX_VALUE))
+ {
+ return table.getRow(key, columnFamilyColumn, start, count);
+ }
+
+ return table.getRow(key, columnFamilyColumn);
+ }
+
public String toString()
{
return "ReadMessage(" +
View
203 src/org/apache/cassandra/service/CassandraServer.java
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;
@@ -37,6 +38,9 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
import org.apache.cassandra.utils.LogUtil;
import org.apache.thrift.TException;
@@ -80,43 +84,26 @@ private void validateTable(String table) throws CassandraException
}
}
- protected ColumnFamily readColumnFamily(String tablename, String key, String columnFamily, List<String> columNames) throws CassandraException, TException
- {
- ColumnFamily cfamily = null;
- try
- {
- validateTable(tablename);
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamily);
- // check for values
- if( values.length < 1 )
- {
- throw new CassandraException("Column Family " + columnFamily + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily, columNames, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- throw new CassandraException("No row exists for key " + key);
- }
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + " key:" + key );
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- cfamily = cfMap.get(values[0]);
- if (cfamily == null)
- {
- logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
- throw new CassandraException("Either the key " + key + " is not present or the column family " + values[0] + " is not present.");
- }
- }
- catch (Throwable ex)
- {
- String exception = LogUtil.throwableToString(ex);
- logger_.info( exception );
- throw new CassandraException(exception);
- }
- return cfamily;
+ protected ColumnFamily readColumnFamily(ReadCommand command) throws CassandraException, TException, IOException, ColumnFamilyNotDefinedException, TimeoutException
+ {
+ validateTable(command.table);
+ String[] values = RowMutation.getColumnAndColumnFamily(command.columnFamilyColumn);
+ if( values.length < 1 )
+ {
+ throw new CassandraException("Empty column Family is invalid.");
+ }
+ Table table = Table.open(command.table);
+ if (!table.getColumnFamilies().contains(values[0]))
+ {
+ throw new CassandraException("Column Family " + values[0] + " is invalid.");
+ }
+
+ Row row = StorageProxy.readProtocol(command, StorageService.ConsistencyLevel.WEAK);
+ if (row == null)
+ {
+ return null;
+ }
+ return row.getColumnFamily(values[0]);
}
public List<column_t> thriftifyColumns(Collection<IColumn> columns)
@@ -139,27 +126,8 @@ protected ColumnFamily readColumnFamily(String tablename, String key, String col
long startTime = System.currentTimeMillis();
try
{
- validateTable(tablename);
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- // check for values
- if( values.length < 1 )
- {
- throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, timeStamp, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, timeStamp));
+ String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "+" key:" + key + " ColumnFamily:" + values[0]);
@@ -204,7 +172,7 @@ protected ColumnFamily readColumnFamily(String tablename, String key, String col
try
{
validateTable(tablename);
- ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, columnNames);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, columnNames));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
@@ -241,27 +209,8 @@ protected ColumnFamily readColumnFamily(String tablename, String key, String col
long startTime = System.currentTimeMillis();
try
{
- validateTable(tablename);
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- // check for values
- if( values.length < 1 )
- {
- throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, start, count, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, start, count));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
@@ -303,29 +252,8 @@ public column_t get_column(String tablename, String key, String columnFamily_col
{
try
{
- validateTable(tablename);
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- // check for values
- if( values.length < 2 )
- {
- throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily map is missing.....: "
- + " key:" + key
- );
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily is missing.....: "
@@ -375,29 +303,8 @@ public int get_column_count(String tablename, String key, String columnFamily_co
int count = -1;
try
{
- validateTable(tablename);
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- // check for values
- if( values.length < 1 )
- {
- throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily map is missing.....: "
- + " key:" + key
- );
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily is missing.....: "
@@ -485,7 +392,7 @@ public boolean remove(String tablename, String key, String columnFamily_column,
try
{
validateTable(tablename);
- ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, superColumnNames);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, superColumnNames));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+" key:" + key
@@ -533,29 +440,8 @@ public boolean remove(String tablename, String key, String columnFamily_column,
{
try
{
- validateTable(tablename);
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_superColumnName);
- // check for values
- if( values.length < 1 )
- {
- throw new CassandraException("Column Family " + columnFamily_superColumnName + " is invalid.");
- }
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_superColumnName, start, count, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily map is missing.....: "
- + " key:" + key
- );
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_superColumnName, start, count));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily is missing.....: "
@@ -584,33 +470,10 @@ public boolean remove(String tablename, String key, String columnFamily_column,
public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws CassandraException
{
- superColumn_t ret = null;
try
{
- validateTable(tablename);
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- // check for values
- if( values.length < 2 )
- {
- throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
- }
-
- Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- throw new CassandraException("ERROR No row for this key .....: " + key);
- }
-
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
- if (cfMap == null || cfMap.size() == 0)
- {
- logger_ .info("ERROR ColumnFamily map is missing.....: "
- + " key:" + key
- );
- throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
- }
- ColumnFamily cfamily = cfMap.get(values[0]);
+ ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
if (cfamily == null)
{
logger_.info("ERROR ColumnFamily is missing.....: "
View
33 src/org/apache/cassandra/service/ConsistencyManager.java
@@ -155,29 +155,30 @@ private void handleResponses()
private long sinceTimestamp_;
private List<String> columnNames_ = new ArrayList<String>();
- ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
+ public ConsistencyManager(Row row_, List<EndPoint> replicas_, String columnFamily_, int start_, int count_, long sinceTimestamp_, List<String> columnNames_)
+ {
+ this.row_ = row_;
+ this.replicas_ = replicas_;
+ this.columnFamily_ = columnFamily_;
+ this.start_ = start_;
+ this.count_ = count_;
+ this.sinceTimestamp_ = sinceTimestamp_;
+ this.columnNames_ = columnNames_;
+ }
+
+ ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
{
- row_ = row;
- replicas_ = replicas;
- columnFamily_ = columnFamily;
- columnNames_ = columns;
+ this(row, replicas, columnFamily, 0, 0, 0, columns);
}
-
+
ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
{
- row_ = row;
- replicas_ = replicas;
- columnFamily_ = columnFamily;
- start_ = start;
- count_ = count;
+ this(row, replicas, columnFamily, start, count, 0, null);
}
-
+
ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
{
- row_ = row;
- replicas_ = replicas;
- columnFamily_ = columnFamily;
- sinceTimestamp_ = sinceTimestamp;
+ this(row, replicas, columnFamily, 0, 0, sinceTimestamp, null);
}
public void run()
View
484 src/org/apache/cassandra/service/StorageProxy.java
@@ -36,6 +36,7 @@
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IAsyncResult;
@@ -203,28 +204,37 @@ private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Ma
}
return rows;
}
-
- public static Row doReadProtocol(String key, ReadCommand readCommand) throws IOException,TimeoutException
+
+ /**
+ * Read the data from one replica. If there is no reply, read the data from another. In the event we get
+ * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+ * @param command the read to perform
+ * @return the row associated with command.key
+ * @throws Exception
+ */
+ private static Row weakReadRemote(ReadCommand command) throws IOException
{
- Row row = null;
- EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);
- if(endPoint != null)
+ EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ assert endPoint != null;
+ logger_.debug("weakreadremote reading " + command + " from " + endPoint);
+ Message message = command.makeReadMessage();
+ message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+ IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+ byte[] body;
+ try
{
- Message message = readCommand.makeReadMessage();
- message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- byte[] body = (byte[])result[0];
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
- row = response.row();
+ body = (byte[])result[0];
}
- else
+ catch (TimeoutException e)
{
- logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
+ throw new RuntimeException(e);
+ // TODO retry to a different endpoint
}
- return row;
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+ return response.row();
}
static void touch_local(String tablename, String key, boolean fData ) throws IOException
@@ -292,85 +302,41 @@ public static void touchProtocol(String tablename, String key, boolean fData, St
break;
}
}
-
- public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+
+ /**
+ * Performs the actual reading of a row out of the StorageService, fetching
+ * a specific set of column names from a given column family.
+ */
+ public static Row readProtocol(ReadCommand command, StorageService.ConsistencyLevel consistencyLevel)
+ throws IOException, ColumnFamilyNotDefinedException, TimeoutException
{
+ assert command.key != null;
+ long startTime = System.currentTimeMillis();
Row row = null;
- boolean foundLocal = false;
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
- for(EndPoint endPoint: endpoints)
- {
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
- {
- foundLocal = true;
- break;
- }
- }
- if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
- {
- ReadCommand readCommand = null;
- readCommand = new ReadCommand(tablename, key, columnFamily, columnNames);
- return doReadProtocol(key, readCommand);
- }
- else
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+
+ if (consistencyLevel == StorageService.ConsistencyLevel.WEAK)
{
- switch ( consistencyLevel )
+ boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+ if (foundLocal)
{
- case WEAK:
- row = weakReadProtocol(tablename, key, columnFamily, columnNames);
- break;
-
- case STRONG:
- row = strongReadProtocol(tablename, key, columnFamily, columnNames);
- break;
-
- default:
- row = weakReadProtocol(tablename, key, columnFamily, columnNames);
- break;
+ row = weakReadLocal(command);
}
- }
- return row;
- }
-
- public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
- {
- Row row = null;
- boolean foundLocal = false;
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
- for(EndPoint endPoint: endpoints)
- {
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ else
{
- foundLocal = true;
- break;
+ row = weakReadRemote(command);
}
- }
- if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
- {
- ReadCommand readCommand = null;
- readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
- return doReadProtocol(key, readCommand);
}
else
{
- switch ( consistencyLevel )
- {
- case WEAK:
- row = weakReadProtocol(tablename, key, columnFamily, start, count);
- break;
-
- case STRONG:
- row = strongReadProtocol(tablename, key, columnFamily, start, count);
- break;
-
- default:
- row = weakReadProtocol(tablename, key, columnFamily, start, count);
- break;
- }
+ assert consistencyLevel == StorageService.ConsistencyLevel.STRONG;
+ row = strongRead(command);
}
+
+ logger_.debug("Finished reading " + row + " in " + (System.currentTimeMillis() - startTime) + " ms.");
return row;
}
-
+
public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
{
Map<String, Row> rows = new HashMap<String, Row>();
@@ -390,105 +356,7 @@ public static Row readProtocol(String tablename, String key, String columnFamily
}
return rows;
}
-
- public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
- {
- Row row = null;
- boolean foundLocal = false;
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
- for(EndPoint endPoint: endpoints)
- {
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
- {
- foundLocal = true;
- break;
- }
- }
- if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
- {
- ReadCommand readCommand = null;
- readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
- return doReadProtocol(key, readCommand);
- }
- else
- {
- switch ( consistencyLevel )
- {
- case WEAK:
- row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
- break;
-
- case STRONG:
- row = strongReadProtocol(tablename, key, columnFamily, sinceTimestamp);
- break;
-
- default:
- row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
- break;
- }
- }
- return row;
- }
- public static Row strongReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
- {
- long startTime = System.currentTimeMillis();
- // TODO: throw a thrift exception if we do not have N nodes
- ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
-
- ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, columns);
- readCommandDigestOnly.setDigestQuery(true);
-
- Row row = StorageProxy.doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
- logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
- return row;
- }
-
- /**
- * This function executes the read protocol.
- * 1. Get the N nodes from storage service where the data needs to be replicated
- * 2. Construct a message for read\write
- * 3. Set one of teh messages to get teh data and teh rest to get teh digest
- * 4. SendRR ( to all the nodes above )
- * 5. Wait for a response from atleast X nodes where X <= N and teh data node
- * 6. If the digest matches return teh data.
- * 7. else carry out read repair by getting data from all the nodes.
- * @param tablename the name of the table
- * @param key the row key identifier
- * @param columnFamily the column in Cassandra format
- * @start the start position
- * @count the number of columns we are interested in
- * @throws IOException, TimeoutException
- */
- public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
- {
- long startTime = System.currentTimeMillis();
- // TODO: throw a thrift exception if we do not have N nodes
- ReadCommand readCommand = null;
- ReadCommand readCommandDigestOnly = null;
- if( start >= 0 && count < Integer.MAX_VALUE)
- {
- readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
- }
- else
- {
- readCommand = new ReadCommand(tablename, key, columnFamily);
- }
- Message message = readCommand.makeReadMessage();
- if( start >= 0 && count < Integer.MAX_VALUE)
- {
- readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, start, count);
- }
- else
- {
- readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily);
- }
- readCommandDigestOnly.setDigestQuery(true);
- Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
- logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
- return row;
- }
-
/**
* This is a multiget version of the above method.
* @param tablename
@@ -531,101 +399,91 @@ public static Row strongReadProtocol(String tablename, String key, String column
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
return rows;
}
-
- public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
- {
- long startTime = System.currentTimeMillis();
- // TODO: throw a thrift exception if we do not have N nodes
- ReadCommand readCommand = null;
- ReadCommand readCommandDigestOnly = null;
- readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
- Message message = readCommand.makeReadMessage();
- readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
- readCommandDigestOnly.setDigestQuery(true);
- Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
- logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
- return row;
- }
- /**
- * This method performs the read from the replicas.
- * param @ key - key for which the data is required.
- * param @ readMessage - the read message to get the actual data
- * param @ readMessageDigest - the read message to get the digest.
- */
- private static Row doStrongReadProtocol(String key, ReadCommand readCommand, ReadCommand readCommandDigest) throws IOException, TimeoutException
+ /*
+ * This function executes the read protocol.
+ // 1. Get the N nodes from storage service where the data needs to be
+ // replicated
+ // 2. Construct a message for read\write
+ * 3. Set one of teh messages to get teh data and teh rest to get teh digest
+ // 4. SendRR ( to all the nodes above )
+ // 5. Wait for a response from atleast X nodes where X <= N and teh data node
+ * 6. If the digest matches return teh data.
+ * 7. else carry out read repair by getting data from all the nodes.
+ // 5. return success
+ */
+ private static Row strongRead(ReadCommand command) throws IOException, TimeoutException
{
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ ReadCommand readMessageDigestOnly = command.copy();
+ readMessageDigestOnly.setDigestQuery(true);
+
Row row = null;
- Message message = readCommand.makeReadMessage();
- Message messageDigestOnly = readCommandDigest.makeReadMessage();
-
+ Message message = command.makeReadMessage();
+ Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+
IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
DatabaseDescriptor.getReplicationFactor(),
readResponseResolver);
- EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
- List<EndPoint> endpointList = new ArrayList<EndPoint>( Arrays.asList( StorageService.instance().getNStorageEndPoint(key) ) );
- /* Remove the local storage endpoint from the list. */
- endpointList.remove( dataPoint );
+ EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
+ /* Remove the local storage endpoint from the list. */
+ endpointList.remove(dataPoint);
EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
Message messages[] = new Message[endpointList.size() + 1];
-
- /*
+
+ /*
* First message is sent to the node that will actually get
- * the data for us. The other two replicas are only sent a
+ * the data for us. The other two replicas are only sent a
* digest query.
*/
endPoints[0] = dataPoint;
- messages[0] = message;
- for (int i=1; i < endPoints.length ; i++)
+ messages[0] = message;
+ for (int i = 1; i < endPoints.length; i++)
{
- endPoints[i] = endpointList.get(i-1);
+ endPoints[i] = endpointList.get(i - 1);
messages[i] = messageDigestOnly;
}
-
+ logger_.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
+
try
{
- MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+ MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+
long startTime2 = System.currentTimeMillis();
row = quorumResponseHandler.get();
- logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
- + " ms.");
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- // TODO: throw a thrift exception
- return row;
- }
+ logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
catch (DigestMismatchException ex)
{
if ( DatabaseDescriptor.getConsistencyCheck())
{
- IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
- QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getReplicationFactor(),
- readResponseResolverRepair);
- readCommand.setDigestQuery(false);
- logger_.info("DigestMismatchException: " + key);
- Message messageRepair = readCommand.makeReadMessage();
- MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
- try
- {
- row = quorumResponseHandlerRepair.get();
- }
- catch(DigestMismatchException dex)
- {
- logger_.warn(LogUtil.throwableToString(dex));
- }
- if (row == null)
- {
- logger_.info("ERROR No row for this key .....: " + key);
- }
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getReplicationFactor(),
+ readResponseResolverRepair);
+ command.setDigestQuery(false);
+ logger_.info("DigestMismatchException: " + command.key);
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
+ quorumResponseHandlerRepair);
+ try
+ {
+ row = quorumResponseHandlerRepair.get();
+ }
+ catch (DigestMismatchException e)
+ {
+ // TODO should this be a thrift exception?
+ throw new RuntimeException(e);
+ }
}
- }
+ }
+
return row;
}
-
+
private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException
{
Map<String, Message[]> messages = new HashMap<String, Message[]>();
@@ -720,38 +578,7 @@ private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadComma
}
return rows;
}
-
- /**
- * This version is used to retrieve the row associated with
- * the specified key
- * @param tablename name of the table that needs to be queried
- * @param keys keys whose values we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param columns the columns we are interested in
- * @return the interested row
- * @throws Exception
- */
- public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
- {
- long startTime = System.currentTimeMillis();
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
- // TODO: throw a thrift exception if we do not have N nodes
-
- Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
- Row row = table.getRow(key, columnFamily, columns);
-
- logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
- /*
- * Do the consistency checks in the background and return the
- * non NULL row.
- */
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
- return row;
- }
-
+
/**
* This version is used when results for multiple keys needs to be
* retrieved.
@@ -782,56 +609,40 @@ public static Row weakReadProtocol(String tablename, String key, String columnFa
for ( String key : keys )
{
List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
+ /* Remove the local storage endpoint from the list. */
endpoints.remove( StorageService.getLocalStorageEndPoint() );
if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
}
- return rows;
+ return rows;
}
-
- /**
- * This function executes the read protocol locally and should be used only if consistency is not a concern.
- * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
- * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
- * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
- * @param tablename name of the table that needs to be queried
- * @param key key whose we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param start start index
- * @param count the number of columns we are interested in
- * @return the row associated with this key
- * @throws Exception
- */
- public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
+
+ /*
+ * This function executes the read protocol locally and should be used only if consistency is not a concern.
+ * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
+ * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
+ * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+ */
+ private static Row weakReadLocal(ReadCommand command) throws IOException, ColumnFamilyNotDefinedException
{
- Row row = null;
- long startTime = System.currentTimeMillis();
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ logger_.debug("weakreadlocal for " + command);
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove(StorageService.getLocalStorageEndPoint());
// TODO: throw a thrift exception if we do not have N nodes
-
- Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
- if( start >= 0 && count < Integer.MAX_VALUE)
- {
- row = table.getRow(key, columnFamily, start, count);
- }
- else
- {
- row = table.getRow(key, columnFamily);
- }
-
- logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = command.getRow(table);
+
/*
- * Do the consistency checks in the background and return the
- * non NULL row.
- */
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
- return row;
+ * Do the consistency checks in the background and return the
+ * non NULL row.
+ */
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, command);
+ return row;
}
-
+
/**
* This version is used when results for multiple keys needs to be
* retrieved.
@@ -870,38 +681,7 @@ public static Row weakReadProtocol(String tablename, String key, String columnFa
}
return rows;
}
-
- /**
- * This version is used when retrieving a single key.
- *
- * @param tablename name of the table that needs to be queried
- * @param key key whose we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param sinceTimestamp this is lower bound of the timestamp
- * @return the row associated with this key
- * @throws Exception
- */
- public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
- {
- Row row = null;
- long startTime = System.currentTimeMillis();
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
- // TODO: throw a thrift exception if we do not have N nodes
-
- Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
- row = table.getRow(key, columnFamily,sinceTimestamp);
- logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
- /*
- * Do the consistency checks in the background and return the
- * non NULL row.
- */
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
- return row;
- }
-
+
/**
* This version is used when results for multiple keys needs to be
* retrieved.
View
12 src/org/apache/cassandra/service/StorageService.java
@@ -565,18 +565,28 @@ public boolean isInSameDataCenter(EndPoint endpoint) throws IOException
* sure that the N replicas are in sync. We do this in the
* background when we do not care much about consistency.
*/
+ public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand message)
+ {
+ Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message.columnFamilyColumn,
+ message.start, message.count, message.sinceTimestamp, message.columnNames);
+ consistencyManager_.submit(consistencySentinel);
+ }
+
+ @Deprecated
public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, int start, int count)
{
Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, start, count);
consistencyManager_.submit(consistencySentinel);
}
-
+
+ @Deprecated
public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, long sinceTimestamp)
{
Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, sinceTimestamp);
consistencyManager_.submit(consistencySentinel);
}
+ @Deprecated
public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, List<String> columns)
{
Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, columns);

0 comments on commit 4cda995

Please sign in to comment.