Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Updated MPI framework and included it in the distributions (contribut…

…ed by Harvey Feng, https://reviews.apache.org/r/4768).

git-svn-id: https://svn.apache.org/repos/asf/incubator/mesos/trunk@1359445 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit e7bf6f6ed7361d55a46dd2d5476841963f5d58da 1 parent a12a92c
@benh benh authored
View
4 Makefile.am
@@ -39,6 +39,10 @@ EXTRA_DIST += configure.amazon-linux-64 configure.centos-5.4-64 \
configure.macosx configure.ubuntu-lucid-64 configure.ubuntu-natty-64
+# MPI framework.
+EXTRA_DIST += mpi/README mpi/mpiexec-mesos.in mpi/mpiexec-mesos.py
+
+
if HAS_JAVA
maven-install:
@cd src && $(MAKE) $(AM_MAKEFLAGS) maven-install
View
2  configure.ac
@@ -100,6 +100,8 @@ AC_CONFIG_FILES([include/mesos/mesos.hpp])
AC_CONFIG_FILES([src/java/generated/org/apache/mesos/MesosNativeLibrary.java])
+AC_CONFIG_FILES([mpi/mpiexec-mesos], [chmod +x mpi/mpiexec-mesos])
+
AC_ARG_ENABLE([java],
AS_HELP_STRING([--disable-java],
View
37 frameworks/mpi/README.txt
@@ -1,37 +0,0 @@
-Mesos MPICH2 framework readme
---------------------------------------------
-
-Table of Contents:
-1) Installing MPICH2
-2) Running the Mesos MPICH2 framework
-
-=====================
-1) INSTALLING MPICH2:
-=====================
-This framework was developed using MPICH2 on Linux.
-
-You can install MPICH2 from scratch. You can get MPICH2 as well as installation directions here: http://www.mcs.anl.gov/research/projects/mpich2/
-
-I (Andy) installed MPICH2 using apt-get, but in Ubuntu, I had to add the Debian package mirror to my /etc/apt/sources.list file manuall.
-
-I.e. I added 'deb http://ftp.de.debian.org/debian sid main' to the end of the file.
-
-I also had to muck with keys since 9.04 (Jaunty) Ubuntu is using secure apt, so I did:
-
-gpg --recv-keys 4D270D06F42584E6
-gpg --export 4D270D06F42584E6 | apt-key add -
-
-though, theoretically, the following should suffice, it did not for me:
-
-apt-get install debian-keyring debian-archive-keyring
-apt-key update
-
-=====================================
-2) RUNNING THE MESOS MPICH2 FRAMEWORK
-=====================================
-
-1. Start a Mesos master and slaves see the MESOS_HOME/QUICKSTART.txt for help
-with this.
-2. In the MESOS_HOME/frameworks/mpi directory run the nmpiexec script. Pass the
--h flag to see help options.
- Example: ./nmpiexec -m 104857600 1@127.0.1.1:59608 hostname
View
15 frameworks/mpi/nmpiexec
@@ -1,15 +0,0 @@
-#!/bin/bash
-
-if [ "x$PYTHON" == "x" ]; then
- PYTHON=python
- if [ "`uname`" == "SunOS" ]; then
- PYTHON=python2.6
- fi
-fi
-
-if [ "x$MESOS_HOME" == "x" ]; then
- MESOS_HOME="$(dirname $0)/../.."
-fi
-
-export PYTHONPATH=$MESOS_HOME/lib/python:$MESOS_HOME/third_party/protobuf-2.3.0/python:$PYTHONPATH
-exec $PYTHON "$(dirname $0)/nmpiexec.py" $@
View
171 frameworks/mpi/nmpiexec.py
@@ -1,171 +0,0 @@
-#!/usr/bin/env python
-import mesos
-import mesos_pb2
-import os
-import sys
-import time
-import re
-import threading
-
-from optparse import OptionParser
-from subprocess import *
-
-TOTAL_TASKS = 1
-MPI_TASK = ""
-MPD_PID = ""
-CPUS = 1
-MEM = 1024
-
-def mpiexec(driver):
- print "We've launched all our MPDs; waiting for them to come up"
- while countMPDs() <= TOTAL_TASKS:
- print "...waiting on MPD(s)..."
- time.sleep(1)
- print "Got "+str(TOTAL_TASKS)+" mpd slots, running mpiexec"
- try:
- print "Running: "+"mpiexec -n "+str(TOTAL_TASKS)+" "+MPI_TASK
- os.system("mpiexec -1 -n "+str(TOTAL_TASKS)+" "+MPI_TASK)
- except OSError,e:
- print >>sys.stderr, "Error executing mpiexec"
- print >>sys.stderr, e
- exit(2)
- print "mpiexec completed, calling mpdexit "+MPD_PID
- call(["mpdexit",MPD_PID])
- time.sleep(1)
- driver.stop()
-
-class MyScheduler(mesos.Scheduler):
- def __init__(self, ip, port):
- self.ip = ip
- self.port = port
- self.tasksLaunched = 0
- self.tasksFinished = 0
-
- def getFrameworkName(self, driver):
- return "Mesos MPI Framework"
-
- def getExecutorInfo(self, driver):
- execPath = os.path.join(os.getcwd(), "startmpd.sh")
- initArg = ip + ":" + port
- execInfo = mesos_pb2.ExecutorInfo()
- execInfo.executor_id.value = "default"
- execInfo.uri = execPath
- execInfo.data = initArg
- return execInfo
-
- def registered(self, driver, fid):
- print "Mesos MPI scheduler and mpd running at "+self.ip+":"+self.port
-
- def resourceOffer(self, driver, oid, offers):
- print "Got offer %s" % oid.value
- tasks = []
- if self.tasksLaunched == TOTAL_TASKS:
- print "Rejecting permanently because we have already started"
- driver.replyToOffer(oid, tasks, {"timeout": "-1"})
- return
- for offer in offers:
- print "Considering slot on %s" % offer.hostname
- cpus = 0
- mem = 0
- for r in offer.resources:
- if r.name == "cpus":
- cpus = r.scalar.value
- elif r.name == "mem":
- mem = r.scalar.value
- if cpus < CPUS or mem < MEM:
- print "Rejecting slot due to too few resources"
- elif self.tasksLaunched < TOTAL_TASKS:
- tid = self.tasksLaunched
- print "Accepting slot to start mpd %d" % tid
- task = mesos_pb2.TaskDescription()
- task.task_id.value = str(tid)
- task.slave_id.value = offer.slave_id.value
- task.name = "task %d" % tid
- cpus = task.resources.add()
- cpus.name = "cpus"
- cpus.type = mesos_pb2.Resource.SCALAR
- cpus.scalar.value = CPUS
- mem = task.resources.add()
- mem.name = "mem"
- mem.type = mesos_pb2.Resource.SCALAR
- mem.scalar.value = MEM
- tasks.append(task)
- self.tasksLaunched += 1
- else:
- print "Rejecting slot because we've launched enough tasks"
- print "Replying to offer!"
- driver.replyToOffer(oid, tasks, {"timeout": "-1"})
- if self.tasksLaunched == TOTAL_TASKS:
- threading.Thread(target = mpiexec, args=[driver]).start()
-
- def statusUpdate(self, driver, update):
- print "Task %s in state %s" % (update.task_id.value, update.state)
- if (update.state == mesos_pb2.TASK_FINISHED or
- update.state == mesos_pb2.TASK_FAILED or
- update.state == mesos_pb2.TASK_KILLED or
- update.state == mesos_pb2.TASK_LOST):
- print "A task finished unexpectedly, calling mpdexit "+MPD_PID
- call(["mpdexit",MPD_PID])
- driver.stop()
-
-def countMPDs():
- try:
- mpdtraceout = Popen("mpdtrace -l", shell=True, stdout=PIPE).stdout
- count = 0
- for line in mpdtraceout:
- count += 1
-
- mpdtraceout.close()
- return count
- except OSError,e:
- print >>sys.stderr, "Error starting mpd or mpdtrace"
- print >>sys.stderr, e
- exit(2)
-
-def parseIpPort(s):
- ba = re.search("_([^ ]*) \(([^)]*)\)", s)
- ip = ba.group(2)
- port = ba.group(1)
- return (ip,port)
-
-if __name__ == "__main__":
- parser = OptionParser(usage="Usage: %prog [options] mesos_master mpi_program")
- parser.add_option("-n","--num",
- help="number of slots/mpd:s to allocate (default 1)",
- dest="num", type="int", default=1)
- parser.add_option("-c","--cpus",
- help="number of cpus per slot (default 1)",
- dest="cpus", type="int", default=CPUS)
- parser.add_option("-m","--mem",
- help="number of MB of memory per slot (default 1GB)",
- dest="mem", type="int", default=MEM)
-
- (options,args)=parser.parse_args()
- if len(args)<2:
- print >>sys.stderr, "At least two parameters required."
- print >>sys.stderr, "Use --help to show usage."
- exit(2)
-
- TOTAL_TASKS = options.num
- CPUS = options.cpus
- MEM = options.mem
- MPI_TASK = " ".join(args[1:])
-
- print "Connecting to mesos master %s" % args[0]
-
- try:
- call(["mpd","--daemon"])
- mpdtraceout = Popen("mpdtrace -l", shell=True, stdout=PIPE).stdout
- traceline = mpdtraceout.readline()
- except OSError,e:
- print >>sys.stderr, "Error starting mpd or mpdtrace"
- print >>sys.stderr, e
- exit(2)
-
- (ip,port) = parseIpPort(traceline)
-
- MPD_PID = traceline.split(" ")[0]
- print "MPD_PID is %s" % MPD_PID
-
- sched = MyScheduler(ip, port)
- mesos.MesosSchedulerDriver(sched, args[0]).run()
View
49 frameworks/mpi/startmpd.py
@@ -1,49 +0,0 @@
-#!/usr/bin/env python
-import mesos
-import mesos_pb2
-import sys
-import time
-import os
-import atexit
-
-from subprocess import *
-
-def cleanup():
- try:
- # TODO(*): This will kill ALL mpds...oops.
- print "cleanup"
- os.waitpid(Popen("pkill -f /usr/local/bin/mpd", shell=True).pid, 0)
- except Exception, e:
- print e
- None
-
-class MyExecutor(mesos.Executor):
- def init(self, driver, arg):
- [ip,port] = arg.data.split(":")
- self.ip = ip
- self.port = port
-
- def launchTask(self, driver, task):
- print "Running task %s" % task.task_id.value
- update = mesos_pb2.TaskStatus()
- update.task_id.value = task.task_id.value
- update.state = mesos_pb2.TASK_RUNNING
- driver.sendStatusUpdate(update)
- Popen("mpd -n -h "+self.ip+" -p "+self.port, shell=True)
-
- def killTask(self, driver, tid):
- # TODO(*): Kill only one of the mpd's!
- sys.exit(1)
-
- def shutdown(self, driver):
- print "shutdown"
- cleanup()
-
- def error(self, driver, code, message):
- print "Error: %s" % message
-
-if __name__ == "__main__":
- print "Starting executor"
- atexit.register(cleanup)
- executor = MyExecutor()
- mesos.MesosExecutorDriver(executor).run()
View
15 frameworks/mpi/startmpd.sh
@@ -1,15 +0,0 @@
-#!/bin/bash
-
-if [ "x$PYTHON" == "x" ]; then
- PYTHON=python
- if [ "`uname`" == "SunOS" ]; then
- PYTHON=python2.6
- fi
-fi
-
-if [ "x$MESOS_HOME" == "x" ]; then
- MESOS_HOME="$(dirname $0)/../.."
-fi
-
-export PYTHONPATH=$MESOS_HOME/lib/python:$MESOS_HOME/third_party/protobuf-2.3.0/python:$PYTHONPATH
-exec $PYTHON "$(dirname $0)/startmpd.py" $@
View
59 mpi/README
@@ -0,0 +1,59 @@
+Mesos MPICH2 framework readme
+--------------------------------------------
+
+Table of Contents:
+1) Installing MPICH2
+2) Running the Mesos MPICH2 framework
+
+=====================
+1) INSTALLING MPICH2:
+=====================
+- This framework was developed for MPICH2 1.2 (mpd was deprecated
+  starting 1.3) on Linux (Ubuntu 11.10) and OS X Lion.
+
+- You can install MPICH2 from scratch. You can get MPICH2 as well as
+ installation directions here:
+ http://www.mcs.anl.gov/research/projects/mpich2/. This tutorial
+ follows the latter. Unpack the tar.gz and...
+
+- To use MPI with Mesos, make sure to have MPICH2 installed on every
+ machine in your cluster.
+
+Setting up:
+-> Install and configure:
+mac : ./configure --prefix=/Users/_your_username_/mpich2-install
+ubuntu : ./configure --prefix=/home/_your_username_/mpich2-install
+ Then...
+ sudo make
+ sudo make install
+
+
+-> Optional: add mpich binaries to PATH. You can specify the path to
+   installed MPICH2 binaries using mpiexec-mesos's '--path' option
+mac : sudo vim ~/.bash_profile
+ export PATH=/Users/_your_username_/mpich2-install/bin:$PATH
+ubuntu : sudo vim ~/.bashrc
+ export PATH=/home/_your_username_/mpich2-install/bin:$PATH
+
+-> Create .mpd.conf file in home directory:
+   echo "secretword=nil" > ~/.mpd.conf
+   chmod 600 ~/.mpd.conf
+
+-> Check installation - these should all return the PATH's set above
+ which mpd
+ which mpiexec
+ which mpirun
+
+
+=====================================
+2) RUNNING THE MESOS MPICH2 FRAMEWORK
+=====================================
+
+Using/testing mpiexec-mesos:
+-> Start a Mesos master and slaves
+
+-> How to run a Hello, World! program (pass the -h flag to see help options):
+   mpicc helloworld.c -o helloworld
+   ./mpiexec-mesos 127.0.0.1:5050 ./helloworld
+ Paths to mesos, protobuf, and distribute eggs can be specified by setting
+ respective environment variables in mpiexec-mesos.
View
43 mpi/mpiexec-mesos.in
@@ -0,0 +1,43 @@
+#!/bin/sh
+
+# Wrapper for mpiexec-mesos.py.  Generated by configure from
+# mpiexec-mesos.in (see the AC_CONFIG_FILES line in configure.ac): the
+# @...@ tokens below are substituted at configure time.  It locates the
+# in-tree distribute/protobuf/mesos Python eggs and execs the real
+# script with PYTHONPATH pointing at them.
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Use colors for errors.
+. ${MESOS_SOURCE_DIR}/support/colors.sh
+
+# Force the use of the Python interpreter configured during building.
+test ! -z "${PYTHON}" && \
+  echo "${RED}Ignoring PYTHON environment variable (using @PYTHON@)${NORMAL}"
+
+PYTHON=@PYTHON@
+
+# Each egg path is resolved by glob expansion; if the glob matched
+# nothing, the -e test below fails and we bail out with an error.
+DISTRIBUTE_EGG=`echo ${MESOS_BUILD_DIR}/third_party/distribute-*/dist/*.egg`
+
+test ! -e ${DISTRIBUTE_EGG} && \
+  echo "${RED}Failed to find ${DISTRIBUTE_EGG}${NORMAL}" && \
+  exit 1
+
+PROTOBUF=${MESOS_BUILD_DIR}/third_party/protobuf-2.4.1
+PROTOBUF_EGG=`echo ${PROTOBUF}/python/dist/protobuf*.egg`
+
+test ! -e ${PROTOBUF_EGG} && \
+  echo "${RED}Failed to find ${PROTOBUF_EGG}${NORMAL}" && \
+  exit 1
+
+MESOS_EGG=`echo ${MESOS_BUILD_DIR}/src/python/dist/mesos*.egg`
+
+test ! -e ${MESOS_EGG} && \
+  echo "${RED}Failed to find ${MESOS_EGG}${NORMAL}" && \
+  exit 1
+
+# The Python driver lives next to this wrapper.
+SCRIPT=`dirname ${0}`/mpiexec-mesos.py
+
+test ! -e ${SCRIPT} && \
+  echo "${RED}Failed to find ${SCRIPT}${NORMAL}" && \
+  exit 1
+
+# exec replaces this shell; "${@}" forwards all arguments untouched.
+PYTHONPATH="${DISTRIBUTE_EGG}:${MESOS_EGG}:${PROTOBUF_EGG}" \
+  exec ${PYTHON} ${SCRIPT} "${@}"
View
216 mpi/mpiexec-mesos.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+
+import mesos
+import mesos_pb2
+import os
+import sys
+import time
+import re
+import threading
+
+from optparse import OptionParser
+from subprocess import *
+
+
+# Runs on a helper thread started by MPIScheduler.resourceOffers once all
+# mpd tasks are launched.  Polls the ring via countMPDs() until it holds
+# more than TOTAL_MPDS daemons (presumably the local scheduler-side mpd
+# accounts for the extra one -- TODO confirm), runs the user program
+# under mpiexec, then dissolves the ring with mpdallexit.  Reads module
+# globals TOTAL_MPDS, MPICH2PATH, MPI_PROGRAM and MPD_PID set in __main__.
+def mpiexec():
+  print "We've launched all our MPDs; waiting for them to come up"
+
+  while countMPDs() <= TOTAL_MPDS:
+    print "...waiting on MPD(s)..."
+    time.sleep(1)
+  print "Got %d mpd(s), running mpiexec" % TOTAL_MPDS
+
+  try:
+    print "Running mpiexec"
+    # NOTE(review): '-1' looks like the mpiexec flag that skips placing a
+    # rank on the local (console) mpd -- verify against MPICH2 1.2 docs.
+    call([MPICH2PATH + 'mpiexec', '-1', '-n', str(TOTAL_MPDS)] + MPI_PROGRAM)
+
+  except OSError,e:
+    print >> sys.stderr, "Error executing mpiexec"
+    print >> sys.stderr, e
+    exit(2)
+
+  print "mpiexec completed, calling mpdallexit %s" % MPD_PID
+
+  # Ring/slave mpd daemons will be killed on executor's shutdown() if
+  # framework scheduler fails to call 'mpdallexit'.
+  call([MPICH2PATH + 'mpdallexit', MPD_PID])
+
+
+# Mesos scheduler that grows an mpd ring one daemon per accepted offer:
+# each task is a shell command launching 'mpd' pointed back at the local
+# mpd's host/port.  When TOTAL_MPDS tasks are launched it starts
+# mpiexec() on a background thread; it stops the driver when a task dies
+# unexpectedly or when every mpd task has finished.
+class MPIScheduler(mesos.Scheduler):
+
+  def __init__(self, options, ip, port):
+    self.mpdsLaunched = 0
+    self.mpdsFinished = 0
+    # ip/port of the locally started mpd (parsed from 'mpdtrace -l' in
+    # __main__); launched slave mpds connect back to it.
+    self.ip = ip
+    self.port = port
+    self.options = options
+    # Guards against spawning the mpiexec thread more than once.
+    self.startedExec = False
+
+  def registered(self, driver, fid, masterInfo):
+    print "Mesos MPI scheduler and mpd running at %s:%s" % (self.ip, self.port)
+    print "Registered with framework ID %s" % fid.value
+
+  def resourceOffers(self, driver, offers):
+    print "Got %d resource offers" % len(offers)
+
+    for offer in offers:
+      print "Considering resource offer %s from %s" % (offer.id.value, offer.hostname)
+
+      # Already have a full ring: every further offer is declined.
+      if self.mpdsLaunched == TOTAL_MPDS:
+        print "Declining permanently because we have already launched enough tasks"
+        driver.declineOffer(offer.id)
+        continue
+
+      cpus = 0
+      mem = 0
+      tasks = []
+
+      # Sum up what the offer provides for the two resources we need.
+      for resource in offer.resources:
+        if resource.name == "cpus":
+          cpus = resource.scalar.value
+        elif resource.name == "mem":
+          mem = resource.scalar.value
+
+      if cpus < CPUS or mem < MEM:
+        print "Declining offer due to too few resources"
+        driver.declineOffer(offer.id)
+      else:
+        tid = self.mpdsLaunched
+        self.mpdsLaunched += 1
+
+        print "Accepting offer on %s to start mpd %d" % (offer.hostname, tid)
+
+        task = mesos_pb2.TaskInfo()
+        task.task_id.value = str(tid)
+        task.slave_id.value = offer.slave_id.value
+        task.name = "task %d " % tid
+
+        # Note: 'cpus' and 'mem' are rebound here from the offer totals
+        # above to the task's resource protobufs.
+        cpus = task.resources.add()
+        cpus.name = "cpus"
+        cpus.type = mesos_pb2.Value.SCALAR
+        cpus.scalar.value = CPUS
+
+        mem = task.resources.add()
+        mem.name = "mem"
+        mem.type = mesos_pb2.Value.SCALAR
+        mem.scalar.value = MEM
+
+        # The task is a plain shell command: start an mpd that joins the
+        # ring rooted at the scheduler-side mpd.
+        task.command.value = "%smpd --noconsole --ncpus=%d --host=%s --port=%s" % (MPICH2PATH, CPUS, self.ip, self.port)
+
+        tasks.append(task)
+
+        print "Replying to offer: launching mpd %d on host %s" % (tid, offer.hostname)
+        driver.launchTasks(offer.id, tasks)
+
+      # Ring complete: hand off to mpiexec() exactly once, off-thread so
+      # this callback can return to the driver.
+      if not self.startedExec and self.mpdsLaunched == TOTAL_MPDS:
+        threading.Thread(target = mpiexec).start()
+        self.startedExec = True
+
+  def statusUpdate(self, driver, update):
+    print "Task %s in state %s" % (update.task_id.value, update.state)
+    # Any abnormal terminal state aborts the whole run: take the local
+    # mpd out of the ring and stop the driver.
+    if (update.state == mesos_pb2.TASK_FAILED or
+        update.state == mesos_pb2.TASK_KILLED or
+        update.state == mesos_pb2.TASK_LOST):
+      print "A task finished unexpectedly, calling mpdexit on %s" % MPD_PID
+      call([MPICH2PATH + "mpdexit", MPD_PID])
+      driver.stop()
+    # Normal completion: exit once every mpd task has finished.
+    if (update.state == mesos_pb2.TASK_FINISHED):
+      self.mpdsFinished += 1
+      if self.mpdsFinished == TOTAL_MPDS:
+        print "All tasks done, all mpd's closed, exiting"
+        driver.stop()
+
+
+# Return the number of mpds currently in the ring, counted as the number
+# of lines 'mpdtrace -l' prints (one per daemon).  Exits the process with
+# status 2 if the mpdtrace binary cannot be started.
+def countMPDs():
+  try:
+    mpdtraceproc = Popen(MPICH2PATH + "mpdtrace -l", shell=True, stdout=PIPE)
+    mpdtraceline = mpdtraceproc.communicate()[0]
+    return mpdtraceline.count("\n")
+  except OSError,e:
+    print >>sys.stderr, "Error starting mpd or mpdtrace"
+    print >>sys.stderr, e
+    exit(2)
+
+
+# Extract (ip, port) from an 'mpdtrace -l' output line.  The regex
+# expects the form 'host_PORT (IP)', e.g. 'node1_45923 (10.0.0.1)':
+# group 1 is the port after the underscore, group 2 the parenthesized ip.
+# Raises AttributeError (via ba.group) if the line does not match.
+def parseIpPort(s):
+  ba = re.search("_([^ ]*) \(([^)]*)\)", s)
+  ip = ba.group(2)
+  port = ba.group(1)
+  return (ip, port)
+
+
+# Entry point: parse options, start a local mpd daemon, learn its
+# ip/port and pid from 'mpdtrace -l', then run the MPIScheduler against
+# the given Mesos master until the driver stops.
+if __name__ == "__main__":
+  parser = OptionParser(usage="Usage: %prog [options] mesos_master mpi_program")
+  # Everything after the first positional args belongs to the MPI
+  # program, not to this parser.
+  parser.disable_interspersed_args()
+  parser.add_option("-n", "--num",
+                    help="number of mpd's to allocate (default 1)",
+                    dest="num", type="int", default=1)
+  parser.add_option("-c", "--cpus",
+                    help="number of cpus per mpd (default 1)",
+                    dest="cpus", type="int", default=1)
+  parser.add_option("-m","--mem",
+                    help="number of MB of memory per mpd (default 1GB)",
+                    dest="mem", type="int", default=1024)
+  parser.add_option("--name",
+                    help="framework name", dest="name", type="string")
+  parser.add_option("-p","--path",
+                    help="path to look for MPICH2 binaries (mpd, mpiexec, etc.)",
+                    dest="path", type="string", default="")
+  parser.add_option("--ifhn-master",
+                    help="alt. interface hostname for what mpd is running on (for scheduler)",
+                    dest="ifhn_master", type="string")
+
+  # Add options to configure cpus and mem.
+  (options,args) = parser.parse_args()
+  if len(args) < 2:
+    print >> sys.stderr, "At least two parameters required."
+    print >> sys.stderr, "Use --help to show usage."
+    exit(2)
+
+  # Module globals consumed by mpiexec(), countMPDs() and the scheduler.
+  TOTAL_MPDS = options.num
+  CPUS = options.cpus
+  MEM = options.mem
+  # args[0] is the master; everything after it is the MPI program + args.
+  MPI_PROGRAM = args[1:]
+
+  # Give options.path a trailing '/', if it doesn't have one already.
+  MPICH2PATH = os.path.join(options.path, "")
+
+  print "Connecting to Mesos master %s" % args[0]
+
+  # Start the local (console) mpd that slave mpds will join.
+  try:
+    mpd_cmd = MPICH2PATH + "mpd"
+    mpdtrace_cmd = MPICH2PATH + "mpdtrace -l"
+
+    if options.ifhn_master is not None:
+      call([mpd_cmd, "--daemon", "--ifhn=" + options.ifhn_master])
+    else:
+      call([mpd_cmd, "--daemon"])
+
+    mpdtraceproc = Popen(mpdtrace_cmd, shell=True, stdout=PIPE)
+    mpdtraceout = mpdtraceproc.communicate()[0]
+
+  except OSError,e:
+    print >> sys.stderr, "Error starting mpd or mpdtrace"
+    print >> sys.stderr, e
+    exit(2)
+
+  (ip,port) = parseIpPort(mpdtraceout)
+
+  # First whitespace-delimited token of the mpdtrace line identifies the
+  # local mpd; used later for mpdexit/mpdallexit.
+  MPD_PID = mpdtraceout.split(" ")[0]
+  print "MPD_PID is %s" % MPD_PID
+
+  scheduler = MPIScheduler(options, ip, port)
+
+  framework = mesos_pb2.FrameworkInfo()
+  framework.user = ""
+
+  if options.name is not None:
+    framework.name = options.name
+  else:
+    framework.name = "MPI: %s" % MPI_PROGRAM[0]
+
+  driver = mesos.MesosSchedulerDriver(
+    scheduler,
+    framework,
+    args[0])
+  # Exit 0 only on a clean driver stop.
+  sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1)
Please sign in to comment.
Something went wrong with that request. Please try again.