diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index ff0eb4c..4f8b661 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -60,7 +60,7 @@ service Cassandra { async void insert(string tablename,string key,string columnFamily_column, string cellData,i64 timestamp), async void batch_insert(batch_mutation_t batchMutation), bool batch_insert_blocking(batch_mutation_t batchMutation) throws (1:CassandraException e), - async void remove(string tablename,string key,string columnFamily_column), + bool remove(string tablename,string key,string columnFamily_column, i64 timestamp, bool block), list get_columns_since(string tablename, string key, string columnFamily_column, i64 timeStamp) throws (1:CassandraException e), list get_slice_super(string tablename, string key, string columnFamily_superColumnName, i32 start = -1 , i32 count = -1) throws (1:CassandraException e), diff --git a/src/org/apache/cassandra/service/Cassandra.java b/src/org/apache/cassandra/service/Cassandra.java index b79f169..08ef4bd 100644 --- a/src/org/apache/cassandra/service/Cassandra.java +++ b/src/org/apache/cassandra/service/Cassandra.java @@ -36,7 +36,7 @@ public interface Iface { public boolean batch_insert_blocking(batch_mutation_t batchMutation) throws CassandraException, TException; - public void remove(String tablename, String key, String columnFamily_column) throws TException; + public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws TException; public List get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws CassandraException, TException; @@ -314,23 +314,43 @@ public boolean recv_batch_insert_blocking() throws CassandraException, TExceptio throw new TApplicationException(TApplicationException.MISSING_RESULT, "batch_insert_blocking failed: unknown result"); } - public void remove(String tablename, String key, String columnFamily_column) throws TException + public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws TException { - send_remove(tablename, key, columnFamily_column); + send_remove(tablename, key, columnFamily_column, timestamp, block); + return recv_remove(); } - public void send_remove(String tablename, String key, String columnFamily_column) throws TException + public void send_remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws TException { oprot_.writeMessageBegin(new TMessage("remove", TMessageType.CALL, seqid_)); remove_args args = new remove_args(); args.tablename = tablename; args.key = key; args.columnFamily_column = columnFamily_column; + args.timestamp = timestamp; + args.block = block; args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); } + public boolean recv_remove() throws TException + { + TMessage msg = iprot_.readMessageBegin(); + if (msg.type == TMessageType.EXCEPTION) { + TApplicationException x = TApplicationException.read(iprot_); + iprot_.readMessageEnd(); + throw x; + } + remove_result result = new remove_result(); + result.read(iprot_); + iprot_.readMessageEnd(); + if (result.__isset.success) { + return result.success; + } + throw new TApplicationException(TApplicationException.MISSING_RESULT, "remove failed: unknown result"); + } + public List get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws CassandraException, TException { send_get_columns_since(tablename, key, columnFamily_column, timeStamp); @@ -875,9 +895,15 @@ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TExcepti remove_args args = new remove_args(); args.read(iprot); iprot.readMessageEnd(); - iface_.remove(args.tablename, args.key, args.columnFamily_column); - return; + remove_result result = new remove_result(); + result.success = iface_.remove(args.tablename, args.key, args.columnFamily_column, args.timestamp, args.block); + result.__isset.success = true; + oprot.writeMessageBegin(new TMessage("remove", TMessageType.REPLY, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); } + } private class get_columns_since implements ProcessFunction { @@ -4953,6 +4979,8 @@ public static class remove_args implements TBase, java.io.Serializable, Cloneabl private static final TField TABLENAME_FIELD_DESC = new TField("tablename", TType.STRING, (short)-1); private static final TField KEY_FIELD_DESC = new TField("key", TType.STRING, (short)-2); private static final TField COLUMN_FAMILY_COLUMN_FIELD_DESC = new TField("columnFamily_column", TType.STRING, (short)-3); + private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short)-4); + private static final TField BLOCK_FIELD_DESC = new TField("block", TType.BOOL, (short)-5); public String tablename; public static final int TABLENAME = -1; @@ -4960,12 +4988,18 @@ public static class remove_args implements TBase, java.io.Serializable, Cloneabl public static final int KEY = -2; public String columnFamily_column; public static final int COLUMNFAMILY_COLUMN = -3; + public long timestamp; + public static final int TIMESTAMP = -4; + public boolean block; + public static final int BLOCK = -5; private final Isset __isset = new Isset(); private static final class Isset implements java.io.Serializable { public boolean tablename = false; public boolean key = false; public boolean columnFamily_column = false; + public boolean timestamp = false; + public boolean block = false; } public static final Map metaDataMap = Collections.unmodifiableMap(new HashMap() {{ @@ -4975,6 +5009,10 @@ private static final class Isset implements java.io.Serializable { new FieldValueMetaData(TType.STRING))); put(COLUMNFAMILY_COLUMN, new FieldMetaData("columnFamily_column", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.STRING))); + put(TIMESTAMP, new FieldMetaData("timestamp", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.I64))); + put(BLOCK, new FieldMetaData("block", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.BOOL))); }}); static { @@ -4987,7 +5025,9 @@ public remove_args() { public remove_args( String tablename, String key, - String columnFamily_column) + String columnFamily_column, + long timestamp, + boolean block) { this(); this.tablename = tablename; @@ -4996,6 +5036,10 @@ public remove_args( this.__isset.key = (key != null); this.columnFamily_column = columnFamily_column; this.__isset.columnFamily_column = (columnFamily_column != null); + this.timestamp = timestamp; + this.__isset.timestamp = true; + this.block = block; + this.__isset.block = true; } /** @@ -5014,6 +5058,10 @@ public remove_args(remove_args other) { if (other.columnFamily_column != null) { this.columnFamily_column = other.columnFamily_column; } + __isset.timestamp = other.__isset.timestamp; + this.timestamp = other.timestamp; + __isset.block = other.__isset.block; + this.block = other.block; } @Override @@ -5087,6 +5135,50 @@ public void setColumnFamily_columnIsSet(boolean value) { this.__isset.columnFamily_column = value; } + public long getTimestamp() { + return this.timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + this.__isset.timestamp = true; + } + + public void unsetTimestamp() { + this.__isset.timestamp = false; + } + + // Returns true if field timestamp is set (has been asigned a value) and false otherwise + public boolean isSetTimestamp() { + return this.__isset.timestamp; + } + + public void setTimestampIsSet(boolean value) { + this.__isset.timestamp = value; + } + + public boolean isBlock() { + return this.block; + } + + public void setBlock(boolean block) { + this.block = block; + this.__isset.block = true; + } + + public void unsetBlock() { + this.__isset.block = false; + } + + // Returns true if field block is set (has been asigned a value) and false otherwise + public boolean isSetBlock() { + return this.__isset.block; + } + + public void setBlockIsSet(boolean value) { + this.__isset.block = value; + } + public void setFieldValue(int fieldID, Object value) { switch (fieldID) { case TABLENAME: @@ -5101,6 +5193,14 @@ public void setFieldValue(int fieldID, Object value) { setColumnFamily_column((String)value); break; + case TIMESTAMP: + setTimestamp((Long)value); + break; + + case BLOCK: + setBlock((Boolean)value); + break; + default: throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); } @@ -5117,6 +5217,12 @@ public Object getFieldValue(int fieldID) { case COLUMNFAMILY_COLUMN: return getColumnFamily_column(); + case TIMESTAMP: + return new Long(getTimestamp()); + + case BLOCK: + return new Boolean(isBlock()); + default: throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); } @@ -5131,6 +5237,10 @@ public boolean isSet(int fieldID) { return this.__isset.key; case COLUMNFAMILY_COLUMN: return this.__isset.columnFamily_column; + case TIMESTAMP: + return this.__isset.timestamp; + case BLOCK: + return this.__isset.block; default: throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); } @@ -5176,6 +5286,24 @@ public boolean equals(remove_args that) { return false; } + boolean this_present_timestamp = true; + boolean that_present_timestamp = true; + if (this_present_timestamp || that_present_timestamp) { + if (!(this_present_timestamp && that_present_timestamp)) + return false; + if (this.timestamp != that.timestamp) + return false; + } + + boolean this_present_block = true; + boolean that_present_block = true; + if (this_present_block || that_present_block) { + if (!(this_present_block && that_present_block)) + return false; + if (this.block != that.block) + return false; + } + return true; } @@ -5219,6 +5347,22 @@ public void read(TProtocol iprot) throws TException { TProtocolUtil.skip(iprot, field.type); } break; + case TIMESTAMP: + if (field.type == TType.I64) { + this.timestamp = iprot.readI64(); + this.__isset.timestamp = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case BLOCK: + if (field.type == TType.BOOL) { + this.block = iprot.readBool(); + this.__isset.block = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; default: TProtocolUtil.skip(iprot, field.type); break; @@ -5251,6 +5395,12 @@ public void write(TProtocol oprot) throws TException { oprot.writeString(this.columnFamily_column); oprot.writeFieldEnd(); } + oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC); + oprot.writeI64(this.timestamp); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(BLOCK_FIELD_DESC); + oprot.writeBool(this.block); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -5272,6 +5422,206 @@ public String toString() { sb.append("columnFamily_column:"); sb.append(this.columnFamily_column); first = false; + if (!first) sb.append(", "); + sb.append("timestamp:"); + sb.append(this.timestamp); + first = false; + if (!first) sb.append(", "); + sb.append("block:"); + sb.append(this.block); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + // check that fields of type enum have valid values + } + + } + + public static class remove_result implements TBase, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("remove_result"); + private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.BOOL, (short)0); + + public boolean success; + public static final int SUCCESS = 0; + + private final Isset __isset = new Isset(); + private static final class Isset implements java.io.Serializable { + public boolean success = false; + } + + public static final Map metaDataMap = Collections.unmodifiableMap(new HashMap() {{ + put(SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.BOOL))); + }}); + + static { + FieldMetaData.addStructMetaDataMap(remove_result.class, metaDataMap); + } + + public remove_result() { + } + + public remove_result( + boolean success) + { + this(); + this.success = success; + this.__isset.success = true; + } + + /** + * Performs a deep copy on other. + */ + public remove_result(remove_result other) { + __isset.success = other.__isset.success; + this.success = other.success; + } + + @Override + public remove_result clone() { + return new remove_result(this); + } + + public boolean isSuccess() { + return this.success; + } + + public void setSuccess(boolean success) { + this.success = success; + this.__isset.success = true; + } + + public void unsetSuccess() { + this.__isset.success = false; + } + + // Returns true if field success is set (has been asigned a value) and false otherwise + public boolean isSetSuccess() { + return this.__isset.success; + } + + public void setSuccessIsSet(boolean value) { + this.__isset.success = value; + } + + public void setFieldValue(int fieldID, Object value) { + switch (fieldID) { + case SUCCESS: + setSuccess((Boolean)value); + break; + + default: + throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); + } + } + + public Object getFieldValue(int fieldID) { + switch (fieldID) { + case SUCCESS: + return new Boolean(isSuccess()); + + default: + throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); + } + } + + // Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise + public boolean isSet(int fieldID) { + switch (fieldID) { + case SUCCESS: + return this.__isset.success; + default: + throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!"); + } + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof remove_result) + return this.equals((remove_result)that); + return false; + } + + public boolean equals(remove_result that) { + if (that == null) + return false; + + boolean this_present_success = true; + boolean that_present_success = true; + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (this.success != that.success) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) + { + case SUCCESS: + if (field.type == TType.BOOL) { + this.success = iprot.readBool(); + this.__isset.success = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + break; + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + oprot.writeStructBegin(STRUCT_DESC); + + if (this.__isset.success) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeBool(this.success); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("remove_result("); + boolean first = true; + + if (!first) sb.append(", "); + sb.append("success:"); + sb.append(this.success); + first = false; sb.append(")"); return sb.toString(); } diff --git a/src/org/apache/cassandra/service/CassandraServer.java b/src/org/apache/cassandra/service/CassandraServer.java index 9bea0d5..84683ca 100644 --- a/src/org/apache/cassandra/service/CassandraServer.java +++ b/src/org/apache/cassandra/service/CassandraServer.java @@ -491,17 +491,12 @@ public void batch_insert(batch_mutation_t batchMutation) StorageProxy.insert(rm); } - public void remove(String tablename, String key, String columnFamily_column) - { - throw new UnsupportedOperationException("Remove is coming soon"); - } - - public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for) + public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) { logger_.debug("remove"); RowMutation rm = new RowMutation(tablename, key.trim()); rm.delete(columnFamily_column, timestamp); - if (block_for > 0) { + if (block) { return StorageProxy.insertBlocking(rm); } else { StorageProxy.insert(rm); diff --git a/src/org/apache/cassandra/test/DataImporter.java b/src/org/apache/cassandra/test/DataImporter.java index bb079bd..4dfabe8 100644 --- a/src/org/apache/cassandra/test/DataImporter.java +++ b/src/org/apache/cassandra/test/DataImporter.java @@ -1345,85 +1345,6 @@ public void applyLoad(RowMutation rm) throws IOException { } - - - public void testRemove(String filepath) throws Throwable { - BufferedReader bufReader = new BufferedReader(new InputStreamReader( - new FileInputStream(filepath)), 16 * 1024 * 1024); - String line = null; - String delimiter_ = new String(","); - RowMutationMessage rmInbox = null; - RowMutationMessage rmOutbox = null; - ColumnFamily cfInbox = null; - ColumnFamily cfOutbox = null; - String firstuser = null ; - String nextuser = null; - while ((line = bufReader.readLine()) != null) { - StringTokenizer st = new StringTokenizer(line, delimiter_); - int i = 0; - String threadId = null; - int lastUpdated = 0; - int isDeleted = 0; - int folder = 0; - String user = null; - while (st.hasMoreElements()) { - switch (i) { - case 0: - user = (String) st.nextElement();// sb.append((String)st.nextElement()); - if ( !isNumeric(user)) - continue; - break; - - case 1: - folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement()); - break; - - case 2: - threadId = (String) st.nextElement(); - break; - - case 3: - lastUpdated = Integer.parseInt((String) st.nextElement()); - break; - - case 4: - isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement(); - break; - - default: - break; - } - ++i; - } - String key = null; - if (folder == 0) { - key = user + ":0"; - } else { - key = user + ":1"; - } - - nextuser = key; - if(firstuser == null || firstuser.compareTo(nextuser) != 0) - { - ArrayList columns = null; - firstuser = key; - try { - Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_); - long t = System.currentTimeMillis(); - - peerstorageClient_.remove(tablename_,key,(columnFamilyHack_%divideby_)+":"+threadId); - numReqs_++; - totalTime_ = totalTime_ + (System.currentTimeMillis() - t); - logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+ " Time taken for thrift..." - + (System.currentTimeMillis() - t)); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - } - public void run(String[] args) throws Throwable { if (args[0].compareTo("-testWriteMailbox") == 0 || @@ -1491,11 +1412,6 @@ public void run(String[] args) throws Throwable testSuperReadThrift(args[2] + System.getProperty("file.separator") + fileName); } - if ( args[0].compareTo("-testRemove") == 0 ) - { - testRemove(args[2] - + System.getProperty("file.separator") + fileName); - } if ( args[0].compareTo("-testPhp") == 0 ) { testPhp(args[2]