Skip to content
This repository has been archived by the owner on May 8, 2024. It is now read-only.

Commit

Permalink
first public commit
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Apr 11, 2012
0 parents commit 3c7c430
Show file tree
Hide file tree
Showing 48 changed files with 15,493 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
*.pyc
1 change: 1 addition & 0 deletions AUTHORS
@@ -0,0 +1 @@
Davies Liu <davies.liu AT gmail.com>
30 changes: 30 additions & 0 deletions LICENSE
@@ -0,0 +1,30 @@
Copyright (c) 2011, Douban Inc. <http://www.douban.com/>
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.

* Neither the name of the Douban Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22 changes: 22 additions & 0 deletions README
@@ -0,0 +1,22 @@
Dpark is a Python clone of Spark, a MapReduce-style computing
framework supporting iterative computation.

Word count example wc.py:

from dpark import DparkContext
ctx = DparkContext()
file = ctx.textFile("/tmp/words.txt")
words = file.flatMap(lambda x:x.split()).map(lambda x:(x,1))
wc = words.reduceByKey(lambda x,y:x+y).collectAsMap()
print wc

This script can run locally or on a Mesos cluster without
any modification, just with different command-line arguments:

$ python wc.py
$ python wc.py -m process
$ python wc.py -m mesos

See examples/ for more examples.

Some Chinese docs: https://github.com/jackfengji/test_pro/wiki
3 changes: 3 additions & 0 deletions TODO
@@ -0,0 +1,3 @@
* run DPark under pypy
* HDFS support
* MFS client support
2 changes: 2 additions & 0 deletions dpark/__init__.py
@@ -0,0 +1,2 @@
from context import DparkContext, parser as optParser
from bagel import Bagel
85 changes: 85 additions & 0 deletions dpark/accumulator.py
@@ -0,0 +1,85 @@
from operator import add
import copy
from serialize import load_func, dump_func

class AccumulatorParam:
    """Pairs a zero value with the function used to combine values.

    ``zero`` is the value an Accumulator copies when it resets itself;
    ``addInPlace`` is called as ``addInPlace(current, v)`` and its result
    becomes the accumulator's new value.
    """

    def __init__(self, zero, addInPlace):
        self.zero, self.addInPlace = zero, addInPlace

    def __getstate__(self):
        # The combining function is passed through dump_func rather than
        # being handed to pickle directly; the zero value is stored as-is.
        serialized = dump_func(self.addInPlace)
        return (serialized, self.zero)

    def __setstate__(self, state):
        # Mirror of __getstate__: (dumped function, zero value).
        serialized, zero = state
        self.zero = zero
        self.addInPlace = load_func(serialized)

# Ready-made parameters for common value types.  The container variants
# mutate their first argument and return it: extend()/update() return None,
# so ``... or x`` evaluates to the (now mutated) container itself.
numAcc = AccumulatorParam(0, add)
listAcc = AccumulatorParam([], lambda x,y:x.extend(y) or x)
mapAcc = AccumulatorParam({}, lambda x,y:x.update(y) or x)
setAcc = AccumulatorParam(set(), lambda x,y:x.update(y) or x)


class Accumulator:
    """A value that is only ever combined with others through ``param``.

    Two class-level registries, both keyed by accumulator id, track
    instances:

    * ``originals``   -- every accumulator built through ``__init__``.
    * ``localAccums`` -- every accumulator that has been added to or
      unpickled since the last ``clear()``.

    Only ``id`` and ``param`` are pickled; an unpickled copy starts again
    from a copy of ``param.zero``.
    """

    # Class-wide state shared by all instances.
    nextId = 0
    originals = {}
    localAccums = {}

    def __init__(self, initialValue=0, param=numAcc):
        self.id = self.newId()
        # An explicit None selects the numeric default as well.
        self.param = numAcc if param is None else param
        self.value = initialValue
        self.register(self, True)

    def add(self, v):
        """Combine ``v`` into the current value and mark this instance as touched."""
        combine = self.param.addInPlace
        self.value = combine(self.value, v)
        self.register(self, False)

    def reset(self):
        """Replace the value with a shallow copy of the zero; return the old value."""
        previous, self.value = self.value, copy.copy(self.param.zero)
        return previous

    def __getstate__(self):
        # The accumulated value is deliberately left out of the pickle.
        return (self.id, self.param)

    def __setstate__(self, s):
        acc_id, param = s
        self.id = acc_id
        self.param = param
        self.value = copy.copy(param.zero)
        self.register(self, False)

    @classmethod
    def newId(cls):
        """Return the next id; ids start at 1 and grow by one per call."""
        cls.nextId = cls.nextId + 1
        return cls.nextId

    @classmethod
    def register(cls, acc, original):
        """Record ``acc`` under its id in ``originals`` or ``localAccums``."""
        registry = cls.originals if original else cls.localAccums
        registry[acc.id] = acc

    @classmethod
    def clear(cls):
        """Reset every touched accumulator, then forget which ones were touched."""
        touched = cls.localAccums
        for acc in touched.values():
            acc.reset()
        touched.clear()

    @classmethod
    def values(cls):
        """Return ``{id: value}`` for the touched accumulators and clear them."""
        snapshot = {}
        for acc_id, acc in cls.localAccums.items():
            snapshot[acc_id] = acc.value
        cls.clear()
        return snapshot

    @classmethod
    def merge(cls, values):
        """Add each ``{id: value}`` entry into the original with that id."""
        for acc_id, delta in values.items():
            cls.originals[acc_id].add(delta)

