Permalink
Browse files

Decide on a gcBefore before sending out TreeRequests

patch by marcuse, reviewed by yukim, for CASSANDRA-4932
  • Loading branch information...
1 parent 3e0b21c commit efbdee23705fed6be49a2fb20a79f1aaaa8c98ff Marcus Eriksson committed Apr 9, 2013
View
@@ -28,7 +28,8 @@
* remove row-level bloom filters (CASSANDRA-4885)
* Change Kernel Page Cache skipping into row preheating (disabled by default)
(CASSANDRA-4937)
-
+ * Improve repair by deciding on a gcBefore before sending
+ out TreeRequests (CASSANDRA-4932)
1.2.4
* Ensure that PerRowSecondaryIndex updates see the most recent values
@@ -618,7 +618,10 @@ private void doValidationCompaction(ColumnFamilyStore cfs, ActiveRepairService.V
// we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
// instead so they won't be cleaned up if they do get compacted during the validation
sstables = cfs.markCurrentSSTablesReferenced();
- gcBefore = getDefaultGcBefore(cfs);
+ if (validator.request.gcBefore > 0)
+ gcBefore = validator.request.gcBefore;
+ else
+ gcBefore = getDefaultGcBefore(cfs);
}
CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range, gcBefore);
@@ -442,7 +442,7 @@ public long serializedSize(Validator validator, int version)
public void doVerb(MessageIn<TreeRequest> message, int id)
{
TreeRequest remotereq = message.payload;
- TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.cf);
+ TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.gcBefore, remotereq.cf);
// trigger read-only compaction
ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
@@ -462,7 +462,7 @@ public void doVerb(MessageIn<Validator> message, int id)
{
// deserialize the remote tree, and register it
Validator response = message.payload;
- TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.cf);
+ TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.gcBefore, response.request.cf);
ActiveRepairService.instance.rendezvous(request, response.tree);
}
}
@@ -489,20 +489,22 @@ public CFPair(String table, String cf)
public final String sessionid;
public final InetAddress endpoint;
public final Range<Token> range;
+ public final int gcBefore;
public final CFPair cf;
- public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, CFPair cf)
+ public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, int gcBefore, CFPair cf)
{
this.sessionid = sessionid;
this.endpoint = endpoint;
this.cf = cf;
+ this.gcBefore = gcBefore;
this.range = range;
}
@Override
public final int hashCode()
{
- return Objects.hashCode(sessionid, endpoint, cf, range);
+ return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range);
}
@Override
@@ -512,13 +514,13 @@ public final boolean equals(Object o)
return false;
TreeRequest that = (TreeRequest)o;
// handles nulls properly
- return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
+ return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
}
@Override
public String toString()
{
- return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf + ", " + range + ">";
+ return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + gcBefore + ", " + cf + ", " + range + ">";
}
public MessageOut<TreeRequest> createMessage()
@@ -532,6 +534,9 @@ public void serialize(TreeRequest request, DataOutput out, int version) throws I
{
out.writeUTF(request.sessionid);
CompactEndpointSerializationHelper.serialize(request.endpoint, out);
+
+ if (version >= MessagingService.VERSION_20)
+ out.writeInt(request.gcBefore);
out.writeUTF(request.cf.left);
out.writeUTF(request.cf.right);
AbstractBounds.serializer.serialize(request.range, out, version);
@@ -541,17 +546,21 @@ public TreeRequest deserialize(DataInput in, int version) throws IOException
{
String sessId = in.readUTF();
InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in);
+ int gcBefore = -1;
+ if (version >= MessagingService.VERSION_20)
+ gcBefore = in.readInt();
CFPair cfpair = new CFPair(in.readUTF(), in.readUTF());
Range<Token> range;
range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version);
- return new TreeRequest(sessId, endpoint, range, cfpair);
+ return new TreeRequest(sessId, endpoint, range, gcBefore, cfpair);
}
public long serializedSize(TreeRequest request, int version)
{
return TypeSizes.NATIVE.sizeof(request.sessionid)
+ CompactEndpointSerializationHelper.serializedSize(request.endpoint)
+ + TypeSizes.NATIVE.sizeof(request.gcBefore)
+ TypeSizes.NATIVE.sizeof(request.cf.left)
+ TypeSizes.NATIVE.sizeof(request.cf.right)
+ AbstractBounds.serializer.serializedSize(request.range, version);
@@ -833,8 +842,10 @@ public void sendTreeRequests()
if (isSequential)
makeSnapshots(endpoints);
+ int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+
for (InetAddress endpoint : allEndpoints)
- treeRequests.add(new TreeRequest(getName(), endpoint, range, new CFPair(tablename, cfname)));
+ treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
treeRequests.start();
Binary file not shown.
Binary file not shown.
@@ -91,7 +91,8 @@ public void testValidationMultipleSSTablePerLevel() throws Exception
ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname);
Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
- ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p);
+ int gcBefore = (int)(System.currentTimeMillis()/1000) - table.getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+ ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p);
ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req);
CompactionManager.instance.submitValidation(store, validator).get();
}
@@ -105,7 +105,8 @@ public void prepare() throws Exception
local_range = StorageService.instance.getLocalPrimaryRange();
// (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost)
- request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, new CFPair(tablename, cfname));
+ int gcBefore = (int)(System.currentTimeMillis()/1000) - store.metadata.getGcGraceSeconds();
+ request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, gcBefore, new CFPair(tablename, cfname));
// Set a fake session corresponding to this fake request
ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname);
}
@@ -106,6 +106,6 @@ public void testTreeResponseRead() throws IOException
private static class Statics
{
private static final ActiveRepairService.CFPair pair = new ActiveRepairService.CFPair("Keyspace1", "Standard1");
- private static final ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), FULL_RANGE, pair);
+ private static final ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), FULL_RANGE, 1234, pair);
}
}

0 comments on commit efbdee2

Please sign in to comment.