Skip to content
This repository

Integration with the Disco Framework for distributed computation

branch: master

Fetching latest commit…

Cannot retrieve the latest commit at this time

README.md

MongoDB Disco Adapter

Note: for a more regularly maintained fork, please see https://github.com/mongodb/mongo-disco

The MongoDB Disco Adapter is a plugin which connect MongoDB and Disco MapRedcue framework by enabling users the ability to use MongoDB as an data input and/or an output source.

Prerequisites

For each machine in a disco cluster, it need following:

-Python

-PyMongo

-Disco

For instructions to setup disco clusters, please refer to the the guide(http://discoproject.org/doc/disco/start/install.html) in disco project website.

Installation

  1. Check out the latest source code in github
   $ git clone https://github.com/10genNYUITP/MongoDisco MongoDisco
  1. Go to MongoDisco folder, and run the setup.py file to install MongoDisco package
    $ python setup.py install 
Note: it may request administrator privilege to run the script

It’s done! Start hacking!

Example

Word Counting is a classic example for MapReduce framework. It could be done extremely easily using the MongoDB Disco Adapter.

Step 1. Users need to specify the configuration for this job.

For example, users could specify where the input data is stored and how they would like to store output data by providing a mongodb uri.

config = {
        "input_uri": "mongodb://localhost/test.in",
        "output_uri": "mongodb://localhost/test.out",
        "create_input_splits": True,
        "split_key": {_id:1},
        "split_size”:1, #MB
}

You can find more detailed configuration in the appendix.

Here, we assume we assume that input data is in database “test”, collection “in”, and we want to split data on “_id” field by setting the split_size equal to 1 Megabyte. The result would be written back to collection “out” at last.

Step 2. Write up its own map function

Here we would like to read the value under the field “word” and count it, so the map function would like following:

def map(doc, params):
    yield record.get('doc', "NoWord"), 1

Note: doc is an ordinary document return by mongodb query. You can perform any operations on it as MongoDB allowed.

Setup 3. Write up reduce function

As we already get key-value generators from the map process, we only need perform sum operation for each word.

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

The first parameter, iter, is an iterator over keys and values produced by the map function. We use disco.util.kvgroup() to simply pull out each word along with its counts, and sum them together.

Setup 4. Create a DiscoJob instance and run it

from mongodisco.job import DiscoJob

DiscoJob(config = config,map = map,reduce = reduce).run()

Now you run it in a terminal like other python codes and check the result in MongoDB.

Appendix

Configuration for DiscoJob

Name Default Value Note
input_uri mongodb://localhost/test.in mongodb uri for input data
output_uri mongodb://localhost/test.out mongodb uri for output result
print_to_stdout False if True, print result to stdout
job_wait True if False, code won’t wait for end of job
create_input_splits True if True, data will be splitted
split_size 8 size for one split
split_key {“_id”:1} field for performing splitting
use_shards False if True, directly connect to shards to retrieve data
use_chunks True if True, directly use chunks splitted by mongoDB as splits
input_key None Unknown!!!
slave_ok False same as slave_okay
query {} same as spec parameter of find method
fields None same as fields parameter of find method
sort None same as sort parameter of find method
limit 0 same as limit parameter of find method
skip 0 same as skip parameter of find method
job_output_key “_id” field name for output key
job_output_value “value” field name for output value
Something went wrong with that request. Please try again.