# Module-level accumulators built with the defaults (initial value 0, numAcc).
# Ids are handed out in creation order, so reordering these lines changes
# which id each accumulator receives.
# NOTE(review): the names suggest I/O byte counts and cache statistics, but
# the code that updates them is not in this module -- confirm with callers.
ReadBytes = Accumulator()
WriteBytes = Accumulator()
LocalReadBytes = Accumulator()
CacheHits = Accumulator()
CacheMisses = Accumulator()
109 changes: 109 additions & 0 deletions dpark/bagel.py
@@ -0,0 +1,109 @@
import time
import logging
from pprint import pprint

logger = logging.getLogger("bagel")

class Vertex:
    """Plain record for one graph vertex.

    Bagel.comp counts a vertex as active when ``active`` is true.
    ``outEdges`` is stored untouched (presumably Edge objects -- confirm
    against callers).
    """

    def __init__(self, id, value, outEdges, active):
        self.id, self.value = id, value
        self.outEdges, self.active = outEdges, active

class Message:
    """Plain record for a message sent to the vertex with id ``target_id``.

    Bagel.run keys outgoing messages by ``target_id``; DefaultValueCombiner
    reads ``value``.
    """

    def __init__(self, target_id, value):
        self.target_id, self.value = target_id, value

class Edge:
    """Plain record for an edge pointing at ``target_id``; ``value`` defaults to 0."""

    def __init__(self, target_id, value=0):
        self.target_id, self.value = target_id, value

class Combiner:
    """Interface for folding the messages of one target into a combined value.

    Every method raises NotImplementedError here; subclasses override them.
    """

    def createCombiner(self, msg):
        """Build the initial combined value from a first message."""
        raise NotImplementedError()

    def mergeValue(self, combiner, msg):
        """Fold one more message into an existing combined value."""
        raise NotImplementedError()

    def mergeCombiners(self, a, b):
        """Merge two combined values into one."""
        raise NotImplementedError()

class Aggregator:
    """Interface for reducing all vertices to a single aggregated value.

    Both methods raise NotImplementedError here; subclasses override them.
    """

    def createAggregator(self, vert):
        """Build the per-vertex contribution to the aggregate."""
        raise NotImplementedError()

    def mergeAggregator(self, a, b):
        """Merge two contributions into one."""
        raise NotImplementedError()

class DefaultListCombiner(Combiner):
    """Combiner that keeps every message, collected in a list."""

    def createCombiner(self, msg):
        # A single message becomes a one-element list.
        collected = [msg]
        return collected

    def mergeValue(self, combiner, msg):
        # Concatenation builds a new list; ``combiner`` is not mutated.
        return combiner + [msg]

    def mergeCombiners(self, a, b):
        joined = a + b
        return joined

class DefaultValueCombiner(Combiner):
    """Combiner that adds up the ``value`` attribute of the messages with ``+``."""

    def createCombiner(self, msg):
        # The first message contributes its bare value, not the message.
        first = msg.value
        return first

    def mergeValue(self, combiner, msg):
        total = combiner + msg.value
        return total

    def mergeCombiners(self, a, b):
        total = a + b
        return total


class NullAggregator(Aggregator):
    """Aggregator whose methods do nothing and return None.

    Bagel.agg checks for this type and skips the aggregation pass entirely.
    """

    def createAggregator(self, vert):
        return None

    def mergeAggregator(self, a, b):
        return None

