sfGearmanPlugin
===============

The sfGearman plugin provides a symfony wrapper to gearman pecl module.

Features:

* configuration of servers and workers in a yaml config file
* run workers in a symfony task
* auto (un)serialization of job workloads/results
* a simple message queue manager
* a Doctrine Template to trigger tasks on record/table events
* a worker for gearman jobs based on MySQL TRIGGER (gearman UDF)


Installation
------------

First you need to install the gearman pecl module, version 0.6.0 minimum.

Then you can install this plugin the usual way (RTFM), or if you want to work with the trunk:

    $ cd plugins
    $ svn co http://svn.symfony-project.com/plugins/sfGearmanPlugin/trunk/ sfGearmanPlugin

Then activate the plugin in the `config/ProjectConfiguration.class.php` file.


Configuration
-------------

By default, there is a connection named "default" which targets a local gearman server.

Edit or create `config/gearman.yml` to suit your gearman server installation :

    [yml]
    all:
      server:
        default:
          host: 192.168.0.1
          port: 4730

You can also use host:port based notation :

    [yml]
    all:
      server:
        default: 127.0.0.1:4730

If you have more than one gearman job server, you can list them and mix notations this way :

    [yml]
    all:
      server:
        default:
          - 192.168.0.1:4730
          - { host: 192.168.0.2, port: 4730 }
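
The three notations above describe the same kind of data. As a rough illustration, the sketch below turns any of them into a flat list of host/port pairs. `normalizeServers()` is a hypothetical helper written for this README, not a function of the plugin, and the fallback to port 4730 is an assumption.

```php
<?php
// Hypothetical helper, not part of sfGearmanPlugin.
// $config is the parsed value of one entry under "server:" in gearman.yml.
function normalizeServers($config)
{
  // a single "host:port" string or a single {host, port} map becomes a one-item list
  if (is_string($config) || isset($config['host']))
  {
    $config = array($config);
  }

  $servers = array();
  foreach ($config as $entry)
  {
    if (is_string($entry))
    {
      // "host:port" notation
      $parts = explode(':', $entry, 2);
      $entry = array('host' => $parts[0]);
      if (isset($parts[1]))
      {
        $entry['port'] = $parts[1];
      }
    }

    $servers[] = array(
      'host' => $entry['host'],
      // assumption: fall back to 4730 when no port is given
      'port' => isset($entry['port']) ? (int) $entry['port'] : 4730,
    );
  }

  return $servers;
}

// the mixed list from the last example
$servers = normalizeServers(array(
  '192.168.0.1:4730',
  array('host' => '192.168.0.2', 'port' => 4730),
));
// $servers holds two entries, each with a 'host' and a 'port' key
```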


Create a worker
---------------

Edit `config/gearman.yml` to define worker functions and callbacks, grouped by a key name :

    [yml]
    all:
      worker:
        example1:
          reverse: [Worker1, reverse]

This defines a worker named "example1", which registers the gearman function "reverse" with the `Worker1::reverse()` callback.

You can register multiple functions for one worker :

    [yml]
    all:
      worker:
        example1:
          reverse: [Worker1, reverse]
          hash: [Worker1, hash]


Next, implement your callback:

    [php]
    /**
     * Gearman worker example1
     */
    class Worker1
    {
      /**
       * reverse work handler
       *
       * @param GearmanJob $job Gearman job
       * @return string         Result sent to client
       */
      public static function reverse($job)
      {
        return strrev($job->workload());
      }
    }


To understand `GearmanJob` and `$job->workload()`, read [gearman php module documentation](http://php.net/gearman).
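
Since the handler only calls `workload()` on the job, it can be tried out without a running gearman server. The sketch below does this with `FakeGearmanJob`, a hypothetical stand-in class written for this example (it is not part of the plugin or of the gearman module).

```php
<?php
// Stand-in for GearmanJob, for local testing only (hypothetical class).
// It mimics the single method the handler relies on: workload().
class FakeGearmanJob
{
  private $workload;

  public function __construct($workload)
  {
    $this->workload = $workload;
  }

  public function workload()
  {
    return $this->workload;
  }
}

class Worker1
{
  public static function reverse($job)
  {
    return strrev($job->workload());
  }
}

// call the handler directly, as the worker would do for a "reverse" job
$result = Worker1::reverse(new FakeGearmanJob('Hello!'));
// $result == '!olleH'
```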

To start your worker, use the symfony task `gearman:worker` with `--config=example1` :

    $ symfony gearman:worker --config=example1


This command starts a gearman worker, which exits after processing 100 jobs or after waiting 20 seconds for a job.
You can tweak this with `--count=100` and `--timeout=20` options (use 0 for count and a negative value for timeout to never end worker).
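
The exit rule described above can be written down as a small function. This is only a sketch of the rule as stated in this README: `workerShouldStop()` is a hypothetical name, and the task's real loop may be organised differently.

```php
<?php
// Hypothetical helper, not the plugin's code.
// $processed:   number of jobs handled so far
// $idleSeconds: time spent waiting for the next job
// $count, $timeout: values of the --count and --timeout options
function workerShouldStop($processed, $idleSeconds, $count = 100, $timeout = 20)
{
  // --count=0 disables the job limit
  if ($count > 0 && $processed >= $count)
  {
    return true;
  }

  // a negative --timeout disables the idle limit
  if ($timeout >= 0 && $idleSeconds >= $timeout)
  {
    return true;
  }

  return false;
}

$stopAfterJobs = workerShouldStop(100, 0);          // job limit reached
$stopWhenIdle  = workerShouldStop(5, 20);           // waited 20 secs for a job
$neverEnding   = workerShouldStop(1000, 3600, 0, -1); // --count=0 --timeout=-1
```

With `--count=0` and a negative `--timeout`, neither condition can become true, so the worker keeps running until it is stopped from outside.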

If you want your worker to restart automatically, use [supervisord](http://supervisord.org/), [daemontools](http://cr.yp.to/daemontools.html), or any other process control tool.

To see what happens, use `--verbose` option :

    $ symfony gearman:worker --config=example1 --verbose


See all options with :

    $ symfony help gearman:worker


If you want to trace jobs and workloads as well, you need to notify symfony that a job is processed :

    [php]
    /**
     * Gearman worker example1
     */
    class Worker1
    {
      /**
       * reverse work handler
       *
       * @param GearmanJob      $job    Gearman job
       * @param sfGearmanWorker $worker sfGearmanWorker
       * @return string                 Result sent to client
       */
      public static function reverse($job, $worker)
      {
        // sfGearman worker is passed as the 2nd parameter of the method
        // notifyEventJob() displays a trace in symfony task output
        // if --verbose is set, workload is logged too
        $worker->notifyEventJob($job);

        return strrev($job->workload());
      }
    }


The gearman protocol only handles strings in workloads. If you need to return an array or object as a worker result, use `sfGearman::serialize` :

    [php]
    /**
     * Gearman worker example1
     */
    class Worker1
    {
      /**
       * hash work handler
       *
       * @param GearmanJob      $job    Gearman job
       * @param sfGearmanWorker $worker sfGearmanWorker
       * @return string                 Result sent to client
       */
      public static function hash($job, $worker)
      {
        $worker->notifyEventJob($job);

        $workload = $job->workload();

        $result = array(md5($workload), sha1($workload));

        return sfGearman::serialize($result);
      }
    }

The sfGearmanClient will automatically unserialize the result if needed.
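
To make the round trip concrete, here is a minimal sketch of what happens to the array on its way from worker to client. It assumes that PHP's native `serialize()` and `unserialize()` behave like the plugin's own encoding, which is an assumption: the actual format used by `sfGearman::serialize` may differ.

```php
<?php
// Assumption: native serialize()/unserialize() stand in for
// sfGearman::serialize() and for the decoding done by sfGearmanClient.
$workload = 'Hello!';
$result = array(md5($workload), sha1($workload));

// worker side: the array becomes a plain string that gearman can carry
$payload = serialize($result);

// client side: the string is turned back into the original array
$decoded = unserialize($payload);
// $payload is a string, and $decoded is identical to $result
```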


Use a client
------------

To create a gearman client, use `sfGearmanClient::getInstance` :

    [php]
    // client connecting to default server
    $client = sfGearmanClient::getInstance();

    // client connecting to a different server defined in gearman.yml
    $client = sfGearmanClient::getInstance('local');


You have two shorthand methods to send a job to the gearman server: `task('function' [, 'workload'])` and `background('function' [, 'workload'])`. For example:

    [php]
    // this blocks until a worker does the job and returns the result
    $result = sfGearmanClient::getInstance()->task('reverse', 'Hello!');
    // $result == '!olleH'

    // this sends an asynchronous job to gearman server, the return value is a gearman handle
    $handle = sfGearmanClient::getInstance()->background('async');
    // $handle == 'H:host:id'


If you need priorities for your jobs, pass the level you want as the 3rd parameter :

    [php]
    // this job has high priority
    $result = sfGearmanClient::getInstance()->task('reverse', 'Hello!', sfGearman::HIGH);

    // this job has low priority
    $result = sfGearmanClient::getInstance()->task('reverse', 'Hello!', sfGearman::LOW);


Message queue manager
---------------------

The plugin provides a `sfGearmanQueue` class to put and get messages in queues, usage :

    [php]
    // put a message in a queue named "q1"
    sfGearmanQueue::put('q1', 'a message');

    // later or elsewhere, get a message from queue
    $message = sfGearmanQueue::get('q1');

    // put a message with high priority (will be fetched first)
    sfGearmanQueue::put('q1', 'urgent', sfGearman::HIGH);

    // ::get() blocks forever until a message arrives, if you want to timeout, use 2nd parameter (in ms)
    try
    {
      $message = sfGearmanQueue::get('q1', 10000);
    }
    catch(sfGearmanTimeoutException $e)
    {
      // waited 10 secs but no message in queue
    }


Internally, this sends messages as serialized workloads of "queue.%name%" jobs.
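
As an illustration of that naming scheme, the sketch below builds the job name and workload for a message. Both helpers are hypothetical (they are not the plugin's code), and native `serialize()` is used as a stand-in for the plugin's serializer.

```php
<?php
// Hypothetical helpers, not part of sfGearmanPlugin.
function queueJobName($name)
{
  // "%name%" in the pattern is replaced by the queue name
  return strtr('queue.%name%', array('%name%' => $name));
}

function queueWorkload($message)
{
  // assumption: native serialize() stands in for the plugin's serializer
  return serialize($message);
}

$job = queueJobName('q1');
$workload = queueWorkload('a message');
// $job == 'queue.q1', and unserialize($workload) gives back 'a message'
```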


Doctrine integration
--------------------

The sfGearmanPlugin provides a Doctrine Template which listens to insert/update/delete events and sends background jobs to gearman server.

Add the Gearmanable template to `doctrine/schema.yml`. For example, to listen to the update events of articles:

    [yml]
    Article:
      actAs:
        Gearmanable: {events: [update]}
      columns:
        title:  string(200)

Update your models, then configure `gearman.yml` to create a doctrine worker :

    [yml]
    all:
      doctrine:
        example2:
          Article: ~

Unlike the classic worker configuration, the doctrine one uses the model name as key and the list of events as value; `~` is an alias for all events defined in `schema.yml`.

Then implement the worker callbacks. They need to be located in the model class and named "trigger%Event%" (to avoid overlapping), so :

    [php]
    class Article extends BaseArticle
    {
      public function triggerUpdate($modified)
      {
        if (in_array('title', $modified))
        {
          // update a search index, refresh symfony cache, ...
        }
      }
    }

Note that the only parameter for doctrine gearman work handler is an array of modified properties.
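
The naming convention and the `$modified` parameter can be tried out without Doctrine or gearman. In the sketch below, `triggerMethodName()` and `FakeArticle` are hypothetical names made up for this example; a plain PHP class stands in for the Doctrine record.

```php
<?php
// Hypothetical helper: builds the handler name from an event name,
// following the "trigger%Event%" convention.
function triggerMethodName($event)
{
  return 'trigger'.ucfirst($event);
}

// Plain class standing in for a Doctrine record (no database involved).
class FakeArticle
{
  public $reindexed = false;

  public function triggerUpdate($modified)
  {
    if (in_array('title', $modified))
    {
      // stands for "update a search index, refresh symfony cache, ..."
      $this->reindexed = true;
    }
  }
}

$article = new FakeArticle();
$method = triggerMethodName('update'); // 'triggerUpdate'

// roughly what a worker does for an update event where only the title changed
$article->$method(array('title'));
// $article->reindexed is now true
```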

Then launch a doctrine worker with the symfony `gearman:worker-doctrine` task :

    $ symfony gearman:worker-doctrine --config=example2

You can omit the `--config=` option. This loads all doctrine models and registers all events defined in `schema.yml`, but it merges all jobs into the same workers.


Then save your records as usual :

    [php]
    $article = Doctrine_Core::getTable('Article')->find($id);
    $article->title = 'new title';
    $article->save();

This is what happens :

1. The `->save()` sends a background job containing the serialized record to the gearman server
2. The gearman server sends the job to the worker
3. The worker unserializes the record and calls the trigger

Note:

The object transits through gearman server, and the trigger is called in another php process.

So when the record arrives at the symfony worker task, it may no longer exist in the database, or may be out of date.
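
The effect can be reproduced with a plain PHP object. This is only a sketch: `ArticleSnapshot` is a hypothetical class with no database behind it, and native `serialize()` stands in for whatever the plugin uses to pack the record.

```php
<?php
// Plain class standing in for a Doctrine record (hypothetical, no database).
class ArticleSnapshot
{
  public $title;

  public function __construct($title)
  {
    $this->title = $title;
  }
}

$article = new ArticleSnapshot('new title');

// step 1: the record is serialized when the job is sent
// (assumption: native serialize() stands in for the plugin's serializer)
$workload = serialize($article);

// meanwhile, the record changes again before the worker picks up the job
$article->title = 'newer title';

// step 3: the worker rebuilds its own copy from the workload
$copy = unserialize($workload);
// $copy->title == 'new title', while $article->title == 'newer title'
```

The worker's copy reflects the record as it was when the job was sent, not as it is when the job runs.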

If you want a fresh copy, use doctrine `refresh()` in the handler :

    [php]
    class Article extends BaseArticle
    {
      public function triggerUpdate($modified)
      {
        try { $this->refresh(); }
        catch (Doctrine_Record_Exception $e) { return; }

        // here the record is reloaded from db
      }
    }

Don't use the previous snippet in `triggerDelete()` because what you have in the worker task is only a ghost of your record.


To access the gearman job object, use the method `->getGearmanJob()` :

    [php]
    class Article extends BaseArticle
    {
      public function triggerUpdate($modified)
      {
        $job = $this->getGearmanJob();
        $job->sendFail();
      }
    }


Custom doctrine jobs
--------------------

You can have custom jobs for Doctrine records, register them in `gearman.yml`:

    [yml]
    all:
      doctrine:
        example2:
          Article: [publish, ~]

Implement them in your model class:

    [php]
    class Article extends BaseArticle
    {
      public function publish($tweet, $ping)
      {
        try { $this->refresh(); }
        catch (Doctrine_Record_Exception $e) { return; }

        // heavy code to publish your article
      }
    }

Restart your worker task so that it picks up this new function.

Then launch tasks with `->task('function' [, ...])` for synchronous jobs, or `->taskBackground('function' [, ...])` for asynchronous ones.

    [php]
    $article = Doctrine_Core::getTable('Article')->find($id);
    $article->taskBackground('publish', false, true);
    // this returns immediately and lets the symfony task run the heavy code of ->publish(false, true)
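
On the worker side, the extra arguments end up as the parameters of the model method. A minimal sketch of that last step, assuming the worker already has the record, the method name and the argument list (`runRecordJob()` and `FakePublishableArticle` are hypothetical names, not the plugin's code):

```php
<?php
// Plain class standing in for the Article model (no database involved).
class FakePublishableArticle
{
  public $calls = array();

  public function publish($tweet, $ping)
  {
    // record the arguments instead of doing the heavy work
    $this->calls[] = array($tweet, $ping);
  }
}

// Hypothetical dispatcher: calls $method on $record with the given arguments.
function runRecordJob($record, $method, array $arguments)
{
  return call_user_func_array(array($record, $method), $arguments);
}

$article = new FakePublishableArticle();

// equivalent of the job created by ->taskBackground('publish', false, true)
runRecordJob($article, 'publish', array(false, true));
// $article->calls == array(array(false, true))
```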


Doctrine table jobs
-------------------

You can also define functions for a Doctrine table, for example :

    [yml]
    all:
      doctrine:
        example2:
          Article: [buildFeed, publish, ~]


Define the job handler in the table class :

    [php]
    class ArticleTable extends Doctrine_Table
    {
      public function buildFeed()
      {
        // build the feeds
      }
    }


Call `task()` or `taskBackground()` on the table :

    [php]
    Doctrine_Core::getTable('Article')->taskBackground('buildFeed');

