
Mesos Tutorial: Fog Computing at Hella Scale


This project, Exelixi, began as a tutorial for building frameworks with Mesos.


Apache Mesos has reasonably good documentation. Relatively speaking, if you squint really hard. Even so, there was something missing. The developer community had great open source examples, in terms of frameworks written in Scala such as Spark, Chronos, Marathon, and also a good Scala tutorial by Tobi Knaup. Twitter has a new project called Aurora written in Java and Python. Douban has DPark written in Python. Those are all excellent projects, but relatively complex in scope.

The community needed an introductory tutorial. Mesos is, in a sense, "building blocks" for creating new distributed frameworks. A simple step-by-step process for assembling those building blocks was missing. The simpler the better -- with extra points for code in pure Python, to make it approachable for a wide audience. BTW, for a really great discussion about Mesos, etc., as building blocks for distributed systems, check out this LinkedIn engineering article by Jay Kreps.

There is one example of a simple Python framework in Mesos, used for testing the Python API. Unfortunately, test_framework.py is difficult to run outside of the Mesos testing environment. That's not quite the best starting point for people who are just starting out.

Definitions for Datacenter Computing

Let's start by defining some terminology. Running apps in the context of Mesos implies that those apps run on some cluster. Within a cluster, there are a number of nodes -- that is, individual servers. Some of the nodes are designated as slaves (performing the work) while others are masters (coordinating the work).

Mesos distributed kernel

There are distributed frameworks that run on clusters, and applications which run atop those frameworks.

For additional background, Google published an amazingly great paper entitled "Omega" at EuroSys 2013 about their experiences with datacenter computing. That paper goes into gory details in its analysis of cluster resource scheduling. Well worth a good read. Some important takeaways include:

  • multiple abstraction levels are involved in efficient cluster resource scheduling
  • long-running services are harder to schedule than batch jobs, and they matter more for real-world use cases

Based on Google's definitions, Mesos fits into the category of a two-level scheduler. In other words, a distributed framework running atop Mesos is an abstraction involving two parts: a scheduler which coordinates work, and executors which perform the work.

Mesos distributed framework

The notion of "two-level" comes in because at one level resource offers get presented to the scheduler, while at another level the executors consume those resource offers to perform units of work. Ultimately, the hard part of the problem gets handled inside the kernel... not unlike most of the hard problems involving operating systems. In operation, Mesos makes resource offers to frameworks, launches executors, then gets the hell out of the way. Then frameworks can make the most of the available resources, without having to coordinate with Mesos repeatedly. That approach allows a Mesos cluster to scale up to tens of thousands of nodes -- again, in what Google calls datacenter computing.

Let's apply this terminology to a known example. Apache Hadoop is a kind of distributed framework, and it runs atop Mesos. An application written as a graph of MapReduce jobs in Hadoop is, well, an application. Hadoop's job tracker plays the role of scheduler, communicating with Mesos to receive resource offers. Then the Hadoop task trackers play the role of executor, consuming the scheduled resources to perform units of work.

Architecture for a Mesos Framework

Exelixi needed to show some large-scale problem which could be parallelized. A genetic algorithm (GA) framework, with horizontal scale-out, was chosen as the problem domain. Ergo the name, which comes from the Greek word ekseliksi meaning progress. Mesos-related projects have a tradition of using Greek names, and that's a nice callback to the evolutionary aspect of GAs. Seriously, we checked with native Greek speakers, to be sure!

Before we dive into the architecture, please check out the Problem Domain: Genetic Algorithms overview. Note that those requirements fit quite well with what Apache Mesos provides for building distributed frameworks. Having that matter settled, let's proceed with our step-by-step tutorial.

Step 1: Following the Mesos two-level scheduler abstraction, we divide the framework into two parts:

  • a scheduler which receives resource offers from Mesos
  • a set of executors which distribute the data+processing

We'll need to iterate through multiple generations for each GA problem, so we define a unit of work as a set of long-running processes, scattered across a cluster. Executors serve primarily to launch these worker processes (one per slave), then notify the scheduler if a worker process dies. Simple enough.
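
In code terms, that division of labor gives us the following skeleton -- a hedged outline of the two classes we'll flesh out in the steps below, showing only the Mesos callback methods this tutorial uses:

import mesos

class MesosScheduler (mesos.Scheduler):
    def resourceOffers (self, driver, offers):
        # decide which resource offers to accept, and launch tasks on slaves
        pass

    def statusUpdate (self, driver, update):
        # track task state changes reported back by the executors
        pass

    def frameworkMessage (self, driver, executorId, slaveId, message):
        # handle messages sent from the executors
        pass


class MesosExecutor (mesos.Executor):
    def launchTask (self, driver, task):
        # perform a unit of work on the slave, reporting status to the scheduler
        pass

    def frameworkMessage (self, driver, message):
        # handle messages sent from the scheduler
        pass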

Exelixi services layer

Step 2: Next, we define a services layer, so that the unit of work being performed can orchestrate its workers. Each worker must manage a shard of the GA population, and we can partition the data using a hash ring. Each worker also needs to provide a set of REST services, for lightweight communications within the framework -- which is how that orchestration gets performed. Each worker also provides a task queue, plus a barrier to synchronize the distributed processing.
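
To make the hash ring idea a bit more concrete, here is a minimal sketch of consistent hashing -- an illustration only, not the actual Exelixi implementation, and the worker endpoints shown are hypothetical:

import bisect
import hashlib

class HashRing (object):
    """minimal consistent-hash ring, mapping keys onto worker shards"""

    def __init__ (self, workers, replicas=64):
        points = [ (self._hash("%s:%d" % (w, i)), w)
                   for w in workers for i in range(replicas) ]
        self._ring = sorted(points)
        self._keys = [ h for h, _ in self._ring ]

    @staticmethod
    def _hash (value):
        return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)

    def get_worker (self, key):
        # walk clockwise to the first shard at or after this key's hash position
        i = bisect.bisect(self._keys, self._hash(key)) % len(self._keys)
        return self._ring[i][1]

ring = HashRing([ "10.0.0.1:9311", "10.0.0.2:9311" ])
print(ring.get_worker("indiv:42"))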

Note that, so far, none of this architecture is specific to GA implementations. In fact, it could be extended to perform a wide variety of distributed data+calculations, each abstracted as a UnitOfWork. Hint, hint. Just in case you want to extend the GitHub code repo for some project beyond the scope of this tutorial. Nudge, nudge.

Python Impl for a Scheduler

Step 3: Next, be sure to install the Python bindings for Mesos on each of the masters and slaves. There are Python Eggs available for specific Mesos releases at http://mesosphere.io/downloads/

We can test the installation at any point simply by attempting to import the mesos package into Python:

python -c 'import mesos'
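
The example code below also imports the protocol buffer bindings, so it may be worth checking that module too -- assuming it gets installed from the same Egg:

python -c 'import mesos_pb2'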

Great, once that's installed, we can write a module to subclass the mesos.Scheduler and mesos.Executor classes. For the following example code, take a look at the resource.py module in Exelixi.

Step 4: In the MesosScheduler class, we subclass mesos.Scheduler and define its constructor:

def __init__ (self, executor, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc):
    self.executor = executor
    self.taskData = {}
    self.tasksLaunched = 0
    self.tasksFinished = 0
    self.messagesSent = 0
    self.messagesReceived = 0

Those are the required members, essentially to keep track of the messages between the scheduler and executors for requested tasks. Also, we keep track of the resources required by our framework:

    self._cpu_alloc = cpu_alloc
    self._mem_alloc = mem_alloc

Some other code follows, defining several protected members to customize the class for Exelixi:

    self._executors = {}
    self._exe_path = exe_path
    self._n_workers = n_workers
    self._uow_name = uow_name
    self._prefix = prefix

Next, we define the resourceOffers method, which gets invoked when resources are offered to the framework. BTW, the comments in the Python source code do a pretty good job of explaining the gory details of how these callback methods get used in Mesos.

def resourceOffers (self, driver, offers):
    for offer in offers:
        # do something with these offers... or not

In the case of Exelixi, we need to have n_workers number of worker processes running, and we want to avoid having more than one on a given slave. So there's a test for that -- in other words, which resource offers to accept. YMMV. More importantly, we define the tasks to launch on the slaves:

tid = self.tasksLaunched
self.tasksLaunched += 1

task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
task.slave_id.value = offer.slave_id.value
task.name = "task %d" % tid
task.executor.MergeFrom(self.executor)

Note that all the data exchanged with Mesos is handled using protocol buffers. Frankly, those are not particularly well-documented in Mesos. Sometimes it becomes necessary to review the source code for mesos_pb2 to understand how to access or interpret fields. It would be great to have better repr() support in the Python API for Mesos -- if you're looking for a weekend project?

Step 5: Anywho, elsewhere within this test block, the really interesting part of using Mesos begins to happen. We get to specify the resources needed. First, we define the CPU requirements based on the cpu_alloc parameter passed into the MesosScheduler constructor:

cpus = task.resources.add()
cpus.name = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = self._cpu_alloc

Next, we specify the memory requirements based on the mem_alloc parameter:

mem = task.resources.add()
mem.name = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = self._mem_alloc

Then we add the fully-defined task that will consume these resources on the executor:

tasks.append(task)
self.taskData[task.task_id.value] = (offer.slave_id, task.executor.executor_id)

Finally, we ask the Mesos driver, which was passed in as the driver parameter, to launch the tasks defined for this resource offer:

driver.launchTasks(offer.id, tasks)

Congrats. Our framework is up and running. At this point, the executor runs the defined tasks.
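
Putting the fragments from Steps 4 and 5 back together, a hedged sketch of the whole resourceOffers callback might look like the following. The simple tasksLaunched check shown here stands in for the actual acceptance test in resource.py, which also avoids placing two workers on the same slave:

def resourceOffers (self, driver, offers):
    for offer in offers:
        tasks = []

        # simplified acceptance test: launch at most n_workers tasks overall
        if self.tasksLaunched < self._n_workers:
            tid = self.tasksLaunched
            self.tasksLaunched += 1

            task = mesos_pb2.TaskInfo()
            task.task_id.value = str(tid)
            task.slave_id.value = offer.slave_id.value
            task.name = "task %d" % tid
            task.executor.MergeFrom(self.executor)

            cpus = task.resources.add()
            cpus.name = "cpus"
            cpus.type = mesos_pb2.Value.SCALAR
            cpus.scalar.value = self._cpu_alloc

            mem = task.resources.add()
            mem.name = "mem"
            mem.type = mesos_pb2.Value.SCALAR
            mem.scalar.value = self._mem_alloc

            tasks.append(task)
            self.taskData[task.task_id.value] = (offer.slave_id, task.executor.executor_id)

        # launching with an empty task list leaves the offer unused
        driver.launchTasks(offer.id, tasks)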

Step 6: When the status of a task changes, e.g., it completes, then Mesos calls the statusUpdate method on the scheduler to handle some accounting:

def statusUpdate (self, driver, update):
    if update.state == mesos_pb2.TASK_FINISHED:
        self.tasksFinished += 1
        slave_id, executor_id = self.taskData[update.task_id.value]

To preserve its sanity, the scheduler keeps track of the number of tasks completed. Also, there's some "payload" returned from running the task on the slave, provided in the update.data parameter. We'll discuss what's happening on the executor side of this message in just a bit, but as a spoiler: the initial task runs some discovery using the Python package psutil to see what resources are available.

telemetry = loads(str(update.data))
logging.info("telemetry from slave %s, executor %s\n%s", slave_id.value, executor_id.value, str(update.data))
   
exe = self.lookup_executor(slave_id.value, executor_id.value)
exe.ip_addr = telemetry["ip_addr"]

The task also notes the IP address for the slave -- which is useful if you're running on Amazon AWS and you need to determine the internal IP addresses for the executors. Recall that we're setting up a hash ring and using REST endpoints, so those internal IP addresses will come in handy later.

Also note that we use str(update.data) to convert the data sent back to the framework. Mesos converts strings to unicode, so this is necessary.

The results are pretty cool. This gives us a glimpse at the remote slave in much more detail than what Mesos provides by default:

{
    "ip_addr": "10.232.49.126", 
    "mem_free": 7255506944, 
    "cpu_num": 2, 
    "cpu_times": {
        "user": 20.73, 
        "system": 11.3, 
        "idle": 2176.82
    }, 
    "disk_usage": {
        "free": 6551437312, 
        "percent": 17.4
    }, 
    "disk_io": {
        "read_count": 10673, 
        "write_count": 7454, 
        "read_bytes": 223757312, 
        "write_bytes": 257056768, 
        "read_time": 23496, 
        "write_time": 1184788
    }, 
    "network_io": {
        "bytes_sent": 1268864, 
        "bytes_recv": 20755701, 
        "packets_sent": 12179, 
        "packets_recv": 20322, 
        "errin": 0, 
        "errout": 0, 
        "dropin": 0, 
        "dropout": 0
    }
}

Imagine if some of the slaves in a cluster were, shall we say, a bit more commodity than others? In other words, flaky and about ready to fail. This remote telemetry might become useful for identifying slaves that have high failure rates for disk or network operations. Throw 'em back into the pond. Or something. For now, we'll just take what we get and call it good.
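
As a hypothetical illustration -- neither the helper nor the threshold exists in Exelixi -- such a check might start out like this:

def looks_flaky (telemetry, max_errors=0):
    # any errors or dropped packets on this slave's network interfaces?
    net = telemetry["network_io"]
    return (net["errin"] + net["errout"] + net["dropin"] + net["dropout"]) > max_errors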

Step 7: Now we dispense with the preliminaries and run the worker service on the slave. Still within the statusUpdate method, after the initial task has completed and telemetry is received, we send back a message to the executor. That message is a request to launch the worker service:

self.messagesSent += 1
message = str(dumps([ self._exe_path, "-p", exe.port ]))
driver.sendFrameworkMessage(executor_id, slave_id, message)

The request is based on the exe_path parameter that was passed into the MesosScheduler constructor. Of course, it assumes that the Exelixi code has been installed on the slave already. We'll cover that later. For now, the message goes through the Mesos driver, which makes the request to the executor. Again, note the string/unicode conversion being used for Mesos messages. The executor launches the worker service as a child process, running on the specified port.

Step 8: We're just about wrapped up with the scheduler. Next we define frameworkMessage, which gets invoked by Mesos when an executor sends a message.

def frameworkMessage (self, driver, executorId, slaveId, message):
    self.messagesReceived += 1

    if self.messagesReceived == self._n_workers:
        if self.messagesReceived != self.messagesSent:
            sys.exit(1)

        logging.info("all worker services launched and init tasks completed")

Great. At the point that "all worker services launched and init tasks completed" hits the log file, our framework is running across the cluster. In other words, the framework is finally ready to begin distributed processing for its UnitOfWork. We create a Framework object, which is defined in the service.py module. It needs to get the worker list, so that its UnitOfWork can orchestrate the worker services:

exe_info = self._executors.values()
worker_list = [ exe.get_shard_uri() for exe in exe_info ]

fra = Framework(self._uow_name, self._prefix)
fra.set_worker_list(worker_list, exe_info)

time.sleep(1)
fra.orchestrate_uow()
driver.stop()

After the UnitOfWork orchestration completes, we call the Mesos driver to stop the scheduler. All done, on this side. Except for how we launch the scheduler in the first place...

Step 9: Back in the resource.py module, we define a static method start_framework to do just that:

@staticmethod
def start_framework (master_uri, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc):
    executor = mesos_pb2.ExecutorInfo()
    executor.executor_id.value = uuid1().hex
    executor.command.value = exe_path
    executor.name = "Exelixi Executor"
    executor.source = "per-job build"

    framework = mesos_pb2.FrameworkInfo()
    framework.user = "" # have Mesos fill in the current user
    framework.name = "Exelixi Framework"

First, we populate the executor and framework definitions for Mesos. Note where the executor command gets set to the value of the exe_path parameter ... that defines the executable which will be run as the executors. The whole shebang here hinges on that line.

Next, we create a MesosScheduler object, and capture the command line options:

sched = MesosScheduler(executor, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc)

Then we initialize a Mesos driver, including any necessary credentials. Note that we're passing in the scheduler sched along with master_uri as the host:port for the Mesos master, which will probably be localhost:5050 in most cases.

if os.getenv("MESOS_AUTHENTICATE"):
    if not os.getenv("DEFAULT_PRINCIPAL"):
        sys.exit(1)
        
    if not os.getenv("DEFAULT_SECRET"):
        sys.exit(1)
        
    credential = mesos_pb2.Credential()
    credential.principal = os.getenv("DEFAULT_PRINCIPAL")
    credential.secret = os.getenv("DEFAULT_SECRET")
    
    driver = mesos.MesosSchedulerDriver(sched, framework, master_uri, credential)
else:
    driver = mesos.MesosSchedulerDriver(sched, framework, master_uri)

return driver

Great, the Mesos driver is ready to run. Once it does, the scheduler which we defined will receive callbacks from Mesos.
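
To be precise, start_framework returns the driver, and the caller is responsible for running it. A hedged sketch of how that presumably looks from the command line handler in exelixi.py:

driver = MesosScheduler.start_framework(master_uri, exe_path, n_workers,
                                        uow_name, prefix, cpu_alloc, mem_alloc)

# block until the scheduler calls driver.stop(), then report status
sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1)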

Python Impl for an Executor

Step 10: Defining the Mesos executor is simpler than defining the Mesos scheduler. Again in the resource.py module, we create MesosExecutor to subclass mesos.Executor. We define its launchTask callback method, with a closure called run_task inside:

def launchTask (self, driver, task):
    def run_task():
        update = mesos_pb2.TaskStatus()
        update.task_id.value = task.task_id.value
        update.state = mesos_pb2.TASK_RUNNING
        update.data = str("running discovery task")
        driver.sendStatusUpdate(update)

        update = mesos_pb2.TaskStatus()
        update.task_id.value = task.task_id.value
        update.state = mesos_pb2.TASK_FINISHED
        update.data = str(dumps(get_telemetry(), indent=4))
        driver.sendStatusUpdate(update)

That code defines the initial task that runs on the executor. Recall that we use psutil to get telemetry about the slave node. The Mesos driver gets called to send status updates back to the scheduler. Again, note the string/unicode conversion via str() which is required in those Mesos status updates.
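
The get_telemetry function itself isn't shown in this walkthrough. A minimal sketch using psutil -- current psutil APIs, so the actual code in Exelixi likely differs -- might return a dict shaped like the sample logged earlier:

import socket
import psutil

def get_telemetry ():
    """discovery task: collect resource telemetry about this slave via psutil"""
    return {
        # NB: gethostbyname() can report 127.0.0.1, depending on /etc/hosts
        "ip_addr": socket.gethostbyname(socket.gethostname()),
        "mem_free": psutil.virtual_memory().free,
        "cpu_num": psutil.cpu_count(),
        "cpu_times": dict(psutil.cpu_times()._asdict()),
        "disk_usage": dict(psutil.disk_usage("/")._asdict()),
        "disk_io": dict(psutil.disk_io_counters()._asdict()),
        "network_io": dict(psutil.net_io_counters()._asdict())
    }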

Next, we create a thread to run the run_task closure. Note that these must run in new threads or processes, rather than inside the launchTask callback method. Otherwise, no other callbacks will be invoked on this executor until this callback has returned.

Also note that using gevent coroutines here in lieu of a new thread will tend to cause conflicts with how Mesos handles callbacks. Perhaps that may change after gevent changes in the 1.0 release to use libev instead of libevent. Even so, the run_task thread doesn't take up much in terms of resources:

thread = Thread(target=run_task)
thread.start()

Bueno. That gets the executor up and running.

Step 11: Next, we define the frameworkMessage callback method, which gets invoked when a message from the scheduler has arrived for this executor:

def frameworkMessage (self, driver, message):
    subprocess.Popen(loads(message))
    driver.sendFrameworkMessage(str("service launched"))

It has a fairly simple job... It launches the worker service, using subprocess.Popen to fork a child process. Then it notifies the scheduler that the worker service was successfully launched.

NB: you really really want to use a child process, otherwise the gevent concurrency used elsewhere in Exelixi will tend to conflict with how Mesos handles callbacks.

Step 12: So close, you can almost feel it, eh? Now we define a static method run_executor to launch the executor:

@staticmethod
def run_executor ():
    driver = mesos.MesosExecutorDriver(MesosExecutor())
    sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1)

That starts a Mesos driver on the executor, which runs until it is stopped externally by the Exelixi framework. For example, some REST endpoint gets called to stop the service, after the distributed UnitOfWork orchestration completes. Or if something relatively tragic happens, such as a node failure.

When the executor gets run by Mesos on the slave, initially it just calls:

    MesosExecutor.run_executor()
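
For example, the executable pointed at by exe_path presumably ends with an entry point along these lines -- a hypothetical sketch, not a quote from the repo:

if __name__ == "__main__":
    # run this process as the Mesos executor on a slave
    MesosExecutor.run_executor()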

Can haz Mesos framework? Yes, can haz!

Command Line Use

Step 13: For the proverbial thirteenth step, let's look at how the scheduler and executor are tied together into the Exelixi framework. Check the exelixi.py module in the parse_cli_args method to see where the command arguments get parsed, where the optional parameters get set, etc. For example, the --cpu option specifies how much CPU needs to be allocated per executor. Similarly, the --mem option specifies how much memory is required.

The --master option tells Exelixi to launch the framework. That calls the MesosScheduler.start_framework static method to launch the Mesos driver. Note that this command is expected to be run from a Mesos master, so the --master option takes a HOST:PORT argument which would typically be localhost:5050 in practice. One nice feature of Mesos is that when a program makes a REST call to any master, it will perform an HTTP redirect to the leader among the masters. So, log in to any one of the Mesos masters, point to localhost:5050, and the framework will get redirected to the leader.

To run Exelixi, check the Quick Start instructions. You can run it in a standalone mode, without the Mesos resource scheduler layer -- just with the services layer running. But that's mostly intended for debugging, not so much fun in practice. More to the point, the Quick Start describes how to launch a Mesos cluster and run Exelixi there.

In the default example, a very simple GA attempts to find lists of five integers which sum to the number 231. With the following command line, this runs for nine generations using two worker services:

ubuntu@ec2-54-196-88-42:~/exelixi-master$ ./src/exelixi.py -m localhost:5050 -w 2
gen	0	size	46	total	46	mse	1.45e-01	max	9.96e-01	med	7.32e-01	avg	6.14e-01
gen	1	size	55	total	89	mse	3.72e-02	max	9.96e-01	med	8.87e-01	avg	6.24e-01
gen	2	size	50	total	124	mse	3.49e-02	max	9.96e-01	med	9.26e-01	avg	5.15e-01
gen	3	size	42	total	152	mse	4.59e-02	max	1.00e+00	med	9.70e-01	avg	5.13e-01
gen	4	size	40	total	181	mse	3.58e-02	max	1.00e+00	med	9.13e-01	avg	6.02e-01
gen	5	size	44	total	213	mse	3.98e-02	max	1.00e+00	med	9.33e-01	avg	5.61e-01
gen	6	size	35	total	235	mse	5.82e-02	max	1.00e+00	med	8.96e-01	avg	5.46e-01
gen	7	size	35	total	257	mse	5.48e-02	max	1.00e+00	med	9.38e-01	avg	5.48e-01
gen	8	size	27	total	271	mse	7.28e-02	max	1.00e+00	med	9.70e-01	avg	4.93e-01
gen	9	size	24	total	285	mse	1.74e-02	max	1.00e+00	med	9.70e-01	avg	5.13e-01

A total of 285 individuals get evaluated. Ultimately, two individuals have evolved perfect solutions and nine others provide fairly good solutions:

indiv	1.0000	2	[2, 37, 49, 62, 81]
indiv	1.0000	2	[15, 30, 46, 69, 71]
indiv	0.9957	8	[2, 31, 45, 74, 78]
indiv	0.9957	2	[2, 24, 37, 80, 89]
indiv	0.9957	0	[9, 31, 49, 62, 81]
indiv	0.9957	0	[13, 24, 24, 80, 89]
indiv	0.9957	0	[1, 16, 52, 64, 97]
indiv	0.9913	4	[5, 28, 49, 54, 97]
indiv	0.9913	2	[8, 21, 38, 73, 93]
indiv	0.9913	1	[20, 29, 45, 65, 70]
indiv	0.9913	0	[13, 24, 49, 62, 81]
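
For reference, those scores are consistent with a normalized-error fitness function along these lines -- a hedged reconstruction, not necessarily the exact code in the default GA:

def calc_fitness (indiv, target=231):
    # 1.0 means the five integers sum exactly to the target
    return 1.0 - abs(sum(indiv) - target) / float(target)

print(calc_fitness([2, 37, 49, 62, 81]))   # 1.0
print(calc_fitness([9, 31, 49, 62, 81]))   # 0.9957, after rounding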

The command lines and logs for this run have been captured as a GitHub gist. Keep in mind that this is a GA, so the (correct) results will probably be different every time it runs.

Altogether, the interaction of the moving parts -- the Mesos driver, scheduler, executor, workers, unit of work, command line interface, etc. -- is shown in the following activity diagram:

Exelixi activity diagram

Thanks for listening. And please, as you build out frameworks in Mesos, tell us about it by posting a note to the Apache Mesos developer community.