class Bagel:
    """Superstep loop that repeatedly applies ``compute`` to vertices and messages.

    All methods are classmethods; the class holds no state of its own.
    """

    @classmethod
    def run(cls, ctx, verts, msgs, compute,
            combiner=DefaultValueCombiner(), aggregator=NullAggregator(),
            superstep=0, numSplits=None):
        """Run supersteps until one produces no messages and no active vertex.

        ctx        -- object providing ``accumulator(initial)``.
        verts      -- keyed dataset of ``(id, vertex)`` pairs; must provide
                      ``groupWith``, ``map`` (presumably a dpark RDD --
                      confirm against callers).
        msgs       -- keyed dataset of ``(target_id, message)`` pairs; must
                      provide ``combineByKey``.
        compute    -- called as ``compute(vertex, combined, aggregated,
                      superstep)``; must return ``(newVertex, newMessages)``.
        combiner   -- passed to ``msgs.combineByKey``.
        aggregator -- see ``agg``; NullAggregator disables aggregation.
        superstep  -- number of the first superstep.
        numSplits  -- passed to ``msgs.combineByKey``.

        Returns the dataset of vertices produced by the final superstep.
        """
        while True:
            logger.info("Starting superstep %d", superstep)
            start = time.time()
            aggregated = cls.agg(verts, aggregator)
            combinedMsgs = msgs.combineByKey(combiner, numSplits)
            grouped = verts.groupWith(combinedMsgs)
            # Bind this superstep's values in a separate frame: a lambda
            # defined here would share the loop variables, so any later
            # re-evaluation of the dataset would see the values of a
            # later superstep.
            step = cls._bindStep(compute, aggregated, superstep)
            processed, numMsgs, numActiveVerts = cls.comp(ctx, grouped, step)
            logger.info("superstep %d took %s s", superstep, time.time()-start)

            noActivity = numMsgs == 0 and numActiveVerts == 0
            if noActivity:
                # Records are (id, (vertex, messages)); keep the vertex.
                return processed.map(lambda kv: kv[1][0])

            verts = processed.mapValue(lambda pair: pair[0])
            msgs = processed.flatMap(
                lambda kv: [(m.target_id, m) for m in kv[1][1]])
            superstep += 1

    @classmethod
    def _bindStep(cls, compute, aggregated, superstep):
        """Return ``compute`` with ``aggregated`` and ``superstep`` fixed."""
        def step(v, ms):
            return compute(v, ms, aggregated, superstep)
        return step

    @classmethod
    def agg(cls, verts, aggregator):
        """Reduce all vertices to one aggregated value, or None for NullAggregator.

        The merge step uses ``aggregator.mergeAggregator``, the name declared
        by the Aggregator interface.  An aggregator that defines the plural
        ``mergeAggregators`` (the only name this method used to call) is
        still honoured.
        """
        if isinstance(aggregator, NullAggregator):
            return None
        merge = getattr(aggregator, "mergeAggregators", None)
        if merge is None:
            merge = aggregator.mergeAggregator
        # Records are (id, vertex); only the vertex is aggregated.
        r = verts.map(lambda kv: aggregator.createAggregator(kv[1]))
        return r.reduce(merge)

    @classmethod
    def comp(cls, ctx, grouped, compute):
        """Apply ``compute`` to every grouped record and count the activity.

        ``grouped`` holds ``(id, (vertices, combinedMessages))`` records and
        ``compute`` is called as ``compute(vertex, combinedMessages)``.

        Returns ``(processed, numMsgs, numActiveVerts)`` where ``processed``
        holds ``(id, (newVertex, newMessages))`` records.
        """
        numMsgs = ctx.accumulator(0)
        numActiveVerts = ctx.accumulator(0)

        def proc(group):
            vs, cs = group
            if not vs:
                # Nothing to run for an id that has no vertex; anything
                # addressed to it is dropped.
                return []
            newVert, newMsgs = compute(vs[0], cs)
            numMsgs.add(len(newMsgs))
            if newVert.active:
                numActiveVerts.add(1)
            return [(newVert, newMsgs)]

        processed = grouped.flatMapValue(proc).cache()
        # force evaluation of processed RDD for accurate performance measurements
        # (this also runs proc before the accumulator values are read below)
        processed.count()
        return processed, numMsgs.value, numActiveVerts.value

    @classmethod
    def addAggregatorArg(cls, compute):
        """Adapt a two-argument ``compute(vert, messages)`` to the signature used by run().

        The returned function accepts and ignores ``aggregator`` and
        ``superstep``.
        """
        def _(vert, messages, aggregator, superstep):
            return compute(vert, messages)
        return _

0 comments on commit 3c7c430

Please sign in to comment.