
Pushing data to Voldemort R/W Store using Hadoop

rsumbaly edited this page Aug 17, 2010 · 2 revisions

Voldemort has long supported building a read-only store in Hadoop and then atomically swapping it into place to serve read requests. What has been missing is a comparable capability for read-write stores. Many Voldemort users have use cases that require building read-write rather than read-only stores. Currently such users tend to issue many individual puts, which is time-consuming. The ideal solution would be a system capable of streaming entries to their respective partitions. With the addition of the AdminClient API updateEntries, this is now possible. And since many users keep their data in HDFS, Hadoop is a natural choice as the parallel streaming system.

Voldemort bulk-load is a new feature that lets us run multiple MapReduce jobs on our HDFS data and then stream the output to an existing Voldemort server, ready to start serving. We have tried as much as possible to keep the interface similar to that of the read-only store, so most of the steps are similar to those highlighted in our previous blog here.

Let's build one…

We will build a read-write Voldemort store capable of storing word counts. For the sake of simplicity this will be a single node cluster, but this can easily be extended to multiple nodes.

Build Voldemort

Let's start by downloading and building Voldemort.

$ git clone git@github.com:voldemort/voldemort.git voldemort
$ cd voldemort
$ ant

Start Voldemort Server

We can use the default cluster.xml and server.properties configuration files stored in config/single_node_cluster/config to start a single-node cluster. We will modify stores.xml to create a store called wordcount with a JSON string as its key and an integer (i.e. the count) as its value. The resulting stores.xml looks as follows:

<stores>
  <store>
     <name>wordcount</name>
     <persistence>bdb</persistence>
     <routing>client</routing>
     <replication-factor>1</replication-factor>
     <required-reads>1</required-reads>
     <required-writes>1</required-writes>
     <key-serializer>
          <type>json</type>
          <schema-info>"string"</schema-info>
     </key-serializer>
     <value-serializer>
          <type>json</type>
          <schema-info>"int32"</schema-info>
     </value-serializer>
  </store>
</stores>

Then we start the server and connect to it to see if everything is working:

$ ./bin/voldemort-server.sh config/single_node_cluster > /tmp/voldemort.log &
$ ./bin/voldemort-shell.sh wordcount tcp://localhost:6666
Established connection to wordcount via tcp://localhost:6666
> get "and"
null

Prepare & load data

This step is identical to Step 3 in the read-only store blog. At the end of this step we assume the user has Hadoop working, along with a folder containing files of tab-separated (\t) records.

$ bin/hadoop fs -copyFromLocal <local-dir> <hdfs-dir>
$ bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] \
                 <in-dir> <out-dir>

Implement our interface

The logic for generating partitioned data and streaming it is abstracted away from the user. Users only need to implement a simple interface (which wraps the mapper) describing how to extract the key and value objects from the files provided. In our example dataset we only need to split each line on the TAB character and return a String object for the key and an Integer object for the value, as defined in stores.xml. We then need to export this class into a jar (say mapper.jar).

package voldemort.readwrite;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import voldemort.store.readwrite.mr.AbstractRWHadoopStoreBuilderMapper;

public class WordCountMapper extends AbstractRWHadoopStoreBuilderMapper
                                     <LongWritable, Text> {

    @Override
    public Object makeKey(LongWritable key, Text value) {
        return value.toString().split("\t")[0];
    }

    @Override
    public Object makeValue(LongWritable key, Text value) {
        return Integer.parseInt(value.toString().split("\t")[1]);
    }
}
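The mapper above assumes every line is well formed. As a minimal sketch of the same tab-splitting logic with basic validation, the following stand-alone class uses only the JDK and can be run without Hadoop or Voldemort. The class name WordCountLineParser and the choice to return null for malformed lines are assumptions made for illustration, not part of the Voldemort API.

```java
import java.util.AbstractMap;
import java.util.Map;

public class WordCountLineParser {

    /** Parses "word<TAB>count"; returns null when the line is malformed. */
    public static Map.Entry<String, Integer> parse(String line) {
        // A limit of 2 keeps any further tabs inside the second field.
        String[] fields = line.split("\t", 2);
        if (fields.length < 2 || fields[0].isEmpty()) {
            return null;
        }
        try {
            int count = Integer.parseInt(fields[1].trim());
            return new AbstractMap.SimpleImmutableEntry<>(fields[0], count);
        } catch (NumberFormatException e) {
            // The count field was empty or not an integer.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("and\t42"));     // prints and=42
        System.out.println(parse("no-tab-here")); // prints null
    }
}
```

How malformed lines should be handled in a real job (skipped, counted, or treated as fatal) depends on the dataset.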

Stream it

The next step is to run the shell script which generates the store data and streams it over to Voldemort. An important point to remember here is that the Hadoop nodes in your cluster should have the admin client port open so that they can communicate with Voldemort. Most of the command line options are self-explanatory. For our example, the command line would be as follows:

$ bin/hadoop-build-readwrite-store.sh \
                   --reducerspernode 10 \
                   --cluster config/single_node_cluster/config/cluster.xml \
                   --input <input-dir> \
                   --mapper voldemort.readwrite.WordCountMapper \
                   --jar mapper.jar \
                   --storedefinitions config/single_node_cluster/config/stores.xml \
                   --temp <temp-dir> \
                   --storename wordcount \
                   --vectornodeid 100 \
                   --vectorversion 1

In Voldemort we version every K/V pair using vector clocks. This lets us do conflict resolution if the replicas of a K/V pair end up with different versions. Hence it is essential to correctly version all key/value pairs being inserted. This is the motivation for the last two options shown above, viz. --vectornodeid and --vectorversion. The --vectornodeid is an integer id you would use to represent the entire Hadoop machine in the vector. By default this would be set to the “master” node for the key, i.e. the node onto which the key hashed. The --vectorversion is the version you are pushing to Voldemort. In the above example we are assigning 100 as the Hadoop id and have set the push version to 1. This can result in “concurrent vector clocks”, which can be resolved later as shown in the next step.
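To make the idea of concurrent clocks concrete, here is a minimal sketch that uses only the JDK. It is not Voldemort's VectorClock class: the names ClockComparison, Order and compare are hypothetical, clocks are modeled as plain maps from node id to version, and a node missing from a clock is assumed to have version 0.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClockComparison {

    public enum Order { BEFORE, AFTER, EQUAL, CONCURRENT }

    /** Compares clock a with clock b; each maps a node id to its version. */
    public static Order compare(Map<Integer, Long> a, Map<Integer, Long> b) {
        Set<Integer> nodes = new HashSet<>(a.keySet());
        nodes.addAll(b.keySet());
        boolean aBigger = false;
        boolean bBigger = false;
        for (Integer node : nodes) {
            long va = a.getOrDefault(node, 0L);
            long vb = b.getOrDefault(node, 0L);
            if (va > vb) aBigger = true;
            if (vb > va) bBigger = true;
        }
        // Each clock is ahead on some node: neither write happened before the other.
        if (aBigger && bBigger) return Order.CONCURRENT;
        if (aBigger) return Order.AFTER;
        if (bBigger) return Order.BEFORE;
        return Order.EQUAL;
    }

    public static void main(String[] args) {
        // Entry pushed from Hadoop with --vectornodeid 100 --vectorversion 1.
        Map<Integer, Long> pushed = Collections.singletonMap(100, 1L);
        // Entry assumed to have been written earlier through node 0.
        Map<Integer, Long> existing = Collections.singletonMap(0, 1L);
        System.out.println(compare(pushed, existing)); // prints CONCURRENT
    }
}
```

Because the pushed entry and the pre-existing entry are each ahead on a different node id, neither supersedes the other, which is why a resolver is needed on read.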

Check if it works

And of course the last step is to check that everything is working. Let's connect to the server through code and see if everything works. An important scenario to remember here is that during the push to Voldemort, we may have put two “concurrent” versions of the same key. Hence, depending on the application requirements, it may become essential to write your own custom resolver (the default resolver does a timestamp-based resolution).

The following is an example of a resolver which takes multiple concurrent versions, merges their vector clocks, and returns the value with the maximum timestamp:

 // Requires java.util.{Collections, Iterator, List} and
 // voldemort.versioning.{InconsistencyResolver, VectorClock, Versioned}.
 public static class MyResolver<T> implements InconsistencyResolver<Versioned<T>> {

        // Collapses concurrent versions into one: merged clock, value of the newest write.
        public List<Versioned<T>> resolveConflicts(List<Versioned<T>> items) {
            if(items.size() <= 1) {
                return items;
            } else {
                Iterator<Versioned<T>> iter = items.iterator();

                Versioned<T> current = iter.next();
                T returnValue = current.getValue();
                VectorClock clock = (VectorClock) current.getVersion();
                long maxTime = clock.getTimestamp();
                while(iter.hasNext()) {
                    Versioned<T> versioned = iter.next();
                    VectorClock newClock = (VectorClock) versioned.getVersion();
                    // Keep the value carried by the most recent timestamp seen so far.
                    if(newClock.getTimestamp() > maxTime) {
                        returnValue = versioned.getValue();
                        maxTime = newClock.getTimestamp();
                    }
                    // Fold every version into the merged clock.
                    clock = clock.merge(newClock);
                }
                return Collections.singletonList(new Versioned<T>(returnValue, clock));
            }
        }

    }
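The same resolution strategy can be tried without a Voldemort cluster. The sketch below uses only the JDK and assumes a hypothetical Item type standing in for a versioned value (payload, clock entries, write timestamp); MaxTimestampMerge and resolve are illustrative names, and the merge is assumed to be an element-wise maximum over the clock entries.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxTimestampMerge {

    /** Hypothetical stand-in for a versioned value. */
    public static final class Item {
        public final int value;
        public final Map<Integer, Long> clock; // node id -> version
        public final long timestamp;

        public Item(int value, Map<Integer, Long> clock, long timestamp) {
            this.value = value;
            this.clock = clock;
            this.timestamp = timestamp;
        }
    }

    /** Returns one item: newest value, merged clock, newest timestamp. */
    public static Item resolve(List<Item> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("nothing to resolve");
        }
        Item newest = items.get(0);
        Map<Integer, Long> merged = new HashMap<>();
        for (Item item : items) {
            if (item.timestamp > newest.timestamp) {
                newest = item;
            }
            // Element-wise maximum keeps the highest version seen per node.
            for (Map.Entry<Integer, Long> e : item.clock.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Long::max);
            }
        }
        return new Item(newest.value, merged, newest.timestamp);
    }

    public static void main(String[] args) {
        Item fromClient = new Item(3, Collections.singletonMap(0, 1L), 10L);
        Item fromHadoop = new Item(7, Collections.singletonMap(100, 1L), 20L);
        Item resolved = resolve(Arrays.asList(fromClient, fromHadoop));
        System.out.println(resolved.value + " " + resolved.clock.size()); // prints 7 2
    }
}
```

Whether "newest timestamp wins" is appropriate depends on the application; for word counts, summing the concurrent values might be a better fit.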

And we can then pass this resolver while building our store client. We should get our inserted word-count in “result” below.

SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(
                     new ClientConfig().setBootstrapUrls("tcp://localhost:6666"));
StoreClient<Object, Object> storeClient = socketFactory.getStoreClient("wordcount",
                     new MyResolver());
Versioned<Object> result = storeClient.get("and");

Restrictions

- We assume all nodes are up and running (the same assumption as for read-only store building)
