Permalink
Browse files

More consistent. XVoldemortStore naming. Improved documentation of split issues.
  • Loading branch information...
1 parent a706538 commit 81d92232f60728ba6472ba980c21fa3ebd6c278f @daggerrz committed Apr 12, 2011
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
@@ -39,6 +40,8 @@
public class VoldemortInputFormat extends InputFormat<ByteArray, Versioned<byte[]>> {
+ private final Logger logger = Logger.getLogger(VoldemortInputFormat.class);
+
/**
* Create a new connection to admin client and give it to RecordReader.
* Called on the TaskTracker
@@ -77,15 +80,15 @@
throw new VoldemortException("Store '" + storeName + "' not found");
}
- // Generate splits
+ // Generate one split per node.
+ // Should consider a config setting allowing one split per partition.
Iterator<Node> nodeIter = cluster.getNodes().iterator();
List<InputSplit> splits = new ArrayList<InputSplit>();
while(nodeIter.hasNext()) {
Node currentNode = nodeIter.next();
VoldemortInputSplit split = new VoldemortInputSplit(storeName, currentNode);
splits.add(split);
}
-
adminClient.stop();
return splits;
}
@@ -40,13 +40,14 @@ public VoldemortInputSplit(String storeName, Node node) {
}
/**
- * Is used to order the splits so that the largest get processed first, in
- * an attempt to minimize the job runtime...Voldemort doesn't care!
 * Pig will order the splits so that the largest are processed first. This has no
 * consequence for Voldemort, but newer versions of Pig will also try to combine
 * splits if their size is less than the config setting <i>pig.maxCombinedSplitSize</i>.
 * This does not map well to Voldemort storage. To avoid split combining altogether,
 * we return Long.MAX_VALUE.
*/
@Override
public long getLength() throws IOException, InterruptedException {
- // Quick test to avoid merging splits. The correct way to do this is
- // setup getLocations correctly!
return Long.MAX_VALUE;
}
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.utils.ByteArray;
@@ -33,6 +34,8 @@
public class VoldemortRecordReader extends RecordReader<ByteArray, Versioned<byte[]>> {
+ private final Logger logger = Logger.getLogger(VoldemortRecordReader.class);
+
private AdminClient adminClient;
private Iterator<Pair<ByteArray, Versioned<byte[]>>> iter = null;
private Pair<ByteArray, Versioned<byte[]>> currentPair = null;
@@ -67,6 +70,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
partitionIds.addAll(adminClient.getAdminClientCluster()
.getNodeById(voldemortSplit.getNodeId())
.getPartitionIds());
+ logger.info("Initializing split for node " + voldemortSplit.getNodeId() + ", partitions " + partitionIds);
this.iter = adminClient.fetchEntries(voldemortSplit.getNodeId(),
voldemortSplit.getStoreName(),
partitionIds,
@@ -37,7 +37,7 @@
/**
* Voldemort store which exposes values as Strings.
*/
-public class VoldemortStore extends AbstractVoldemortStore {
+public class StringVoldemortStore extends AbstractVoldemortStore {
@Override
protected Tuple extractTuple(ByteArray key, Versioned<byte[]> value) throws ExecException {

0 comments on commit 81d9223

Please sign in to comment.