Added a mode to the forklift tool to allow all conflicting versions to be copied over to the destination
abh1nay committed Apr 23, 2013
1 parent 736ccc8 commit 800d48f
Showing 2 changed files with 82 additions and 8 deletions.
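For orientation, a minimal sketch of driving the tool programmatically in the new mode. The constructor signature matches the diff below; the bootstrap URLs, store name, and tuning values are placeholders, not taken from this commit, and the class must sit in the voldemort.utils package because the ForkLiftTaskMode enum added here is package-private.

// Hypothetical usage sketch; URLs, store name, and numeric settings are placeholders.
package voldemort.utils;

import java.util.List;
import com.google.common.collect.Lists;

public class ForkLiftNoResolutionExample {
    public static void main(String[] args) {
        List<String> stores = Lists.newArrayList("my-store"); // assumed store name
        ClusterForkLiftTool tool = new ClusterForkLiftTool("tcp://src-host:6666", // placeholder source bootstrap URL
                                                           "tcp://dst-host:6666", // placeholder destination bootstrap URL
                                                           10000,  // max puts per second (placeholder)
                                                           4,      // partition parallelism (placeholder)
                                                           100000, // progress period in ops (placeholder)
                                                           stores,
                                                           null,   // partitions (null, as in the existing tests)
                                                           ClusterForkLiftTool.ForkLiftTaskMode.no_resolution);
        tool.run(); // streams every versioned value over without resolving conflicts
    }
}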
86 changes: 80 additions & 6 deletions src/java/voldemort/utils/ClusterForkLiftTool.java
@@ -121,7 +121,7 @@ public class ClusterForkLiftTool implements Runnable {
private final int progressOps;
private final HashMap<String, StoreDefinition> srcStoreDefMap;
private final List<Integer> partitionList;
private final boolean globalResolution;
private final ForkLiftTaskMode mode;

public ClusterForkLiftTool(String srcBootstrapUrl,
String dstBootstrapUrl,
@@ -130,7 +130,7 @@ public ClusterForkLiftTool(String srcBootstrapUrl,
int progressOps,
List<String> storesList,
List<Integer> partitions,
boolean globalResolution) {
ForkLiftTaskMode mode) {
// set up AdminClient on source cluster
this.srcAdminClient = new AdminClient(srcBootstrapUrl,
new AdminClientConfig(),
@@ -142,6 +142,7 @@ public ClusterForkLiftTool(String srcBootstrapUrl,
props.put("streaming.platform.throttle.qps", maxPutsPerSecond);
StreamingClientConfig config = new StreamingClientConfig(props);
this.dstStreamingClient = new StreamingClient(config);
this.mode = mode;

// determine and verify final list of stores to be forklifted over
if(storesList != null) {
@@ -173,7 +174,7 @@ public ClusterForkLiftTool(String srcBootstrapUrl,
// set up thread pool to forklift partitions in parallel
this.workerPool = Executors.newFixedThreadPool(partitionParallelism);
this.progressOps = progressOps;
this.globalResolution = globalResolution;

}

private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
@@ -200,6 +201,12 @@ private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
return srcStoreDefMap;
}

enum ForkLiftTaskMode {
global_resolution, // do a thorough resolution across all replicas before writing
primary_resolution, // fetch from the primary replica alone and resolve among its versions
no_resolution // stream every versioned value to the destination without resolving conflicts
}

/**
* TODO this base class can potentially provide some framework of execution
* for the subclasses, to yield a better object oriented design (progress
@@ -412,6 +419,58 @@ public void run() {
}
}

/**
* Simply fetches the data for the partition from the primary replica and
* writes it into the destination cluster without resolving any of the
* conflicting values; every versioned value is streamed over as-is.
*
*/
class SinglePartitionNoResolutionForkLiftTask extends SinglePartitionForkLiftTask implements
Runnable {

SinglePartitionNoResolutionForkLiftTask(StoreInstance storeInstance,
int partitionId,
CountDownLatch latch) {
super(storeInstance, partitionId, latch);
}

@Override
public void run() {
String storeName = this.storeInstance.getStoreDefinition().getName();
long entriesForkLifted = 0;
try {
logger.info(workName + "Starting processing");
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId),
storeName,
Lists.newArrayList(this.partitionId),
null,
true);

while(entryItr.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> record = entryItr.next();
ByteArray key = record.getFirst();
Versioned<byte[]> versioned = record.getSecond();
dstStreamingClient.streamingPut(key, versioned);
entriesForkLifted++;
if(entriesForkLifted % progressOps == 0) {
logger.info(workName + " fork lifted " + entriesForkLifted
+ " entries successfully");
}

}
logger.info(workName + "Completed processing " + entriesForkLifted + " records");

} catch(Exception e) {
// if for some reason this partition fails, we will have to retry
// again for those partitions alone.
logger.error(workName + "Error forklifting data ", e);
} finally {
latch.countDown();
}

}
}

@Override
public void run() {
final Cluster srcCluster = srcAdminClient.getAdminClientCluster();
@@ -441,18 +500,24 @@ public Object call() throws Exception {

// submit work on every partition that is to be forklifted
for(Integer partitionId: partitionList) {
if(this.globalResolution) {
if(this.mode == ForkLiftTaskMode.global_resolution) {
// do thorough global resolution across replicas
SinglePartitionGloballyResolvingForkLiftTask work = new SinglePartitionGloballyResolvingForkLiftTask(storeInstance,
partitionId,
latch);
workerPool.submit(work);
} else {
} else if(this.mode == ForkLiftTaskMode.primary_resolution) {
// take the less clean, but much faster route
SinglePartitionPrimaryResolvingForkLiftTask work = new SinglePartitionPrimaryResolvingForkLiftTask(storeInstance,
partitionId,
latch);
workerPool.submit(work);
} else if(this.mode == ForkLiftTaskMode.no_resolution) {
// skip resolution entirely and copy every version over
SinglePartitionNoResolutionForkLiftTask work = new SinglePartitionNoResolutionForkLiftTask(storeInstance,
partitionId,
latch);
workerPool.submit(work);
}
}

@@ -524,6 +589,8 @@ private static OptionParser getParser() {
.ofType(Integer.class);
parser.accepts("global-resolution",
"Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: Fetch from primary alone ]");
parser.accepts("bypass-resolution",
"Does no resolution writes all the versioned values. [Default: Fetch from primary alone ]");
return parser;
}

@@ -577,14 +644,21 @@ public static void main(String[] args) throws Exception {
progressOps = (Integer) options.valueOf("progress-period-ops");
}

// determine the forklift mode; bypass-resolution takes precedence over global-resolution
ForkLiftTaskMode mode = ForkLiftTaskMode.primary_resolution;
if(options.has("global-resolution")) {
    mode = ForkLiftTaskMode.global_resolution;
}
if(options.has("bypass-resolution")) {
    mode = ForkLiftTaskMode.no_resolution;
}

ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootstrapUrl,
dstBootstrapUrl,
maxPutsPerSecond,
partitionParallelism,
progressOps,
storesList,
partitions,
options.has("global-resolution"));
mode);
forkLiftTool.run();
// TODO cleanly shut down the hanging threadpool
System.exit(0);
4 changes: 2 additions & 2 deletions test/unit/voldemort/utils/ClusterForkLiftToolTest.java
@@ -192,7 +192,7 @@ public void testPrimaryResolvingForkLift() throws Exception {
1000,
Lists.newArrayList(PRIMARY_RESOLVING_STORE_NAME),
null,
false);
ClusterForkLiftTool.ForkLiftTaskMode.primary_resolution);
forkLiftTool.run();

// do a write to destination cluster
@@ -258,7 +258,7 @@ public void testGloballyResolvingForkLift() throws Exception {
1000,
Lists.newArrayList(GLOBALLY_RESOLVING_STORE_NAME),
null,
true);
ClusterForkLiftTool.ForkLiftTaskMode.global_resolution);
forkLiftTool.run();

// do a write to destination cluster
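The two existing tests only gain an explicit mode argument; the new no_resolution path is not exercised here. A minimal sketch of the kind of check such a test could add after running the tool with ForkLiftTaskMode.no_resolution; the dstStore variable, the key, and the expected version count are assumed for illustration, not taken from this commit.

// Hypothetical verification snippet inside a test like the ones above;
// dstStore (a voldemort.store.Store<ByteArray, byte[], byte[]> on the destination)
// and "conflictingKey" are assumed names. With no_resolution, every source version
// is forklifted, so a key that had two conflicting versions on the source should
// surface both versions on the destination.
List<Versioned<byte[]>> fetched = dstStore.get(new ByteArray("conflictingKey".getBytes()), null);
assertEquals("all conflicting versions should be forklifted", 2, fetched.size());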
