
ExecQueues

This is just me playing around with how to execute jobs on remote machines and get back the results via some communications channel (referred to as ‘the storage backend’ throughout the rest of this document). This toy currently supports messaging via Redis or MySQL. The initial thinking behind this is outlined in the corresponding post on my blog.

The idea is that you have some number of machines that you want to run jobs on (e.g. on a farm), and some clients that want to execute those jobs without knowing the details of which machines the workers run on, how many of them there are, or how to transfer the inputs and outputs of those jobs.

The worker.rb script acts as the worker; run it on your farm (or equivalent). client.rb is an example client that repeatedly asks for the value of various random multiples of pi. It does this in a loop so I can test performance.

How to get it running

  • Install and run a Redis server (this is used to store performance information at the moment, regardless of which backend you use)

  • Install and run a MySQL server if you want to try MySQL

  • Run the create_db.sql script if you’re trying MySQL

  • Amend the Storage.backend= line in client.rb and worker.rb according to the storage backend you want to use

  • Start some worker.rb and client.rb processes

If you do this correctly you should see the clients printing out something like this:

[Sat Jun 05 18:15:59 +0100 2010] Submitted: #<Task:0x1014affb8 @id=272053, @state="unstarted", @type="CalculateMultipleOfPi", @input=759>
[Sat Jun 05 18:15:59 +0100 2010] Received:  #<Task:0x1014aed70 @id=272053, @state="finished", @type="CalculateMultipleOfPi", @output=2384.46882135, @input=759>

[Sat Jun 05 18:15:59 +0100 2010] Submitted: #<Task:0x1014ae550 @id=272054, @state="unstarted", @type="CalculateMultipleOfPi", @input=489>
[Sat Jun 05 18:15:59 +0100 2010] Received:  #<Task:0x1014ad380 @id=272054, @state="finished", @type="CalculateMultipleOfPi", @output=1536.23880585, @input=489>

...

Creating new types of tasks

To create a new task type put a .rb file in task_types that contains a class that implements an execute method. For example:

class LocalExec
  def execute(command)
    `#{command}`
  end
end

or

class WhatMachineIsThis
  def execute(parameter)
    `hostname`
  end
end

Whatever the execute method returns will be what is passed back to the client that requested the job be executed.
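The dispatch described above can be sketched like this (a minimal sketch, not the project's actual worker code; the CalculateMultipleOfPi class and run_task helper are hypothetical, modelled on the example client):

```ruby
# Hypothetical task type modelled on the example client's CalculateMultipleOfPi.
class CalculateMultipleOfPi
  def execute(input)
    input * Math::PI
  end
end

# Assumed dispatch: look the class up by the task's type string, instantiate
# it, and use whatever execute returns as the task's output.
def run_task(type, input)
  Object.const_get(type).new.execute(input)
end

puts run_task("CalculateMultipleOfPi", 759)
```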

Redis implementation

The Redis implementation works like this:

  • A task is created on the client and enqueued on the unstarted queue (a Redis list); the task is returned to the client with a unique id

  • The client starts doing a blpop (blocking pop) on a queue called finished:[id]

  • The workers have been doing a blpop (blocking pop) on the unstarted queue, so they pop the new task off

  • Based on the type field the correct class is instantiated, the execute method is called, the work is done, and the result is saved in the output field of the task

  • The task is enqueued on to a queue called finished:[id]

  • The worker continues its loop of doing blpop on unstarted, waiting for new jobs to run

  • The blpop on finished:[id] returns the completed task to the client
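The flow above can be sketched with an in-process analogue (an assumption for illustration; the real code talks to a Redis server, where lpush/blpop play the roles that push/pop play here). Ruby's thread-safe Queue#pop blocks just like blpop does:

```ruby
# In-process stand-ins for the Redis lists; Queue#pop blocks like blpop.
unstarted  = Queue.new   # the "unstarted" list
finished_1 = Queue.new   # the "finished:1" list for task id 1

worker = Thread.new do
  task = unstarted.pop                        # worker's blpop on "unstarted"
  task[:output] = task[:input] * Math::PI     # the task's execute method runs
  task[:state]  = "finished"
  finished_1.push(task)                       # enqueue on "finished:[id]"
end

# Client: enqueue the task, then block waiting on its finished:[id] queue.
unstarted.push({ id: 1, state: "unstarted", type: "CalculateMultipleOfPi", input: 42 })
result = finished_1.pop                       # client's blpop on "finished:1"
worker.join
puts result[:state]
```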

MySQL implementation

A table, tasks, is used to store the tasks.

The process works as follows:

  • A task is created on the client and INSERTed into the table; the task is returned to the client with a unique id

  • The client begins polling the tasks table, looking for that id with a state of finished

  • The workers poll the tasks table looking for tasks with state unstarted

  • When a worker finds such a task it issues an UPDATE, changing the state for that task to in_progress (this is to stop other workers trying to work on it)

  • Based on the type field the correct class is instantiated, the execute method is called, the work is done, and the result is saved in the output field of the task

  • An UPDATE is issued updating the row with the new output and the new state finished

  • The worker continues its loop of polling for new unstarted jobs

  • The client, which has been polling the table waiting for the job to reach the finished state, reads the row, deletes it from the database, and returns the result to the calling script
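The polling half of this flow can be sketched with an in-memory row standing in for the tasks table (an assumption for illustration; the real code issues SELECT and UPDATE statements against MySQL). A stand-in worker thread performs the state change while the client polls:

```ruby
# An in-memory row standing in for a row of the tasks table.
row   = { id: 272053, state: "unstarted", input: 759, output: nil }
mutex = Mutex.new   # plays the role of the database's row atomicity

# Stand-in worker: the UPDATE that sets the output and state to finished.
worker = Thread.new do
  mutex.synchronize do
    row[:output] = row[:input] * Math::PI
    row[:state]  = "finished"
  end
end

# Client: poll until the row reaches the finished state, then take the result
# (the real client would then DELETE the row from the table).
sleep 0.01 until mutex.synchronize { row[:state] } == "finished"
worker.join
puts row[:state]
```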

Ensuring each worker works on a different task

If several workers are polling the tasks table simultaneously, more than one can SELECT the same task/row, UPDATE that row (to state in_progress), and start work on the task.

The way I have prevented this is by using optimistic locking. The process for a worker is like this:

  • SELECT a row with state unstarted (let’s assume here the row has the value 0 in its lock_version field)

  • Create a task object from the row

  • Change the state of the task object to in_progress

  • Increment the object’s lock_version field

  • Issue an update for the row corresponding to the task object like this: UPDATE tasks SET state='in_progress', lock_version=1 WHERE id=1234 AND lock_version=0

  • If the number of rows updated is 1 then the worker successfully got the row, if it’s 0 then some other worker issued an update and got the task so the worker continues looking for a new task
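The claim step above can be modelled in memory (an assumption for illustration; the real code issues the UPDATE against MySQL). The claim method mirrors UPDATE tasks SET state='in_progress', lock_version=1 WHERE id=1234 AND lock_version=0, returning the number of rows updated:

```ruby
# In-memory model of the tasks table and the optimistic-lock claim.
TABLE = { 1234 => { state: "unstarted", lock_version: 0 } }
MUTEX = Mutex.new   # plays the role of MySQL's per-statement atomicity

# Returns 1 if the row was claimed (rows updated), 0 if another worker won.
def claim(id, seen_version)
  MUTEX.synchronize do
    row = TABLE[id]
    return 0 unless row && row[:lock_version] == seen_version   # WHERE clause
    row[:state]        = "in_progress"
    row[:lock_version] = seen_version + 1
    1
  end
end

# Two workers race for the same row after both SELECTed lock_version=0;
# exactly one UPDATE matches the WHERE clause.
results = 2.times.map { Thread.new { claim(1234, 0) } }.map(&:value)
puts results.sort.inspect   # one worker gets 1, the other 0
```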

Performance

You can test the performance by running watch_performance.rb. Every second it will print the number of tasks that were completed in the last second.

The Redis system is about 15 times faster than the MySQL method (and uses almost no CPU when not busy) on my laptop. I suspect that with more clients and more jobs in the system the MySQL performance will fall even further (due to worse lock contention).

Using a table of type MEMORY rather than InnoDB does not make the MySQL implementation any faster. Lock contention might be reduced by returning a random row (rather than the first row) to the worker. This would require a full table scan (ORDER BY RAND() computes a random number for every row), but unless the table is very large (and it should never be) this should be no problem.

Running it all on my laptop (a 3 GHz MacBook Pro) I get about 1,700 jobs/second with Redis, and about 110 jobs/second with MySQL.

Known limitations

  • You can only give a single parameter to a task (the input field)

  • The MySQL version limits the inputs and outputs to 256 characters

  • If a worker dies while processing a job, the client will wait forever for the job to complete: there is no detection that a worker has died or failed, so the zombie job is never handed to another worker

Future plans

  • Provide an ‘out of comms channel’ method of storing the inputs/outputs of tasks (e.g. on NFS) for tasks where the input/output is large (multi-megabyte)

  • Improve the MySQL performance

  • Make it entirely transparent, through proxies that hide everything on the client side (so you just do CalculateMultipleOfPi.execute(42) on the client and get the response as if CalculateMultipleOfPi.execute(42) were running locally)

  • Provide a way to run things asynchronously

  • Implement an easy way to execute a callback when a remote job finishes execution

  • Specify connection details in a separate yaml configuration file

  • Resubmit jobs where the worker has died (I’ll probably never do that with this toy)
