Skip to content

Commit

Permalink
Updated mesos-submit to add a framework name parameter and a more robust
Browse files Browse the repository at this point in the history
method of passing parameters to the executor (pickling).
  • Loading branch information
mateiz committed Oct 22, 2010
1 parent d165891 commit ae486d6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
21 changes: 11 additions & 10 deletions frameworks/mesos-submit/executor.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
import os
import pickle
import subprocess
import sys
import threading
Expand Down Expand Up @@ -36,12 +37,13 @@ def run_command(command, driver):
# This scheduler launches no further tasks but allows our one task to continue
# running in the cluster -- the task essentially becomes its own scheduler.
class SecondaryScheduler(mesos.Scheduler):
def __init__(self, command):
def __init__(self, framework_name, command):
mesos.Scheduler.__init__(self)
self.framework_name = framework_name
self.command = command

def getFrameworkName(self, driver):
return "mesos-submit " + self.command
return self.framework_name

def getExecutorInfo(self, driver):
executorPath = os.path.join(os.getcwd(), "executor")
Expand All @@ -63,9 +65,9 @@ def error(self, driver, code, message):
# This function is called in a separate thread to run our secondary scheduler;
# for some reason, things fail if we launch it from the executor's launchTask
# callback (this is likely to be SWIG/Python related).
def run_scheduler(command, master, fid):
def run_scheduler(fid, framework_name, master, command):
print "Starting secondary scheduler"
sched = SecondaryScheduler(command)
sched = SecondaryScheduler(framework_name, command)
sched_driver = mesos.MesosSchedulerDriver(sched, master, fid)
sched_driver.run()

Expand All @@ -85,18 +87,17 @@ def launchTask(self, driver, task):
if self.sched == None:
print "Received task; going to register as scheduler"
# Recover framework ID, master and command from task arg
pieces = task.arg.split("|")
fid = pieces[0]
master = pieces[1]
command = "|".join(pieces[2:]) # In case there are | characters in command
print "Parsed parameters:"
fid, framework_name, master, command = pickle.loads(task.arg)
print "Mesos-submit parameters:"
print " framework ID = %s" % fid
print " framework name = %s" % framework_name
print " master = %s" % master
print " command = %s" % command
# Start our secondary scheduler in a different thread (for some reason,
# this fails if we do it from the same thread.. probably due to some
# SWIG Python interaction).
Thread(target=run_scheduler, args=[command, master, fid]).start()
Thread(target=run_scheduler,
args=[fid, framework_name, master, command]).start()
else:
print "Error: received a second task -- this should never happen!"

Expand Down
28 changes: 17 additions & 11 deletions frameworks/mesos-submit/mesos_submit.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
import os
import pickle
import re
import sys
import time
Expand All @@ -25,20 +26,22 @@
# We currently don't recover if our task fails for some reason, but we
# do print its state transitions so the user can notice this.
class SubmitScheduler(mesos.Scheduler):
def __init__(self, cpus, mem, master, command):
def __init__(self, options, master, command):
mesos.Scheduler.__init__(self)
self.cpus = cpus
self.mem = mem
if options.name != None:
self.framework_name = options.name
else:
self.framework_name = "mesos-submit " + command
self.cpus = options.cpus
self.mem = options.mem
self.master = master
self.command = command
self.task_launched = False

def getFrameworkName(self, driver):
print "In getFrameworkName"
return "mesos-submit " + self.command
return self.framework_name

def getExecutorInfo(self, driver):
print "In getExecutorInfo"
executorPath = os.path.join(os.getcwd(), "executor")
return mesos.ExecutorInfo(executorPath, "")

Expand All @@ -57,8 +60,9 @@ def resourceOffer(self, driver, oid, offers):
if cpus >= self.cpus and mem >= self.mem:
print "Accepting slot on slave %s (%s)" % (offer.slaveId, offer.host)
params = {"cpus": "%d" % self.cpus, "mem": "%d" % self.mem}
arg = "%s|%s|%s" % (self.fid, self.master, self.command)
task = mesos.TaskDescription(0, offer.slaveId, "task", params, arg)
arg = [self.fid, self.framework_name, self.master, self.command]
task = mesos.TaskDescription(0, offer.slaveId, "task", params,
pickle.dumps(arg))
driver.replyToOffer(oid, [task], {"timeout": "1"})
self.task_launched = True
return
Expand All @@ -70,7 +74,7 @@ def error(self, driver, code, message):
if message == "Framework failover":
# Scheduler failover is currently reported by this error message;
# this is kind of a brittle way to detect it, but it's all we can do now.
print "Secondary scheduler registered successfully; exiting mesos-submit"
print "In-cluster scheduler started; exiting mesos-submit"
else:
print "Error from Mesos: %s (error code: %d)" % (message, code)
driver.stop()
Expand All @@ -84,12 +88,14 @@ def error(self, driver, code, message):
parser.add_option("-m","--mem",
help="MB of memory to request (default: 512)",
dest="mem", type="int", default=DEFAULT_MEM)
(options,args)= parser.parse_args()
parser.add_option("-n","--name",
help="Framework name", dest="name", type="string")
(options,args) = parser.parse_args()
if len(args) < 2:
parser.error("At least two parameters are required.")
exit(2)
master = args[0]
command = " ".join(args[1:])
print "Connecting to mesos master %s" % master
sched = SubmitScheduler(options.cpus, options.mem, master, command)
sched = SubmitScheduler(options, master, command)
mesos.MesosSchedulerDriver(sched, master).run()

0 comments on commit ae486d6

Please sign in to comment.