Skip to content
Consume RabbitMQ messages into any cli program
Branch: master
Clone or download
corvus-ch Update build (#70)
- Switch to latest go version
- Definitely switch to go modules
- White list version branches
- Remove dependency to `gocoverutil
Latest commit 870ed18 Apr 23, 2019
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
acknowledger Switch to gocoverutil (#31) Jun 16, 2018
collector
command Fallback to logging interface by Brian Ketelsen (#38) Jun 26, 2018
config Allow to set queue declare options in config file (#50) Nov 10, 2018
consumer Clean and fix shutdown handling (#52) Dec 22, 2018
delivery Delivery package (#25) Feb 27, 2018
fixtures chore: Adjust fixture files Apr 18, 2019
log Fallback to logging interface by Brian Ketelsen (#38) Jun 26, 2018
processor
.codeclimate.yml
.gitignore Switch to gocoverutil (#31) Jun 16, 2018
.goreleaser.yml Make LICENSE plain text Jan 4, 2018
.travis.yml Update build (#70) Apr 23, 2019
CONTRIBUTING.md
LICENSE Make LICENSE plain text Jan 4, 2018
Makefile Update build (#70) Apr 23, 2019
README.md chore: Add section "Metrics" to README.md Apr 8, 2019
builder_test.go Increased test coverage in main package Dec 31, 2017
config_test.go Increased test coverage in main package Dec 31, 2017
docker-compose.yml Added docker-compose.yml file for easy testing May 18, 2016
error_test.go Fix remotley changed import path (#41) Aug 23, 2018
example.conf Allow to set queue declare options in config file (#50) Nov 10, 2018
go.mod
go.sum chore: Adjust fixture files Apr 18, 2019
integration_helper_process_test.go Reduce complexity in integration tests Dec 31, 2017
integration_test.go Ensure events get handled in the right order (#47) Nov 5, 2018
main.go Do not panic when serving metrics fail (#67) Apr 22, 2019

README.md

RabbitMQ cli consumer

Build Status Maintainability Test Coverage

If you are a fellow PHP developer just like me you're probably aware of the following fact: PHP is meant to die.

When using RabbitMQ with pure PHP consumers you have to deal with stability issues. Probably you are killing your consumers regularly. And try to solve the problem with supervisord. Which also means on every deploy you have to restart your consumers. A little bit dramatic if you ask me.

This is a fork of the work done by Richard van den Brand and provides a command that aims to solve the above described problem for RabbitMQ workers by delegate the long running part to a tool written in go which is much better suited for this task. The PHP application then is only executed when there is an AMQP message to process. This is comparable to how HTTP requests usually are handled where the webs server waits for new incoming requests and calls your script once for each request.

This fork came to be, when Richard van den Brand did no longer had the time to maintain his version. The main goals of the fork are:

  • Following the principle of The Twelve-Factor App environment dependent settings should be configurable by environment variables.
  • All logs, including the output of the called executable, will be available in STDOUT/STDERR.
  • The AMQP message will be passed via STDIN (not as argument with its limitation in size)
  • Have tests with a decent level of code coverage.

NOTE: If you previously used the consumer of Richard van den Brand, this version should work as a drop in replacement. Do not migrate blindly but do some testing before. Effort was made to remain backwards compatible, no guarantees are made.

Installation

You have the choice to either compile yourself or by installing via package or binary.

Binary

Binaries can be found at: https://github.com/corvus-ch/rabbitmq-cli-consumer/releases

Compiling

This section assumes you're familiar with the Go language.

Use go get to get the source local:

$ go get github.com/corvus-ch/rabbitmq-cli-consumer

Change to the directory, e.g.:

$ cd $GOPATH/src/github.com/corvus-ch/rabbitmq-cli-consumer

Get the dependencies:

$ go get ./...

Then build and/or install:

$ go build
$ go install

Usage

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable '/path/to/your/app argument --flag'

Run without arguments or with -help switch to show the helptext:

Configuration

The file example.conf contains all available configuration options together with its explanation.

In Go the zero value for a string is "". So, any values not configured in the config file will result in a empty string. Now imagine you want to define an empty name for one of the configuration settings. Yes, we now cannot determine whether this value was empty on purpose or just left out. If you want to configure an empty string you have to be explicit by using the value <empty>.

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --configuration example.conf

Graceful shutdown

The consumer handles the signal SIGTERM. When SIGTERM is received, the AMQP channel will be canceled, preventing any new messages from being consumed. This allows to stop the consumer but let a currently running executable to finishing and acknowledgement of the message.

The executable

Your executable receives the message as the last argument. So consider the following:

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php

The command.php file should look like this:

#!/usr/bin/env php
<?php
// This contains first argument
$message = $argv[1];

// Decode to get original value
$original = base64_decode($message);

// Start processing
if (do_heavy_lifting($original)) {
    // All well, then return 0
    exit(0);
}

// Let rabbitmq-cli-consumer know someting went wrong, message will be requeued.
exit(1);

Or a Symfony2 example:

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable 'app/console event:processing --env=prod'

Command looks like this:

<?php

namespace Vendor\EventBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            ->addArgument('event', InputArgument::REQUIRED)
            ->setName('event:processing')
        ;

    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $message = base64_decode($input->getArgument('event'));

        $this->getContainer()->get('mailer')->send($message);

        exit(0);
    }
}

Compression

Depending on what you're passing around on the queue, it may be wise to enable compression support. If you don't you may encouter the infamous "Argument list too long" error.

When compression is enabled, the message gets compressed with zlib maximum compression before it's base64 encoded. We have to pay a performance penalty for this. If you are serializing large php objects I suggest to turn it on. Better safe then sorry.

In your config:

[rabbitmq]
compression = On

And in your php app:

#!/usr/bin/env php
<?php
// This contains first argument
$message = $argv[1];

// Decode to get compressed value
$original = base64_decode($message);

// Uncompresss
if (! $original = gzuncompress($original)) {
    // Probably wanna throw some exception here
    exit(1);
}

// Start processing
if (do_heavy_lifting($original)) {
    // All well, then return 0
    exit(0);
}

// Let rabbitmq-cli-consumer know someting went wrong, message will be requeued.
exit(1);

Including properties and message headers

If you need to access message headers and or properties, call the command with the --include option set.

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --include

The script then will receive a json encoded data structure which looks like the following.

{
  "properties": {
    "application_headers": {
      "name": "value"
    },
    "content_type": "",
    "content_encoding": "",
    "delivery_mode": 1,
    "priority": 0,
    "correlation_id": "",
    "reply_to": "",
    "expiration": "",
    "message_id": "",
    "timestamp": "0001-01-01T00:00:00Z",
    "type": "",
    "user_id": "",
    "app_id": ""
  },
  "delivery_info": {
    "message_count": 0,
    "consumer_tag": "ctag-./rabbitmq-cli-consumer-1",
    "delivery_tag": 2,
    "redelivered": true,
    "exchange": "example",
    "routing_key": ""
  },
  "body": ""
}

Change your script according to the following example.

#!/usr/bin/env php
<?php
// This contains first argument
$input = $argv[1];

// Decode to get original value also decrompress acording to your configuration.
$data = json_decode(base64_decode($input));

// Start processing
if (do_heavy_lifting($data->body, $data->properties)) {
    // All well, then return 0
    exit(0);
}

// Let rabbitmq-cli-consumer know someting went wrong, message will be requeued.
exit(1);

If you are using symfonies RabbitMQ bundle (php-amqplib/rabbitmq-bundle) you can wrap the consumer with the following symfony command.

<?php

namespace Vendor\EventBundle\Command;

use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            ->addArgument('event', InputArgument::REQUIRED)
            ->setName('event:processing')
        ;

    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $data = json_decode(base64_decode($input->getArgument('event')), true);
        $message = new AMQPMessage($data['body'], $data['properties']);

        /** @var \PhpAmqpLib\Message\AMQPMessage\ConsumerInterface $consumer */
        $consumer = $this->getContainer()->get('consumer');

        if (false == $consumer->execute($message)) {
            exit(1);
        }
    }
}

Use pipes instead of arguments

When starting the consumer with the --pipes option, the AMQP message will be passed on to the executable using STDIN for the message body and fd3 for the metadata containing the properties and the delivery info encoded as JSON.

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --pipe
#!/usr/bin/env php
<?php

// Read the metadata from fd3.
$metadata = file_get_contents("php://fd/3");
if (false === $metadata) {
  fwrite(STDERR, "failed to read metadata from fd3\n");
  exit(1);
}

// Decode the metadata.
$metadata = json_decode($metadata, true);
if (JSON_ERROR_NONE != json_last_error()) {
  fwrite(STDERR, "failed to decode metadata\n");
  fwrite(STDERR, json_last_error_msg() . PHP_EOL);
  exit(1);
}

// Read the body from STDIN.
$body = file_get_contents("php://stdin");
if (false === $body) {
  fwrite(STDERR, "failed to read body from STDIN\n");
  exit(1);
}

Strict exit code processing

By default, any non-zero exit code will make consumer send a negative acknowledgement and re-queue message back to the queue, in some cases it may cause your consumer to fall into an infinite loop as re-queued message will be getting back to consumer and it probably will fail again.

It's possible to get better control over message acknowledgement by setting up strict exit code processing. In this mode consumer will acknowledge messages only if executable process return an allowed exit code.

Allowed exit codes

Exit Code Action
0 Acknowledgement
3 Reject
4 Reject and re-queue
5 Negative acknowledgement
6 Negative acknowledgement and re-queue

All other exit codes will cause consumer to fail.

Run consumer with --strict-exit-code option to enable strict exit code processing:

rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --strict-exit-code

Make sure your executable returns correct exit code

#!/usr/bin/env php
<?php
// ...
try {
    if (do_heavy_lifting($data)) {
        // All well, then return 0
        exit(0);
    }
} catch(InvalidMessageBody $e) {
    exit(3); // Message is invalid, just reject and don't try to process again
} catch(TimeoutException $e) {
    exit(4); // Reject and try again
} catch(Exception $e) {
    exit(1); // Unexpected exception will cause consumer to stop consuming
}

Metrics

Metrics are following the Prometheus conventions. They are available via HTTP endpoint (http://127.0.0.1:9566/metrics), with default port 9566 and default path /metrics.

The following metrics are tracked:

Metric Type Description
rabbitmq_cli_consumer_process_total Counter The total number of processes executed. Processes are aggregated by their exit code.
rabbitmq_cli_consumer_process_duration_seconds Histogram The time spent by the consumer to process the message.
rabbitmq_cli_consumer_message_duration_seconds Histogram The time spent from publishing to finished processing the message. This requires the message to have the timestamp header set.

Contributing and license

This library is licenced under MIT. For information about how to contribute to this project, see CONTRIBUTING.md.

You can’t perform that action at this time.