No-DFS and compression patches #3

Merged
merged 2 commits into douban:master

2 participants

@Ssmithcr
  1. Remove reliance on DFS, but still allow the use of a DFS if you have one.
  2. Fast-enough compression of intermediate data files.
Ssmithcr added some commits on May 6, 2012
@Ssmithcr: Remove reliance on a distributed file system.
If DPARK_WORK_DIR is set instead of DPARK_SHARE_DIR, a web server will
be started on each slave to serve files to the other slaves.
2c190a4
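
For context, a minimal usage sketch of the new switch (the paths below are hypothetical). Setting DPARK_WORK_DIR selects the per-slave HTTP transport added by this commit; setting DPARK_SHARE_DIR keeps the old shared-filesystem behavior.

    # Hypothetical paths; set one of these before starting a dpark job.
    import os
    os.environ['DPARK_WORK_DIR'] = '/data/dpark'    # node-local dir, served to other slaves over HTTP
    # os.environ['DPARK_SHARE_DIR'] = '/mfs/dpark'  # shared DFS dir, read directly by all slaves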
@Ssmithcr: Use compression for the intermediate shuffle files.
I tried Google's Snappy compression, but it still left my nodes I/O
bound; I found zlib level 1 to be the best compromise between CPU and
I/O. Compression in this case takes around the same amount of time as
marshalling; decompression, however, takes much less time than
unmarshalling.
fc7c116
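
A rough micro-benchmark can reproduce the tradeoff described in the commit message (a sketch only; the sample data is made up, and real shuffle buckets will behave differently):

    # Compare marshal time against zlib level-1 compress/decompress time.
    import marshal, time, zlib

    data = dict((i, 'x' * 100) for i in xrange(100000))  # stand-in for a shuffle bucket
    t0 = time.time(); d = marshal.dumps(data); t1 = time.time()
    c = zlib.compress(d, 1); t2 = time.time()
    zlib.decompress(c); t3 = time.time()
    print 'marshal %.3fs  compress %.3fs  decompress %.3fs  ratio %.2f' % (
        t1 - t0, t2 - t1, t3 - t2, float(len(c)) / len(d))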
@davies

open, url = file, xxxx for readability
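
Read literally, the suggestion is to unpack the (open-function, location) tuple into two named variables instead of indexing url[0] and url[1]. A sketch of that refactor against the fetch loop in the diff below (openf is a hypothetical name, chosen to avoid shadowing the open builtin):

    if uri == LocalFileShuffle.getServerUri():
        openf, url = open, LocalFileShuffle.getOutputFile(shuffleId, part, reduceId)
    else:
        openf, url = urllib.urlopen, "%s/%d/%d/%d" % (uri, shuffleId, part, reduceId)
    logger.debug("fetch %s", url)
    f = openf(url)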

@davies

this patch should be updated to work with compression

The compression patch changes this code; I was trying to keep them independent (the no-DFS patch applies first, then the compression patch).

@davies

Good, but this needs more work to handle fetch failures.

Without reimplementing a DFS, there isn't a good way to deal with a slave failure -- recreating intermediate files from the shuffle jobs would basically require starting over.

If you're talking about a transient failure, though, that should be handled properly. If fetching fails, it will raise an exception. This will cause the task to fail. The scheduler will then reschedule the task.
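
Condensed from the fetch loop in the diff below, the transient-failure handling follows a retry-then-raise pattern roughly like this (a sketch; the countdown and sleep details are approximated, since the hunk cuts off after the IOError check):

    tries = 3
    while True:
        try:
            f = url[0](url[1])
            # ... read flag byte, decompress, deserialize ...
            break
        except IOError, e:
            if not os.path.exists(uri): raise
            tries -= 1
            if tries == 0:
                raise  # the task fails and the scheduler reschedules it
            time.sleep(1)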

@davies merged commit aa3edd5 into douban:master
Showing with 70 additions and 21 deletions.
  1. +14 −6 dpark/env.py
  2. +36 −4 dpark/executor.py
  3. +18 −10 dpark/shuffle.py
  4. +2 −1  dpark/task.py
dpark/env.py
@@ -17,16 +17,22 @@ def get(cls, name, default=None):

     def __init__(self):
         self.started = False

-    def start(self, isMaster, environ={}, isLocal=False):
+    def start(self, isMaster, environ={}, isLocal=False, port=None):
         if getattr(self, 'started', False):
             return
         logger.debug("start env in %s: %s %s", os.getpid(),
                 isMaster, environ)
         if isMaster:
-            root = '/tmp/dpark'
-            if not isLocal:
-                root = os.environ.get("DPARK_SHARE_DIR")
-            if not root:
+            if isLocal:
+                root = '/tmp/dpark'
+                self.dfs = True
+            elif os.environ.has_key('DPARK_SHARE_DIR'):
+                root = os.environ['DPARK_SHARE_DIR']
+                self.dfs = True
+            elif os.environ.has_key('DPARK_WORK_DIR'):
+                root = os.environ['DPARK_WORK_DIR']
+                self.dfs = False
+            else:
                 raise Exception("no shuffle dir exists")
             if not os.path.exists(root):
                 os.mkdir(root, 0777)
@@ -36,14 +42,16 @@ def start(self, isMaster, environ={}, isLocal=False):
             self.workdir = os.path.join(root, name)
             os.makedirs(self.workdir)
             self.environ['WORKDIR'] = self.workdir
+            self.environ['DPARK_HAS_DFS'] = str(self.dfs)
         else:
             self.environ.update(environ)
+            self.dfs = (self.environ['DPARK_HAS_DFS'] == 'True')

         from cache import CacheTracker
         self.cacheTracker = CacheTracker(isMaster)

         from shuffle import LocalFileShuffle, MapOutputTracker, SimpleShuffleFetcher
-        LocalFileShuffle.initialize(isMaster)
+        LocalFileShuffle.initialize(isMaster, port)
         self.mapOutputTracker = MapOutputTracker(isMaster)
         self.shuffleFetcher = SimpleShuffleFetcher()
dpark/executor.py
@@ -7,6 +7,10 @@
 import cPickle
 import socket
 import multiprocessing
+import threading
+import SocketServer
+import SimpleHTTPServer
+import shutil

 import zmq
 import mesos
@@ -48,7 +52,7 @@ def run_task(task, aid):
         except ValueError:
             flag, data = 1, cPickle.dumps(result)

-        if len(data) > TASK_RESULT_LIMIT:
+        if len(data) > TASK_RESULT_LIMIT and env.dfs:
             workdir = env.get('WORKDIR')
             path = os.path.join(workdir, str(task.id)+'.result')
             with open(path, 'w') as f:
@@ -64,9 +68,27 @@ def run_task(task, aid):
         setproctitle('dpark worker: idle')
         return mesos_pb2.TASK_FAILED, cPickle.dumps((task.id, OtherFailure(msg), None, None), -1)

-def init_env(args):
+def init_env(args, port):
     setproctitle('dpark worker: idle')
-    env.start(False, args)
+    env.start(False, args, port=port)
+
+basedir = None
+class LocalizedHTTP(SimpleHTTPServer.SimpleHTTPRequestHandler):
+    def translate_path(self, path):
+        out = SimpleHTTPServer.SimpleHTTPRequestHandler.translate_path(self, path)
+        return basedir + '/' + out[len(os.getcwd()):]
+
+    def log_message(self, format, *args):
+        pass
+
+def startWebServer(path):
+    global basedir
+    basedir = path
+    ss = SocketServer.TCPServer(('0.0.0.0', 0), LocalizedHTTP)
+    threading.Thread(target=ss.serve_forever).start()
+    return ss.server_address[1]
+
+
 def forword(fd, addr, prefix=''):
     f = os.fdopen(fd, 'r')
@@ -112,7 +134,13 @@ def init(self, driver, args):
             if err_logger:
                 self.errt, sys.stderr = start_forword(err_logger)
             logging.basicConfig(format='%(asctime)-15s [%(name)-9s] %(message)s', level=logLevel)
-            self.pool = multiprocessing.Pool(parallel, init_env, [args])
+            if args['DPARK_HAS_DFS'] == 'True':
+                self.workdir = None
+                port = None
+            else:
+                self.workdir = args['WORKDIR']
+                port = startWebServer(args['WORKDIR'])
+            self.pool = multiprocessing.Pool(parallel, init_env, [args, port])
             logger.debug("executor started at %s", socket.gethostname())
         except Exception, e:
             import traceback
@@ -141,6 +169,10 @@ def killTask(self, driver, taskId):
         pass

     def shutdown(self, driver):
+        # clean work files
+        if self.workdir:
+            try: shutil.rmtree(self.workdir, True)
+            except: pass
         # flush
         sys.stdout.close()
         sys.stderr.close()
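
One consequence of serving WORKDIR over HTTP is that every map output becomes addressable by a plain URL (the layout appears in shuffle.py below: serverUri plus /shuffleId/part/reduceId). A hypothetical manual fetch, with made-up host, port, and ids:

    import urllib
    f = urllib.urlopen('http://slave1:43210/2/3/7')  # hypothetical host:port and shuffle/part/reduce ids
    flag = f.read(1)      # 'm' for marshal, 'p' for cPickle
    payload = f.read()    # zlib-compressed serialized bucket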
dpark/shuffle.py
@@ -7,6 +7,7 @@
 import socket
 import time
 import cPickle
+import zlib as comp

 import zmq

@@ -19,14 +20,17 @@ class LocalFileShuffle:
     serverUri = None
     shuffleDir = None
     @classmethod
-    def initialize(cls, isMaster):
+    def initialize(cls, isMaster, port):
         shuffleDir = env.get('WORKDIR')
         if not shuffleDir:
             return
-        while not os.path.exists(shuffleDir):
-            time.sleep(0.1) # HACK for moosefs
         cls.shuffleDir = shuffleDir
-        cls.serverUri = shuffleDir
+        if port is None:
+            while not os.path.exists(shuffleDir):
+                time.sleep(0.1) # HACK for moosefs
+            cls.serverUri = 'file://' + cls.shuffleDir
+        else:
+            cls.serverUri = 'http://%s:%d' % (socket.gethostname(), port)
         logger.debug("shuffle dir: %s", shuffleDir)

     @classmethod
@@ -56,21 +60,25 @@ def fetch(self, shuffleId, reduceId, func):
             splitsByUri.setdefault(uri, []).append(i)
         for uri, parts in splitsByUri.items():
             for part in parts:
-                url = "%s/%d/%d/%d" % (uri, shuffleId, part, reduceId)
-                logger.debug("fetch %s", url)
+                if uri == LocalFileShuffle.getServerUri():
+                    url = (file, LocalFileShuffle.getOutputFile(shuffleId, part, reduceId))
+                else:
+                    url = (urllib.urlopen, "%s/%d/%d/%d" % (uri, shuffleId, part, reduceId))
+                logger.debug("fetch %s", url[1])
                 tries = 3
                 while True:
                     try:
-                        f = open(url, 'rb')
+                        f = url[0](url[1])
                         flag = f.read(1)
+                        d = comp.decompress(f.read())
+                        f.close()
                         if flag == 'm':
-                            d = marshal.load(f)
+                            d = marshal.loads(d)
                         elif flag == 'p':
-                            d = cPickle.load(f)
+                            d = cPickle.loads(d)
                         else:
                             raise ValueError("invalid flag")
-                        f.close()
                         break
                     except IOError, e:
                         if not os.path.exists(uri): raise
dpark/task.py
@@ -4,6 +4,7 @@
 import cPickle
 import logging
 import struct
+import zlib as comp

 from serialize import load_func, dump_func
 from shuffle import LocalFileShuffle
@@ -108,7 +109,7 @@ def run(self, attempId):
                 flag, d = 'p', cPickle.dumps(buckets[i], -1)
             f = open(tpath, 'wb', 1024*4096)
             f.write(flag)
-            f.write(d)
+            f.write(comp.compress(d, 1))
             f.close()
             if not os.path.exists(path):
                 os.rename(tpath, path)
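
Taken together, the write path in task.py and the read path in shuffle.py define a simple file format: one flag byte ('m' for marshal, 'p' for cPickle) followed by a zlib level-1 compressed payload. A self-contained round-trip sketch (the helper names are hypothetical):

    import cPickle, marshal, zlib

    def write_bucket(path, bucket):
        try:
            flag, d = 'm', marshal.dumps(bucket)
        except ValueError:                        # unmarshallable objects fall back to pickle
            flag, d = 'p', cPickle.dumps(bucket, -1)
        f = open(path, 'wb')
        f.write(flag)
        f.write(zlib.compress(d, 1))              # level 1: minimal CPU, still cuts I/O
        f.close()

    def read_bucket(path):
        f = open(path, 'rb')
        flag = f.read(1)
        d = zlib.decompress(f.read())
        f.close()
        return marshal.loads(d) if flag == 'm' else cPickle.loads(d)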