From ae486d6832511666e212296aeac77415e4562183 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 22 Oct 2010 15:33:32 -0700 Subject: [PATCH] Updated mesos-submit to add a framework name parameter and a more robust method of passing parameters to the executor (pickling). --- frameworks/mesos-submit/executor.py | 21 ++++++++++--------- frameworks/mesos-submit/mesos_submit.py | 28 +++++++++++++++---------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/frameworks/mesos-submit/executor.py b/frameworks/mesos-submit/executor.py index 07f090c20..e407b1a96 100755 --- a/frameworks/mesos-submit/executor.py +++ b/frameworks/mesos-submit/executor.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import os +import pickle import subprocess import sys import threading @@ -36,12 +37,13 @@ def run_command(command, driver): # This scheduler launches no further tasks but allows our one task to continue # running in the cluster -- the task essentially becomes its own scheduler. class SecondaryScheduler(mesos.Scheduler): - def __init__(self, command): + def __init__(self, framework_name, command): mesos.Scheduler.__init__(self) + self.framework_name = framework_name self.command = command def getFrameworkName(self, driver): - return "mesos-submit " + self.command + return self.framework_name def getExecutorInfo(self, driver): executorPath = os.path.join(os.getcwd(), "executor") @@ -63,9 +65,9 @@ def error(self, driver, code, message): # This function is called in a separate thread to run our secondary scheduler; # for some reason, things fail if we launch it from the executor's launchTask # callback (this is likely to be SWIG/Python related). -def run_scheduler(command, master, fid): +def run_scheduler(fid, framework_name, master, command): print "Starting secondary scheduler" - sched = SecondaryScheduler(command) + sched = SecondaryScheduler(framework_name, command) sched_driver = mesos.MesosSchedulerDriver(sched, master, fid) sched_driver.run() @@ -85,18 +87,17 @@ def launchTask(self, driver, task): if self.sched == None: print "Received task; going to register as scheduler" # Recover framework ID, master and command from task arg - pieces = task.arg.split("|") - fid = pieces[0] - master = pieces[1] - command = "|".join(pieces[2:]) # In case there are | characters in command - print "Parsed parameters:" + fid, framework_name, master, command = pickle.loads(task.arg) + print "Mesos-submit parameters:" print " framework ID = %s" % fid + print " framework name = %s" % framework_name print " master = %s" % master print " command = %s" % command # Start our secondary scheduler in a different thread (for some reason, # this fails if we do it from the same thread.. probably due to some # SWIG Python interaction). - Thread(target=run_scheduler, args=[command, master, fid]).start() + Thread(target=run_scheduler, + args=[fid, framework_name, master, command]).start() else: print "Error: received a second task -- this should never happen!" diff --git a/frameworks/mesos-submit/mesos_submit.py b/frameworks/mesos-submit/mesos_submit.py index 47fd62594..f1d087020 100755 --- a/frameworks/mesos-submit/mesos_submit.py +++ b/frameworks/mesos-submit/mesos_submit.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import os +import pickle import re import sys import time @@ -25,20 +26,22 @@ # We currently don't recover if our task fails for some reason, but we # do print its state transitions so the user can notice this. class SubmitScheduler(mesos.Scheduler): - def __init__(self, cpus, mem, master, command): + def __init__(self, options, master, command): mesos.Scheduler.__init__(self) - self.cpus = cpus - self.mem = mem + if options.name != None: + self.framework_name = options.name + else: + self.framework_name = "mesos-submit " + command + self.cpus = options.cpus + self.mem = options.mem self.master = master self.command = command self.task_launched = False def getFrameworkName(self, driver): - print "In getFrameworkName" - return "mesos-submit " + self.command + return self.framework_name def getExecutorInfo(self, driver): - print "In getExecutorInfo" executorPath = os.path.join(os.getcwd(), "executor") return mesos.ExecutorInfo(executorPath, "") @@ -57,8 +60,9 @@ def resourceOffer(self, driver, oid, offers): if cpus >= self.cpus and mem >= self.mem: print "Accepting slot on slave %s (%s)" % (offer.slaveId, offer.host) params = {"cpus": "%d" % self.cpus, "mem": "%d" % self.mem} - arg = "%s|%s|%s" % (self.fid, self.master, self.command) - task = mesos.TaskDescription(0, offer.slaveId, "task", params, arg) + arg = [self.fid, self.framework_name, self.master, self.command] + task = mesos.TaskDescription(0, offer.slaveId, "task", params, + pickle.dumps(arg)) driver.replyToOffer(oid, [task], {"timeout": "1"}) self.task_launched = True return @@ -70,7 +74,7 @@ def error(self, driver, code, message): if message == "Framework failover": # Scheduler failover is currently reported by this error message; # this is kind of a brittle way to detect it, but it's all we can do now. - print "Secondary scheduler registered successfully; exiting mesos-submit" + print "In-cluster scheduler started; exiting mesos-submit" else: print "Error from Mesos: %s (error code: %d)" % (message, code) driver.stop() @@ -84,12 +88,14 @@ def error(self, driver, code, message): parser.add_option("-m","--mem", help="MB of memory to request (default: 512)", dest="mem", type="int", default=DEFAULT_MEM) - (options,args)= parser.parse_args() + parser.add_option("-n","--name", + help="Framework name", dest="name", type="string") + (options,args) = parser.parse_args() if len(args) < 2: parser.error("At least two parameters are required.") exit(2) master = args[0] command = " ".join(args[1:]) print "Connecting to mesos master %s" % master - sched = SubmitScheduler(options.cpus, options.mem, master, command) + sched = SubmitScheduler(options, master, command) mesos.MesosSchedulerDriver(sched, master).run()