Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fixed coding standard according to PSR compliance

  • Loading branch information...
commit 2ec0aeb0f4fbb65b44663a6a98e223365060e927 1 parent 7162472
Brikou CARRE brikou authored
Showing with 2,831 additions and 2,726 deletions.
  1. +115 −112 examples/PHP/asyncsrv.php
  2. +45 −43 examples/PHP/flclient1.php
  3. +79 −74 examples/PHP/flclient2.php
  4. +6 −6 examples/PHP/flserver1.php
  5. +12 −12 examples/PHP/flserver2.php
  6. +6 −6 examples/PHP/hwclient.php
  7. +9 −9 examples/PHP/hwserver.php
  8. +1 −1  examples/PHP/identity.php
  9. +135 −131 examples/PHP/lbbroker.php
  10. +111 −106 examples/PHP/lbbroker2.php
  11. +45 −43 examples/PHP/lpclient.php
  12. +16 −16 examples/PHP/lpserver.php
  13. +359 −343 examples/PHP/mdbroker.php
  14. +139 −133 examples/PHP/mdcliapi.php
  15. +112 −105 examples/PHP/mdcliapi2.php
  16. +9 −9 examples/PHP/mdclient.php
  17. +10 −10 examples/PHP/mdclient2.php
  18. +1 −1  examples/PHP/mdp.php
  19. +5 −5 examples/PHP/mdworker.php
  20. +79 −73 examples/PHP/mdwrkapi.php
  21. +4 −4 examples/PHP/mmiecho.php
  22. +16 −17 examples/PHP/mspoller.php
  23. +26 −26 examples/PHP/msreader.php
  24. +33 −34 examples/PHP/mtrelay.php
  25. +23 −22 examples/PHP/mtserver.php
  26. +27 −28 examples/PHP/peering1.php
  27. +131 −129 examples/PHP/peering2.php
  28. +179 −176 examples/PHP/peering3.php
  29. +131 −124 examples/PHP/ppqueue.php
  30. +64 −63 examples/PHP/ppworker.php
  31. +7 −7 examples/PHP/psenvpub.php
  32. +6 −6 examples/PHP/psenvsub.php
  33. +26 −27 examples/PHP/rrbroker.php
  34. +5 −5 examples/PHP/rrclient.php
  35. +10 −10 examples/PHP/rrworker.php
  36. +44 −44 examples/PHP/rtdealer.php
  37. +42 −41 examples/PHP/rtreq.php
  38. +2 −2 examples/PHP/rtrouter.php
  39. +38 −40 examples/PHP/spqueue.php
  40. +19 −19 examples/PHP/spworker.php
  41. +41 −41 examples/PHP/suisnail.php
  42. +7 −7 examples/PHP/syncpub.php
  43. +6 −6 examples/PHP/syncsub.php
  44. +7 −7 examples/PHP/tasksink.php
  45. +7 −7 examples/PHP/tasksink2.php
  46. +8 −8 examples/PHP/taskvent.php
  47. +9 −9 examples/PHP/taskwork.php
  48. +21 −21 examples/PHP/taskwork2.php
  49. +19 −18 examples/PHP/ticlient.php
  50. +67 −62 examples/PHP/titanic.php
  51. +85 −83 examples/PHP/tripping.php
  52. +3 −3 examples/PHP/version.php
  53. +4 −4 examples/PHP/wuclient.php
  54. +11 −11 examples/PHP/wuproxy.php
  55. +7 −7 examples/PHP/wuserver.php
  56. +22 −21 examples/PHP/zhelpers.php
  57. +379 −348 examples/PHP/zmsg.php
  58. +1 −1  examples/PHP/zmsg_test.php
227 examples/PHP/asyncsrv.php
View
@@ -1,13 +1,13 @@
<?php
/*
* Asynchronous client-to-server (DEALER to ROUTER)
- *
+ *
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each task has its own
* context and conceptually acts as a separate process.
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
-include "zmsg.php";
+include 'zmsg.php';
/* ---------------------------------------------------------------------
* This is our client task
@@ -15,33 +15,34 @@
* It collects responses as they arrive, and it prints them out. We will
* run several client tasks in parallel, each with a different random ID.
*/
-function client_task() {
- $context = new ZMQContext();
- $client = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
-
- // Generate printable identity for the client
- $identity = sprintf ("%04X", rand(0, 0x10000));
- $client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
- $client->connect("tcp://localhost:5570");
-
- $read = $write = array();
- $poll = new ZMQPoll();
- $poll->add($client, ZMQ::POLL_IN);
-
- $request_nbr = 0;
- while(true) {
- // Tick once per second, pulling in arriving messages
- for($centitick = 0; $centitick < 100; $centitick++) {
- $events = $poll->poll($read, $write, 1000);
- $zmsg = new Zmsg($client);
- if($events) {
- $zmsg->recv();
- printf ("%s: %s%s", $identity, $zmsg->body(), PHP_EOL);
- }
- }
- $zmsg = new Zmsg($client);
- $zmsg->body_fmt("request #%d", ++$request_nbr)->send();
- }
+function client_task()
+{
+ $context = new ZMQContext();
+ $client = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
+
+ // Generate printable identity for the client
+ $identity = sprintf ("%04X", rand(0, 0x10000));
+ $client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
+ $client->connect("tcp://localhost:5570");
+
+ $read = $write = array();
+ $poll = new ZMQPoll();
+ $poll->add($client, ZMQ::POLL_IN);
+
+ $request_nbr = 0;
+ while (true) {
+ // Tick once per second, pulling in arriving messages
+ for ($centitick = 0; $centitick < 100; $centitick++) {
+ $events = $poll->poll($read, $write, 1000);
+ $zmsg = new Zmsg($client);
+ if ($events) {
+ $zmsg->recv();
+ printf ("%s: %s%s", $identity, $zmsg->body(), PHP_EOL);
+ }
+ }
+ $zmsg = new Zmsg($client);
+ $zmsg->body_fmt("request #%d", ++$request_nbr)->send();
+ }
}
/* ---------------------------------------------------------------------
@@ -51,96 +52,98 @@ function client_task() {
* one request at a time but one client can talk to multiple workers at
* once.
*/
-function server_task() {
-
- // Launch pool of worker threads, precise number is not critical
- for($thread_nbr = 0; $thread_nbr < 5; $thread_nbr++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- server_worker();
- exit();
- }
- }
-
- $context = new ZMQContext();
-
- // Frontend socket talks to clients over TCP
- $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
- $frontend->bind("tcp://*:5570");
-
- // Backend socket talks to workers over ipc
- $backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
- $backend->bind("ipc://backend");
-
- // Connect backend to frontend via a queue device
- // We could do this:
- // $device = new ZMQDevice($frontend, $backend);
- // But doing it ourselves means we can debug this more easily
-
- $read = $write = array();
- // Switch messages between frontend and backend
- while(true) {
- $poll = new ZMQPoll();
- $poll->add($frontend, ZMQ::POLL_IN);
- $poll->add($backend, ZMQ::POLL_IN);
-
- $poll->poll($read, $write);
- foreach($read as $socket) {
- $zmsg = new Zmsg($socket);
- $zmsg->recv();
- if($socket === $frontend) {
- //echo "Request from client:";
- //echo $zmsg->__toString();
- $zmsg->set_socket($backend)->send();
- } else if($socket === $backend) {
- //echo "Request from worker:";
- //echo $zmsg->__toString();
- $zmsg->set_socket($frontend)->send();
- }
- }
- }
+function server_task()
+{
+ // Launch pool of worker threads, precise number is not critical
+ for ($thread_nbr = 0; $thread_nbr < 5; $thread_nbr++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ server_worker();
+ exit();
+ }
+ }
+
+ $context = new ZMQContext();
+
+ // Frontend socket talks to clients over TCP
+ $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
+ $frontend->bind("tcp://*:5570");
+
+ // Backend socket talks to workers over ipc
+ $backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
+ $backend->bind("ipc://backend");
+
+ // Connect backend to frontend via a queue device
+ // We could do this:
+ // $device = new ZMQDevice($frontend, $backend);
+ // But doing it ourselves means we can debug this more easily
+
+ $read = $write = array();
+ // Switch messages between frontend and backend
+ while (true) {
+ $poll = new ZMQPoll();
+ $poll->add($frontend, ZMQ::POLL_IN);
+ $poll->add($backend, ZMQ::POLL_IN);
+
+ $poll->poll($read, $write);
+ foreach ($read as $socket) {
+ $zmsg = new Zmsg($socket);
+ $zmsg->recv();
+ if ($socket === $frontend) {
+ //echo "Request from client:";
+ //echo $zmsg->__toString();
+ $zmsg->set_socket($backend)->send();
+ } elseif ($socket === $backend) {
+ //echo "Request from worker:";
+ //echo $zmsg->__toString();
+ $zmsg->set_socket($frontend)->send();
+ }
+ }
+ }
}
-function server_worker() {
- $context = new ZMQContext();
- $worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
- $worker->connect("ipc://backend");
- $zmsg = new Zmsg($worker);
-
- while(true) {
- // The DEALER socket gives us the address envelope and message
- $zmsg->recv();
- assert($zmsg->parts() == 2);
-
- // Send 0..4 replies back
- $replies = rand(0,4);
- for($reply = 0; $reply < $replies; $reply++) {
- // Sleep for some fraction of a second
- usleep(rand(0,1000) + 1);
- $zmsg->send(Zmsg::NOCLEAR);
- }
-
- }
+function server_worker()
+{
+ $context = new ZMQContext();
+ $worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
+ $worker->connect("ipc://backend");
+ $zmsg = new Zmsg($worker);
+
+ while (true) {
+ // The DEALER socket gives us the address envelope and message
+ $zmsg->recv();
+ assert($zmsg->parts() == 2);
+
+ // Send 0..4 replies back
+ $replies = rand(0,4);
+ for ($reply = 0; $reply < $replies; $reply++) {
+ // Sleep for some fraction of a second
+ usleep(rand(0,1000) + 1);
+ $zmsg->send(Zmsg::NOCLEAR);
+ }
+
+ }
}
/* This main thread simply starts several clients, and a server, and then
* waits for the server to finish.
*/
-function main() {
- for($num_clients = 0; $num_clients < 3; $num_clients++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- client_task();
- exit();
- }
- }
-
- $pid = pcntl_fork();
- if($pid == 0) {
- server_task();
- exit();
- }
-
+function main()
+{
+ for ($num_clients = 0; $num_clients < 3; $num_clients++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ client_task();
+ exit();
+ }
+ }
+
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ server_task();
+ exit();
+ }
+
}
main();
88 examples/PHP/flclient1.php
View
@@ -14,32 +14,34 @@
* @param string $endpoint
* @param string $request
*/
-function try_request($ctx, $endpoint, $request) {
- global $request_timeout;
+function try_request($ctx, $endpoint, $request)
+{
+ global $request_timeout;
- printf("I: Trying echo service at %s...\n", $endpoint);
- $client = $ctx->getSocket(ZMQ::SOCKET_REQ);
- $client->connect($endpoint);
- $client->send($request);
+ printf("I: Trying echo service at %s...\n", $endpoint);
+ $client = $ctx->getSocket(ZMQ::SOCKET_REQ);
+ $client->connect($endpoint);
+ $client->send($request);
- $poll = new ZMQPoll();
- $poll->add($client, ZMQ::POLL_IN);
- $readable = $writable = array();
+ $poll = new ZMQPoll();
+ $poll->add($client, ZMQ::POLL_IN);
+ $readable = $writable = array();
- $events = $poll->poll($readable, $writable, $request_timeout);
- $reply = null;
- foreach($readable as $sock) {
- if ($sock == $client) {
- $reply = $client->recvMulti();
- } else {
- $reply = null;
- }
- }
+ $events = $poll->poll($readable, $writable, $request_timeout);
+ $reply = null;
+ foreach ($readable as $sock) {
+ if ($sock == $client) {
+ $reply = $client->recvMulti();
+ } else {
+ $reply = null;
+ }
+ }
- $poll->remove($client);
- $poll = null;
- $client = null;
- return $reply;
+ $poll->remove($client);
+ $poll = null;
+ $client = null;
+
+ return $reply;
}
$context = new ZMQContext();
@@ -49,31 +51,31 @@ function try_request($ctx, $endpoint, $request) {
$cmd = array_shift($argv);
$endpoints = count($argv);
if ($endpoints == 0) {
- printf("I: syntax: %s <endpoint> ...\n", $cmd);
- exit;
+ printf("I: syntax: %s <endpoint> ...\n", $cmd);
+ exit;
}
if ($endpoints == 1) {
- // For one endpoint, we retry N times
- $endpoint = $argv[0];
- for($retries = 0; $retries < $max_retries; $retries++) {
- $reply = try_request($context, $endpoint, $request);
- if (isset($reply)) {
- break; // Success
- }
- printf("W: No response from %s, retrying\n", $endpoint);
- }
+ // For one endpoint, we retry N times
+ $endpoint = $argv[0];
+ for ($retries = 0; $retries < $max_retries; $retries++) {
+ $reply = try_request($context, $endpoint, $request);
+ if (isset($reply)) {
+ break; // Success
+ }
+ printf("W: No response from %s, retrying\n", $endpoint);
+ }
} else {
- // For multiple endpoints, try each at most once
- foreach($argv as $endpoint) {
- $reply = try_request($context, $endpoint, $request);
- if (isset($reply)) {
- break; // Success
- }
- printf("W: No response from %s\n", $endpoint);
- }
+ // For multiple endpoints, try each at most once
+ foreach ($argv as $endpoint) {
+ $reply = try_request($context, $endpoint, $request);
+ if (isset($reply)) {
+ break; // Success
+ }
+ printf("W: No response from %s\n", $endpoint);
+ }
}
if (isset($reply)) {
- print "Service is running OK\n";
-}
+ print "Service is running OK\n";
+}
153 examples/PHP/flclient2.php
View
@@ -6,101 +6,106 @@
* Author: Rob Gagnon <rgagnon24(at)gmail(dot)com>
*/
-class FLClient {
- const GLOBAL_TIMEOUT = 2500; // ms
+class FLClient
+{
+ const GLOBAL_TIMEOUT = 2500; // ms
- private $servers = 0;
- private $sequence = 0;
- /** @var ZMQContext */
- private $context = null;
- /** @var ZMQSocket */
- private $socket = null;
+ private $servers = 0;
+ private $sequence = 0;
+ /** @var ZMQContext */
+ private $context = null;
+ /** @var ZMQSocket */
+ private $socket = null;
- public function __construct() {
- $this->servers = 0;
- $this->sequence = 0;
- $this->context = new ZMQContext();
- $this->socket = $this->context->getSocket(ZMQ::SOCKET_DEALER);
- }
+ public function __construct()
+ {
+ $this->servers = 0;
+ $this->sequence = 0;
+ $this->context = new ZMQContext();
+ $this->socket = $this->context->getSocket(ZMQ::SOCKET_DEALER);
+ }
- public function __destruct() {
- $this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
- $this->socket = null;
- $this->context = null;
- }
+ public function __destruct()
+ {
+ $this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
+ $this->socket = null;
+ $this->context = null;
+ }
- /**
- * @param string $endpoint
- */
- public function connect($endpoint) {
- $this->socket->connect($endpoint);
- $this->servers++;
- printf("I: Connected to %s\n", $endpoint);
- }
+ /**
+ * @param string $endpoint
+ */
+ public function connect($endpoint)
+ {
+ $this->socket->connect($endpoint);
+ $this->servers++;
+ printf("I: Connected to %s\n", $endpoint);
+ }
- /**
- * @param string $request
- */
- public function request($request) {
- // Prefix request with sequence number and empty envelope
- $this->sequence++;
- $msg = array('', $this->sequence, $request);
+ /**
+ * @param string $request
+ */
+ public function request($request)
+ {
+ // Prefix request with sequence number and empty envelope
+ $this->sequence++;
+ $msg = array('', $this->sequence, $request);
- // Blast the request to all connected servers
- for($server = 1; $server <= $this->servers; $server++) {
- $this->socket->sendMulti($msg);
- }
+ // Blast the request to all connected servers
+ for ($server = 1; $server <= $this->servers; $server++) {
+ $this->socket->sendMulti($msg);
+ }
- // Wait for a matching reply to arrive from anywhere
- // Since we can poll several times, calculate each one
- $poll = new ZMQPoll();
- $poll->add($this->socket, ZMQ::POLL_IN);
+ // Wait for a matching reply to arrive from anywhere
+ // Since we can poll several times, calculate each one
+ $poll = new ZMQPoll();
+ $poll->add($this->socket, ZMQ::POLL_IN);
- $reply = null;
- $endtime = time() + self::GLOBAL_TIMEOUT / 1000;
- while (time() < $endtime) {
- $readable = $writable = array();
- $events = $poll->poll($readable, $writable, ($endtime - time()) * 1000);
- foreach($readable as $sock) {
- if ($sock == $this->socket) {
- $reply = $this->socket->recvMulti();
- if (count($reply) != 3) {
- exit;
- }
- $sequence = $reply[1];
- if ($sequence == $this->sequence) {
- break;
- }
- }
- }
- }
+ $reply = null;
+ $endtime = time() + self::GLOBAL_TIMEOUT / 1000;
+ while (time() < $endtime) {
+ $readable = $writable = array();
+ $events = $poll->poll($readable, $writable, ($endtime - time()) * 1000);
+ foreach ($readable as $sock) {
+ if ($sock == $this->socket) {
+ $reply = $this->socket->recvMulti();
+ if (count($reply) != 3) {
+ exit;
+ }
+ $sequence = $reply[1];
+ if ($sequence == $this->sequence) {
+ break;
+ }
+ }
+ }
+ }
- return $reply;
- }
+ return $reply;
+ }
}
$cmd = array_shift($argv);
if (count($argv) == 0) {
- printf("I: syntax: %s <endpoint> ...\n", $cmd);
- exit;
+ printf("I: syntax: %s <endpoint> ...\n", $cmd);
+ exit;
}
// Create new freelance client object
$client = new FLClient();
-foreach($argv as $endpoint) {
- $client->connect($endpoint);
+foreach ($argv as $endpoint) {
+ $client->connect($endpoint);
}
$start = time();
-for($requests = 0; $requests < 10000; $requests++) {
- $request = "random name";
- $reply = $client->request($request);
- if (!isset($reply)) {
- print "E: name service not available, aborting\n";
- break;
- }
+for ($requests = 0; $requests < 10000; $requests++) {
+ $request = "random name";
+ $reply = $client->request($request);
+ if (!isset($reply)) {
+ print "E: name service not available, aborting\n";
+ break;
+ }
}
printf("Average round trip cost: %i ms\n", ((time() - $start) / 100));
-$client = null;
+$client = null;
12 examples/PHP/flserver1.php
View
@@ -7,8 +7,8 @@
*/
if (count($argv) < 2) {
- printf("I: Syntax: %s <endpoint>\n", $argv[0]);
- exit;
+ printf("I: Syntax: %s <endpoint>\n", $argv[0]);
+ exit;
}
$endpoint = $argv[1];
@@ -17,7 +17,7 @@
$server->bind($endpoint);
printf("I: Echo service is ready at %s\n", $endpoint);
-while(true) {
- $msg = $server->recvMulti();
- $server->sendMulti($msg);
-}
+while (true) {
+ $msg = $server->recvMulti();
+ $server->sendMulti($msg);
+}
24 examples/PHP/flserver2.php
View
@@ -7,8 +7,8 @@
*/
if (count($argv) < 2) {
- printf("I: Syntax: %s <endpoint>\n", $argv[0]);
- exit;
+ printf("I: Syntax: %s <endpoint>\n", $argv[0]);
+ exit;
}
$endpoint = $argv[1];
@@ -17,14 +17,14 @@
$server->bind($endpoint);
printf("I: Echo service is ready at %s\n", $endpoint);
-while(true) {
- $request = $server->recvMulti();
- if (count($request) != 2) {
- // Fail nastily if run against wrong client
- exit(-1);
- }
+while (true) {
+ $request = $server->recvMulti();
+ if (count($request) != 2) {
+ // Fail nastily if run against wrong client
+ exit(-1);
+ }
- $address = $request[0];
- $reply = array($address, 'OK');
- $server->sendMulti($reply);
-}
+ $address = $request[0];
+ $reply = array($address, 'OK');
+ $server->sendMulti($reply);
+}
12 examples/PHP/hwclient.php
View
@@ -13,10 +13,10 @@
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5555");
-for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
- printf ("Sending request %d...\n", $request_nbr);
- $requester->send("Hello");
-
- $reply = $requester->recv();
- printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
+for ($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
+ printf ("Sending request %d...\n", $request_nbr);
+ $requester->send("Hello");
+
+ $reply = $requester->recv();
+ printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
}
18 examples/PHP/hwserver.php
View
@@ -12,14 +12,14 @@
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
-while(true) {
- // Wait for next request from client
- $request = $responder->recv();
- printf ("Received request: [%s]\n", $request);
+while (true) {
+ // Wait for next request from client
+ $request = $responder->recv();
+ printf ("Received request: [%s]\n", $request);
- // Do some 'work'
- sleep (1);
+ // Do some 'work'
+ sleep (1);
- // Send reply back to client
- $responder->send("World");
-}
+ // Send reply back to client
+ $responder->send("World");
+}
2  examples/PHP/identity.php
View
@@ -5,7 +5,7 @@
* zhelpers.h. It gets boring for everyone to keep repeating this code.
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
-include "zhelpers.php";
+include 'zhelpers.php';
$context = new ZMQContext();
266 examples/PHP/lbbroker.php
View
@@ -2,158 +2,162 @@
/*
* Least-recently used (LRU) queue device
* Clients and workers are shown here as IPC as PHP
- * does not have threads.
+ * does not have threads.
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
define("NBR_CLIENTS", 10);
define("NBR_WORKERS", 3);
// Basic request-reply client using REQ socket
-function client_thread() {
- $context = new ZMQContext();
- $client = new ZMQSocket($context, ZMQ::SOCKET_REQ);
- $client->connect("ipc://frontend.ipc");
-
- // Send request, get reply
- $client->send("HELLO");
- $reply = $client->recv();
- printf("Client: %s%s", $reply, PHP_EOL);
+function client_thread()
+{
+ $context = new ZMQContext();
+ $client = new ZMQSocket($context, ZMQ::SOCKET_REQ);
+ $client->connect("ipc://frontend.ipc");
+
+ // Send request, get reply
+ $client->send("HELLO");
+ $reply = $client->recv();
+ printf("Client: %s%s", $reply, PHP_EOL);
}
// Worker using REQ socket to do LRU routing
-function worker_thread () {
- $context = new ZMQContext();
- $worker = $context->getSocket(ZMQ::SOCKET_REQ);
- $worker->connect("ipc://backend.ipc");
+function worker_thread ()
+{
+ $context = new ZMQContext();
+ $worker = $context->getSocket(ZMQ::SOCKET_REQ);
+ $worker->connect("ipc://backend.ipc");
// Tell broker we're ready for work
- $worker->send("READY");
-
- while(true) {
- // Read and save all frames until we get an empty frame
+ $worker->send("READY");
+
+ while (true) {
+ // Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
- $address = $worker->recv();
-
- // Additional logic to clean up workers.
- if($address == "END") {
- exit();
- }
- $empty = $worker->recv();
- assert(empty($empty));
-
- // Get request, send reply
- $request = $worker->recv();
- printf ("Worker: %s%s", $request, PHP_EOL);
-
- $worker->send($address, ZMQ::MODE_SNDMORE);
- $worker->send("", ZMQ::MODE_SNDMORE);
- $worker->send("OK");
+ $address = $worker->recv();
+
+ // Additional logic to clean up workers.
+ if ($address == "END") {
+ exit();
+ }
+ $empty = $worker->recv();
+ assert(empty($empty));
+
+ // Get request, send reply
+ $request = $worker->recv();
+ printf ("Worker: %s%s", $request, PHP_EOL);
+
+ $worker->send($address, ZMQ::MODE_SNDMORE);
+ $worker->send("", ZMQ::MODE_SNDMORE);
+ $worker->send("OK");
}
}
-function main() {
- for($client_nbr = 0; $client_nbr < NBR_CLIENTS; $client_nbr++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- client_thread();
- return;
- }
- }
-
- for($worker_nbr = 0; $worker_nbr < NBR_WORKERS; $worker_nbr++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- worker_thread();
- return;
- }
- }
-
- $context = new ZMQContext();
- $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
- $backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
- $frontend->bind("ipc://frontend.ipc");
- $backend->bind("ipc://backend.ipc");
-
- // Logic of LRU loop
+function main()
+{
+ for ($client_nbr = 0; $client_nbr < NBR_CLIENTS; $client_nbr++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ client_thread();
+
+ return;
+ }
+ }
+
+ for ($worker_nbr = 0; $worker_nbr < NBR_WORKERS; $worker_nbr++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ worker_thread();
+
+ return;
+ }
+ }
+
+ $context = new ZMQContext();
+ $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
+ $backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
+ $frontend->bind("ipc://frontend.ipc");
+ $backend->bind("ipc://backend.ipc");
+
+ // Logic of LRU loop
// - Poll backend always, frontend only if 1+ worker ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary
// - If client requests, pop next worker and send request to it
// Queue of available workers
- $available_workers = 0;
- $worker_queue = array();
- $writeable = $readable = array();
-
- while($client_nbr > 0) {
- $poll = new ZMQPoll();
-
- // Poll front-end only if we have available workers
- if($available_workers > 0) {
- $poll->add($frontend, ZMQ::POLL_IN);
- }
-
- // Always poll for worker activity on backend
- $poll->add($backend, ZMQ::POLL_IN);
- $events = $poll->poll($readable, $writeable);
-
- if($events > 0) {
- foreach($readable as $socket) {
- // Handle worker activity on backend
- if($socket === $backend) {
- // Queue worker address for LRU routing
- $worker_addr = $socket->recv();
- assert($available_workers < NBR_WORKERS);
- $available_workers++;
- array_push($worker_queue, $worker_addr);
-
- // Second frame is empty
- $empty = $socket->recv();
- assert(empty($empty));
-
- // Third frame is READY or else a client reply address
- $client_addr = $socket->recv();
-
- if($client_addr != "READY") {
- $empty = $socket->recv();
- assert(empty($empty));
- $reply = $socket->recv();
- $frontend->send($client_addr, ZMQ::MODE_SNDMORE);
- $frontend->send("", ZMQ::MODE_SNDMORE);
- $frontend->send($reply);
-
- // exit after all messages relayed
- $client_nbr--;
- }
- } else if($socket === $frontend) {
- // Now get next client request, route to LRU worker
- // Client request is [address][empty][request]
- $client_addr = $socket->recv();
- $empty = $socket->recv();
- assert(empty($empty));
- $request = $socket->recv();
-
- $backend->send(array_shift($worker_queue), ZMQ::MODE_SNDMORE);
- $backend->send("", ZMQ::MODE_SNDMORE);
- $backend->send($client_addr, ZMQ::MODE_SNDMORE);
- $backend->send("", ZMQ::MODE_SNDMORE);
- $backend->send($request);
-
- $available_workers--;
- }
- }
- }
- }
-
- // Clean up our worker processes
- foreach($worker_queue as $worker) {
- $backend->send($worker, ZMQ::MODE_SNDMORE);
- $backend->send("", ZMQ::MODE_SNDMORE);
- $backend->send('END');
- }
-
- sleep(1);
+ $available_workers = 0;
+ $worker_queue = array();
+ $writeable = $readable = array();
+
+ while ($client_nbr > 0) {
+ $poll = new ZMQPoll();
+
+ // Poll front-end only if we have available workers
+ if ($available_workers > 0) {
+ $poll->add($frontend, ZMQ::POLL_IN);
+ }
+
+ // Always poll for worker activity on backend
+ $poll->add($backend, ZMQ::POLL_IN);
+ $events = $poll->poll($readable, $writeable);
+
+ if ($events > 0) {
+ foreach ($readable as $socket) {
+ // Handle worker activity on backend
+ if ($socket === $backend) {
+ // Queue worker address for LRU routing
+ $worker_addr = $socket->recv();
+ assert($available_workers < NBR_WORKERS);
+ $available_workers++;
+ array_push($worker_queue, $worker_addr);
+
+ // Second frame is empty
+ $empty = $socket->recv();
+ assert(empty($empty));
+
+ // Third frame is READY or else a client reply address
+ $client_addr = $socket->recv();
+
+ if ($client_addr != "READY") {
+ $empty = $socket->recv();
+ assert(empty($empty));
+ $reply = $socket->recv();
+ $frontend->send($client_addr, ZMQ::MODE_SNDMORE);
+ $frontend->send("", ZMQ::MODE_SNDMORE);
+ $frontend->send($reply);
+
+ // exit after all messages relayed
+ $client_nbr--;
+ }
+ } elseif ($socket === $frontend) {
+ // Now get next client request, route to LRU worker
+ // Client request is [address][empty][request]
+ $client_addr = $socket->recv();
+ $empty = $socket->recv();
+ assert(empty($empty));
+ $request = $socket->recv();
+
+ $backend->send(array_shift($worker_queue), ZMQ::MODE_SNDMORE);
+ $backend->send("", ZMQ::MODE_SNDMORE);
+ $backend->send($client_addr, ZMQ::MODE_SNDMORE);
+ $backend->send("", ZMQ::MODE_SNDMORE);
+ $backend->send($request);
+
+ $available_workers--;
+ }
+ }
+ }
+ }
+
+ // Clean up our worker processes
+ foreach ($worker_queue as $worker) {
+ $backend->send($worker, ZMQ::MODE_SNDMORE);
+ $backend->send("", ZMQ::MODE_SNDMORE);
+ $backend->send('END');
+ }
+
+ sleep(1);
}
main();
-
217 examples/PHP/lbbroker2.php
View
@@ -1,132 +1,137 @@
-<?php
+<?php
/*
* Least-recently used (LRU) queue device
* Demonstrates use of the zmsg class
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
-include "zmsg.php";
+include 'zmsg.php';
define("NBR_CLIENTS", 10);
define("NBR_WORKERS", 3);
// Basic request-reply client using REQ socket
-function client_thread() {
- $context = new ZMQContext();
- $client = new ZMQSocket($context, ZMQ::SOCKET_REQ);
- $client->connect("ipc://frontend.ipc");
-
- // Send request, get reply
- $client->send("HELLO");
- $reply = $client->recv();
- printf("Client: %s%s", $reply, PHP_EOL);
+function client_thread()
+{
+ $context = new ZMQContext();
+ $client = new ZMQSocket($context, ZMQ::SOCKET_REQ);
+ $client->connect("ipc://frontend.ipc");
+
+ // Send request, get reply
+ $client->send("HELLO");
+ $reply = $client->recv();
+ printf("Client: %s%s", $reply, PHP_EOL);
}
// Worker using REQ socket to do LRU routing
-function worker_thread () {
- $context = new ZMQContext();
- $worker = $context->getSocket(ZMQ::SOCKET_REQ);
- $worker->connect("ipc://backend.ipc");
+function worker_thread ()
+{
+ $context = new ZMQContext();
+ $worker = $context->getSocket(ZMQ::SOCKET_REQ);
+ $worker->connect("ipc://backend.ipc");
// Tell broker we're ready for work
- $worker->send("READY");
-
- while(true) {
- $zmsg = new Zmsg($worker);
- $zmsg->recv();
-
- // Additional logic to clean up workers.
- if($zmsg->address() == "END") {
- exit();
- }
-
- printf ("Worker: %s\n", $zmsg->body());
-
- $zmsg->body_set("OK");
- $zmsg->send();
+ $worker->send("READY");
+
+ while (true) {
+ $zmsg = new Zmsg($worker);
+ $zmsg->recv();
+
+ // Additional logic to clean up workers.
+ if ($zmsg->address() == "END") {
+ exit();
+ }
+
+ printf ("Worker: %s\n", $zmsg->body());
+
+ $zmsg->body_set("OK");
+ $zmsg->send();
}
}
-function main() {
- for($client_nbr = 0; $client_nbr < NBR_CLIENTS; $client_nbr++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- client_thread();
- return;
- }
- }
-
- for($worker_nbr = 0; $worker_nbr < NBR_WORKERS; $worker_nbr++) {
- $pid = pcntl_fork();
- if($pid == 0) {
- worker_thread();
- return;
- }
- }
-
- $context = new ZMQContext();
- $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
- $backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
- $frontend->bind("ipc://frontend.ipc");
- $backend->bind("ipc://backend.ipc");
-
- // Logic of LRU loop
+function main()
+{
+ for ($client_nbr = 0; $client_nbr < NBR_CLIENTS; $client_nbr++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ client_thread();
+
+ return;
+ }
+ }
+
+ for ($worker_nbr = 0; $worker_nbr < NBR_WORKERS; $worker_nbr++) {
+ $pid = pcntl_fork();
+ if ($pid == 0) {
+ worker_thread();
+
+ return;
+ }
+ }
+
+ $context = new ZMQContext();
+ $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
+ $backend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
+ $frontend->bind("ipc://frontend.ipc");
+ $backend->bind("ipc://backend.ipc");
+
+ // Logic of LRU loop
// - Poll backend always, frontend only if 1+ worker ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary
// - If client requests, pop next worker and send request to it
// Queue of available workers
- $available_workers = 0;
- $worker_queue = array();
- $writeable = $readable = array();
-
- while($client_nbr > 0) {
- $poll = new ZMQPoll();
-
- // Poll front-end only if we have available workers
- if($available_workers > 0) {
- $poll->add($frontend, ZMQ::POLL_IN);
- }
-
- // Always poll for worker activity on backend
- $poll->add($backend, ZMQ::POLL_IN);
- $events = $poll->poll($readable, $writeable);
-
- if($events > 0) {
- foreach($readable as $socket) {
- // Handle worker activity on backend
- if($socket === $backend) {
- // Queue worker address for LRU routing
- $zmsg = new Zmsg($socket);
- $zmsg->recv();
- assert($available_workers < NBR_WORKERS);
- $available_workers++;
- array_push($worker_queue, $zmsg->unwrap());
-
- if($zmsg->body() != "READY") {
- $zmsg->set_socket($frontend)->send();
-
- // exit after all messages relayed
- $client_nbr--;
- }
- } else if($socket === $frontend) {
- $zmsg = new Zmsg($socket);
- $zmsg->recv();
- $zmsg->wrap(array_shift($worker_queue), "");
- $zmsg->set_socket($backend)->send();
- $available_workers--;
- }
- }
- }
- }
-
- // Clean up our worker processes
- foreach($worker_queue as $worker) {
- $zmsg = new Zmsg($backend);
- $zmsg->body_set('END')->wrap($worker, "")->send();
- }
-
- sleep(1);
+ $available_workers = 0;
+ $worker_queue = array();
+ $writeable = $readable = array();
+
+ while ($client_nbr > 0) {
+ $poll = new ZMQPoll();
+
+ // Poll front-end only if we have available workers
+ if ($available_workers > 0) {
+ $poll->add($frontend, ZMQ::POLL_IN);
+ }
+
+ // Always poll for worker activity on backend
+ $poll->add($backend, ZMQ::POLL_IN);
+ $events = $poll->poll($readable, $writeable);
+
+ if ($events > 0) {
+ foreach ($readable as $socket) {
+ // Handle worker activity on backend
+ if ($socket === $backend) {
+ // Queue worker address for LRU routing
+ $zmsg = new Zmsg($socket);
+ $zmsg->recv();
+ assert($available_workers < NBR_WORKERS);
+ $available_workers++;
+ array_push($worker_queue, $zmsg->unwrap());
+
+ if ($zmsg->body() != "READY") {
+ $zmsg->set_socket($frontend)->send();
+
+ // exit after all messages relayed
+ $client_nbr--;
+ }
+ } elseif ($socket === $frontend) {
+ $zmsg = new Zmsg($socket);
+ $zmsg->recv();
+ $zmsg->wrap(array_shift($worker_queue), "");
+ $zmsg->set_socket($backend)->send();
+ $available_workers--;
+ }
+ }
+ }
+ }
+
+ // Clean up our worker processes
+ foreach ($worker_queue as $worker) {
+ $zmsg = new Zmsg($backend);
+ $zmsg->body_set('END')->wrap($worker, "")->send();
+ }
+
+ sleep(1);
}
-main();
+main();
88 examples/PHP/lpclient.php
View
@@ -1,67 +1,69 @@
<?php
-/*
+/*
* Lazy Pirate client
* Use zmq_poll to do a safe request-reply
* To run, start lpserver and then randomly kill/restart it
- *
+ *
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
define("REQUEST_TIMEOUT", 2500); // msecs, (> 1000!)
define("REQUEST_RETRIES", 3); // Before we abandon
-/*
+/*
* Helper function that returns a new configured socket
* connected to the Hello World server
*/
-function client_socket(ZMQContext $context) {
- echo "I: connecting to server...", PHP_EOL;
- $client = new ZMQSocket($context,ZMQ::SOCKET_REQ);
- $client->connect("tcp://localhost:5555");
+function client_socket(ZMQContext $context)
+{
+ echo "I: connecting to server...", PHP_EOL;
+ $client = new ZMQSocket($context,ZMQ::SOCKET_REQ);
+ $client->connect("tcp://localhost:5555");
// Configure socket to not wait at close time
- $client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
+ $client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
+
return $client;
}
$context = new ZMQContext();
$client = client_socket($context);
-$sequence = 0;
+$sequence = 0;
$retries_left = REQUEST_RETRIES;
$read = $write = array();
-while($retries_left) {
- // We send a request, then we work to get a reply
- $client->send(++$sequence);
-
- $expect_reply = true;
- while($expect_reply) {
- // Poll socket for a reply, with timeout
- $poll = new ZMQPoll();
- $poll->add($client, ZMQ::POLL_IN);
- $events = $poll->poll($read, $write, REQUEST_TIMEOUT);
-
- // If we got a reply, process it
- if($events > 0) {
- // We got a reply from the server, must match sequence
- $reply = $client->recv();
- if(intval($reply) == $sequence) {
- printf ("I: server replied OK (%s)%s", $reply, PHP_EOL);
- $retries_left = REQUEST_RETRIES;
- $expect_reply = false;
- } else {
- printf ("E: malformed reply from server: %s%s", $reply, PHP_EOL);
- }
- } else if(--$retries_left == 0) {
- echo "E: server seems to be offline, abandoning", PHP_EOL;
- break;
- } else {
- echo "W: no response from server, retrying...", PHP_EOL;
- // Old socket will be confused; close it and open a new one
- $client = client_socket($context);
- // Send request again, on new socket
- $client->send($sequence);
- }
- }
-}
+while ($retries_left) {
+ // We send a request, then we work to get a reply
+ $client->send(++$sequence);
+
+ $expect_reply = true;
+ while ($expect_reply) {
+ // Poll socket for a reply, with timeout
+ $poll = new ZMQPoll();
+ $poll->add($client, ZMQ::POLL_IN);
+ $events = $poll->poll($read, $write, REQUEST_TIMEOUT);
+
+ // If we got a reply, process it
+ if ($events > 0) {
+ // We got a reply from the server, must match sequence
+ $reply = $client->recv();
+ if (intval($reply) == $sequence) {
+ printf ("I: server replied OK (%s)%s", $reply, PHP_EOL);
+ $retries_left = REQUEST_RETRIES;
+ $expect_reply = false;
+ } else {
+ printf ("E: malformed reply from server: %s%s", $reply, PHP_EOL);
+ }
+ } elseif (--$retries_left == 0) {
+ echo "E: server seems to be offline, abandoning", PHP_EOL;
+ break;
+ } else {
+ echo "W: no response from server, retrying...", PHP_EOL;
+ // Old socket will be confused; close it and open a new one
+ $client = client_socket($context);
+ // Send request again, on new socket
+ $client->send($sequence);
+ }
+ }
+}
32 examples/PHP/lpserver.php
View
@@ -5,7 +5,7 @@
* Like hwserver except:
* - echoes request as-is
* - randomly runs slowly, or exits to simulate a crash.
- *
+ *
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
@@ -14,19 +14,19 @@
$server->bind("tcp://*:5555");
$cycles = 0;
-while(true) {
- $request = $server->recv();
- $cycles++;
-
- // Simulate various problems, after a few cycles
- if($cycles > 3 && rand(0, 3) == 0) {
- echo "I: simulating a crash", PHP_EOL;
- break;
- } else if($cycles > 3 && rand(0, 3) == 0) {
- echo "I: simulating CPU overload", PHP_EOL;
- sleep(5);
- }
- printf ("I: normal request (%s)%s", $request, PHP_EOL);
+while (true) {
+ $request = $server->recv();
+ $cycles++;
+
+ // Simulate various problems, after a few cycles
+ if ($cycles > 3 && rand(0, 3) == 0) {
+ echo "I: simulating a crash", PHP_EOL;
+ break;
+ } elseif ($cycles > 3 && rand(0, 3) == 0) {
+ echo "I: simulating CPU overload", PHP_EOL;
+ sleep(5);
+ }
+ printf ("I: normal request (%s)%s", $request, PHP_EOL);
sleep(1); // Do some heavy work
- $server->send($request);
-}
+ $server->send($request);
+}
702 examples/PHP/mdbroker.php
View
@@ -1,343 +1,359 @@
-<?php
-/*
- * Majordomo Protocol broker
- * A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
- * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
- */
-include_once "zmsg.php";
-include_once "mdp.php";
-
-// We'd normally pull these from config data
-define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable
-define("HEARTBEAT_INTERVAL", 2500); // msecs
-define("HEARTBEAT_EXPIRY", HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS);
-
-/* Main broker work happens here */
-$verbose = $_SERVER['argc'] > 1 && $_SERVER['argv'][1] == '-v';
-$broker = new Mdbroker($verbose);
-$broker->bind("tcp://*:5555");
-$broker->listen();
-
-class Mdbroker {
- private $ctx; // Our context
- private $socket; // Socket for clients & workers
- private $endpoint; // Broker binds to this endpoint
-
- private $services = array(); // Hash of known services
- private $workers = array(); // Hash of known workers
- private $waiting = array(); // List of waiting workers
-
- private $verbose = false; // Print activity to stdout
-
- // Heartbeat management
- private $heartbeat_at; // When to send HEARTBEAT
-
- /**
- * Constructor
- *
- * @param boolean $verbose
- */
- public function __construct($verbose = false) {
- $this->ctx = new ZMQContext();
- $this->socket = new ZMQSocket($this->ctx, ZMQ::SOCKET_ROUTER);
- $this->verbose = $verbose;
- $this->heartbeat_at = microtime(true) + (HEARTBEAT_INTERVAL/1000);
- }
-
- /**
- * Bind broker to endpoint, can call this multiple time
- * We use a single socket for both clients and workers.
- *
- * @param string $endpoint
- */
- public function bind($endpoint) {
- $this->socket->bind($endpoint);
- if($this->verbose) {
- printf("I: MDP broker/0.1.1 is active at %s %s", $endpoint, PHP_EOL);
- }
- }
-
- /**
- * This is the main listen and process loop
- */
- public function listen() {
- $read = $write = array();
-
- // Get and process messages forever or until interrupted
- while(true) {
- $poll = new ZMQPoll();
- $poll->add($this->socket, ZMQ::POLL_IN);
-
- $events = $poll->poll($read, $write, HEARTBEAT_INTERVAL);
-
- // Process next input message, if any
- if($events) {
- $zmsg = new Zmsg($this->socket);
- $zmsg->recv();
- if($this->verbose) {
- echo "I: received message:", PHP_EOL, $zmsg->__toString();
- }
-
- $sender = $zmsg->pop();
- $empty = $zmsg->pop();
- $header = $zmsg->pop();
-
- if($header == MDPC_CLIENT) {
- $this->client_process($sender, $zmsg);
- } else if($header == MDPW_WORKER) {
- $this->worker_process($sender, $zmsg);
- } else {
- echo "E: invalid message", PHP_EOL, $zmsg->__toString();
- }
- }
-
- // Disconnect and delete any expired workers
- // Send heartbeats to idle workers if needed
- if(microtime(true) > $this->heartbeat_at) {
- $this->purge_workers();
- foreach($this->workers as $worker) {
- $this->worker_send($worker, MDPW_HEARTBEAT, NULL, NULL);
- }
- $this->heartbeat_at = microtime(true) + (HEARTBEAT_INTERVAL/1000);
- }
- }
- }
-
- /**
- * Delete any idle workers that haven't pinged us in a while.
- * We know that workers are ordered from oldest to most recent.
- */
- public function purge_workers() {
- foreach($this->waiting as $id => $worker) {
- if(microtime(true) < $worker->expiry) {
- break; // Worker is alive, we're done here
- }
- if($this->verbose) {
- printf("I: deleting expired worker: %s %s",
- $worker->identity, PHP_EOL);
- }
- $this->worker_delete($worker);
- }
- }
-
- /**
- * Locate or create new service entry
- *
- * @param string $name
- * @return stdClass
- */
- public function service_require($name) {
- $service = isset($this->services[$name]) ? $this->services[$name] : NULL;
- if($service == NULL) {
- $service = new stdClass();
- $service->name = $name;
- $service->requests = array();
- $service->waiting = array();
- $this->services[$name] = $service;
- }
-
- return $service;
- }
-
- /**
- * Dispatch requests to waiting workers as possible
- *
- * @param type $service
- * @param type $msg
- */
- public function service_dispatch($service, $msg) {
- if($msg) {
- $service->requests[] = $msg;
- }
-
- $this->purge_workers();
-
- while(count($service->waiting) && count($service->requests)) {
- $worker = array_shift($service->waiting);
- $msg = array_shift($service->requests);
- $this->worker_send($worker, MDPW_REQUEST, NULL, $msg);
- }
- }
-
- /**
- * Handle internal service according to 8/MMI specification
- *
- * @param string $frame
- * @param Zmsg $msg
- */
- public function service_internal($frame, $msg) {
- if($frame == "mmi.service") {
- $name = $msg->last();
- $service = $this->services[$name];
- $return_code = $service && $service->workers ? "200" : "404";
- } else {
- $return_code = "501";
- }
-
- $msg->set_last($return_code);
-
- // Remove & save client return envelope and insert the
- // protocol header and service name, then rewrap envelope
- $client = $msg->unwrap();
- $msg->push($frame);
- $msg->push(MDPC_CLIENT);
- $msg->wrap($client, "");
- $msg->set_socket($this->socket)->send();
- }
-
- /**
- * Creates worker if necessary
- *
- * @param string $address
- * @return stdClass
- */
- public function worker_require($address) {
- $worker = isset($this->workers[$address]) ? $this->workers[$address] : NULL;
- if($worker == NULL) {
- $worker = new stdClass();
- $worker->identity = $address;
- $worker->address = $address;
- if($this->verbose) {
- printf("I: registering new worker: %s %s", $address, PHP_EOL);
- }
- $this->workers[$address] = $worker;
- }
- return $worker;
- }
-
- /**
- * Remove a worker
- *
- * @param stdClass $worker
- * @param boolean $disconnect
- */
- public function worker_delete($worker, $disconnect = false) {
- if($disconnect) {
- $this->worker_send($worker, MDPW_DISCONNECT, NULL, NULL);
- }
-
- if(isset($worker->service)) {
- worker_remove_from_array($worker, $worker->service->waiting);
- $worker->service->workers--;
- }
- worker_remove_from_array($worker, $this->waiting);
- unset($this->workers[$worker->identity]);
- }
-
- private function worker_remove_from_array($worker, &$array) {
- $index = array_search($worker, $array);
- if ($index !== false) {
- unset($array[$index]);
- }
- }
-
- /**
- * Process message sent to us by a worker
- *
- * @param string $sender
- * @param Zmsg $msg
- */
- public function worker_process($sender, $msg) {
- $command = $msg->pop();
- $worker_ready = isset($this->workers[$sender]);
- $worker = $this->worker_require($sender);
- if($command == MDPW_READY) {
- if($worker_ready) {
- $this->worker_delete($worker, true); // Not first command in session
- } else if(strlen($sender) >= 4 // Reserved service name
- && substr($sender, 0, 4) == 'mmi.') {
- $this->worker_delete($worker, true);
- } else {
- // Attach worker to service and mark as idle
- $service_frame = $msg->pop();
- $worker->service = $this->service_require($service_frame);
- $worker->service->workers++;
- $this->worker_waiting($worker);
- }
- } else if($command == MDPW_REPLY) {
- if($worker_ready) {
- // Remove & save client return envelope and insert the
- // protocol header and service name, then rewrap envelope.
- $client = $msg->unwrap();
- $msg->push($worker->service->name);
- $msg->push(MDPC_CLIENT);
- $msg->wrap($client, "");
- $msg->set_socket($this->socket)->send();
- $this->worker_waiting($worker);
- } else {
- $this->worker_delete($worker, true);
- }
- } else if($command == MDPW_HEARTBEAT) {
- if($worker_ready) {
- $worker->expiry = microtime(true) + (HEARTBEAT_EXPIRY/1000);
- } else {
- $this->worker_delete($worker, true);
- }
- } else if($command == MDPW_DISCONNECT) {
- $this->worker_delete($worker, true);
- } else {
- echo "E: invalid input message", PHP_EOL, $msg->__toString();
- }
- }
-
- /**
- * Send message to worker
- *
- * @param stdClass $worker
- * @param string $command
- * @param mixed $option
- * @param Zmsg $msg
- */
- public function worker_send($worker, $command, $option, $msg) {
- $msg = $msg ? $msg : new Zmsg();
- // Stack protocol envelope to start of message
- if($option) {
- $msg->push($option);
- }
- $msg->push($command);
- $msg->push(MDPW_WORKER);
-
- // Stack routing envelope to start of message
- $msg->wrap($worker->address, "");
-
- if($this->verbose) {
- printf("I: sending %s to worker %s",
- $command, PHP_EOL);
- echo $msg->__toString();
- }
-
- $msg->set_socket($this->socket)->send();
- }
-
- /**
- * This worker is now waiting for work
- *
- * @param stdClass $worker
- */
- public function worker_waiting($worker) {
- // Queue to broker and service waiting lists
- $this->waiting[] = $worker;
- $worker->service->waiting[] = $worker;
- $worker->expiry = microtime(true) + (HEARTBEAT_EXPIRY/1000);
- $this->service_dispatch($worker->service, NULL);
- }
-
- /**
- * Process a request coming from a client
- *
- * @param string $sender
- * @param Zmsg $msg
- */
- public function client_process($sender, $msg) {
- $service_frame = $msg->pop();
- $service = $this->service_require($service_frame);
-
- // Set reply return address to client sender
- $msg->wrap($sender, "");
- if(substr($service_frame, 0, 4) == 'mmi.') {
- $this->service_internal($service_frame, $msg);
- } else {
- $this->service_dispatch($service, $msg);
- }
- }
-}
+<?php
+/*
+ * Majordomo Protocol broker
+ * A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
+ * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
+ */
+include_once 'zmsg.php';
+include_once 'mdp.php';
+
+// We'd normally pull these from config data
+define("HEARTBEAT_LIVENESS", 3); // 3-5 is reasonable
+define("HEARTBEAT_INTERVAL", 2500); // msecs
+define("HEARTBEAT_EXPIRY", HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS);
+
+/* Main broker work happens here */
+$verbose = $_SERVER['argc'] > 1 && $_SERVER['argv'][1] == '-v';
+$broker = new Mdbroker($verbose);
+$broker->bind("tcp://*:5555");
+$broker->listen();
+
+class mdbroker
+{
+ private $ctx; // Our context
+ private $socket; // Socket for clients & workers
+ private $endpoint; // Broker binds to this endpoint
+
+ private $services = array(); // Hash of known services
+ private $workers = array(); // Hash of known workers
+ private $waiting = array(); // List of waiting workers
+
+ private $verbose = false; // Print activity to stdout
+
+ // Heartbeat management
+ private $heartbeat_at; // When to send HEARTBEAT
+
+ /**
+ * Constructor
+ *
+ * @param boolean $verbose
+ */
+ public function __construct($verbose = false)
+ {
+ $this->ctx = new ZMQContext();
+ $this->socket = new ZMQSocket($this->ctx, ZMQ::SOCKET_ROUTER);
+ $this->verbose = $verbose;
+ $this->heartbeat_at = microtime(true) + (HEARTBEAT_INTERVAL/1000);
+ }
+
+ /**
+ * Bind broker to endpoint, can call this multiple time
+ * We use a single socket for both clients and workers.
+ *
+ * @param string $endpoint
+ */
+ public function bind($endpoint)
+ {
+ $this->socket->bind($endpoint);
+ if ($this->verbose) {
+ printf("I: MDP broker/0.1.1 is active at %s %s", $endpoint, PHP_EOL);
+ }
+ }
+
+ /**
+ * This is the main listen and process loop
+ */
+ public function listen()
+ {
+ $read = $write = array();
+
+ // Get and process messages forever or until interrupted
+ while (true) {
+ $poll = new ZMQPoll();
+ $poll->add($this->socket, ZMQ::POLL_IN);
+
+ $events = $poll->poll($read, $write, HEARTBEAT_INTERVAL);
+
+ // Process next input message, if any
+ if ($events) {
+ $zmsg = new Zmsg($this->socket);
+ $zmsg->recv();
+ if ($this->verbose) {
+ echo "I: received message:", PHP_EOL, $zmsg->__toString();
+ }
+
+ $sender = $zmsg->pop();
+ $empty = $zmsg->pop();
+ $header = $zmsg->pop();
+
+ if ($header == MDPC_CLIENT) {
+ $this->client_process($sender, $zmsg);
+ } elseif ($header == MDPW_WORKER) {
+ $this->worker_process($sender, $zmsg);
+ } else {
+ echo "E: invalid message", PHP_EOL, $zmsg->__toString();
+ }
+ }
+
+ // Disconnect and delete any expired workers
+ // Send heartbeats to idle workers if needed
+ if (microtime(true) > $this->heartbeat_at) {
+ $this->purge_workers();
+ foreach ($this->workers as $worker) {
+ $this->worker_send($worker, MDPW_HEARTBEAT, NULL, NULL);
+ }
+ $this->heartbeat_at = microtime(true) + (HEARTBEAT_INTERVAL/1000);
+ }
+ }
+ }
+
+ /**
+ * Delete any idle workers that haven't pinged us in a while.
+ * We know that workers are ordered from oldest to most recent.
+ */
+ public function purge_workers()
+ {
+ foreach ($this->waiting as $id => $worker) {
+ if (microtime(true) < $worker->expiry) {
+ break; // Worker is alive, we're done here
+ }
+ if ($this->verbose) {
+ printf("I: deleting expired worker: %s %s",
+ $worker->identity, PHP_EOL);
+ }
+ $this->worker_delete($worker);
+ }
+ }
+
+ /**
+ * Locate or create new service entry
+ *
+ * @param string $name
+ * @return stdClass
+ */
+ public function service_require($name)
+ {
+ $service = isset($this->services[$name]) ? $this->services[$name] : NULL;
+ if ($service == NULL) {
+ $service = new stdClass();
+ $service->name = $name;
+ $service->requests = array();
+ $service->waiting = array();
+ $this->services[$name] = $service;
+ }
+
+ return $service;
+ }
+
+ /**
+ * Dispatch requests to waiting workers as possible
+ *
+ * @param type $service
+ * @param type $msg
+ */
+ public function service_dispatch($service, $msg)
+ {
+ if ($msg) {
+ $service->requests[] = $msg;
+ }
+
+ $this->purge_workers();
+
+ while (count($service->waiting) && count($service->requests)) {
+ $worker = array_shift($service->waiting);
+ $msg = array_shift($service->requests);
+ $this->worker_send($worker, MDPW_REQUEST, NULL, $msg);
+ }
+ }
+
+ /**
+ * Handle internal service according to 8/MMI specification
+ *
+ * @param string $frame
+ * @param Zmsg $msg
+ */
+ public function service_internal($frame, $msg)
+ {
+ if ($frame == "mmi.service") {
+ $name = $msg->last();
+ $service = $this->services[$name];
+ $return_code = $service && $service->workers ? "200" : "404";
+ } else {
+ $return_code = "501";
+ }
+
+ $msg->set_last($return_code);
+
+ // Remove & save client return envelope and insert the
+ // protocol header and service name, then rewrap envelope
+ $client = $msg->unwrap();
+ $msg->push($frame);
+ $msg->push(MDPC_CLIENT);
+ $msg->wrap($client, "");
+ $msg->set_socket($this->socket)->send();
+ }
+
+ /**
+ * Creates worker if necessary
+ *
+ * @param string $address
+ * @return stdClass
+ */
+ public function worker_require($address)
+ {
+ $worker = isset($this->workers[$address]) ? $this->workers[$address] : NULL;
+ if ($worker == NULL) {
+ $worker = new stdClass();
+ $worker->identity = $address;
+ $worker->address = $address;
+ if ($this->verbose) {
+ printf("I: registering new worker: %s %s", $address, PHP_EOL);
+ }
+ $this->workers[$address] = $worker;
+ }
+
+ return $worker;
+ }
+
+ /**
+ * Remove a worker
+ *
+ * @param stdClass $worker
+ * @param boolean $disconnect
+ */
+ public function worker_delete($worker, $disconnect = false)
+ {
+ if ($disconnect) {
+ $this->worker_send($worker, MDPW_DISCONNECT, NULL, NULL);
+ }
+
+ if (isset($worker->service)) {
+ worker_remove_from_array($worker, $worker->service->waiting);
+ $worker->service->workers--;
+ }
+ worker_remove_from_array($worker, $this->waiting);
+ unset($this->workers[$worker->identity]);
+ }
+
+ private function worker_remove_from_array($worker, &$array)
+ {
+ $index = array_search($worker, $array);
+ if ($index !== false) {
+ unset($array[$index]);
+ }
+ }
+
+ /**
+ * Process message sent to us by a worker
+ *
+ * @param string $sender
+ * @param Zmsg $msg
+ */
+ public function worker_process($sender, $msg)
+ {
+ $command = $msg->pop();
+ $worker_ready = isset($this->workers[$sender]);
+ $worker = $this->worker_require($sender);
+ if ($command == MDPW_READY) {
+ if ($worker_ready) {
+ $this->worker_delete($worker, true); // Not first command in session
+ } else if(strlen($sender) >= 4 // Reserved service name
+ && substr($sender, 0, 4) == 'mmi.') {
+ $this->worker_delete($worker, true);
+ } else {
+ // Attach worker to service and mark as idle
+ $service_frame = $msg->pop();
+ $worker->service = $this->service_require($service_frame);
+ $worker->service->workers++;
+ $this->worker_waiting($worker);
+ }
+ } elseif ($command == MDPW_REPLY) {
+ if ($worker_ready) {
+ // Remove & save client return envelope and insert the
+ // protocol header and service name, then rewrap envelope.
+ $client = $msg->unwrap();
+ $msg->push($worker->service->name);
+ $msg->push(MDPC_CLIENT);
+ $msg->wrap($client, "");
+ $msg->set_socket($this->socket)->send();
+ $this->worker_waiting($worker);
+ } else {
+ $this->worker_delete($worker, true);
+ }
+ } elseif ($command == MDPW_HEARTBEAT) {
+ if ($worker_ready) {
+ $worker->expiry = microtime(true) + (HEARTBEAT_EXPIRY/1000);
+ } else {
+ $this->worker_delete($worker, true);
+ }
+ } elseif ($command == MDPW_DISCONNECT) {
+ $this->worker_delete($worker, true);
+ } else {
+ echo "E: invalid input message", PHP_EOL, $msg->__toString();
+ }
+ }
+
+ /**
+ * Send message to worker
+ *
+ * @param stdClass $worker
+ * @param string $command
+ * @param mixed $option
+ * @param Zmsg $msg
+ */
+ public function worker_send($worker, $command, $option, $msg)
+ {
+ $msg = $msg ? $msg : new Zmsg();
+ // Stack protocol envelope to start of message
+ if ($option) {
+ $msg->push($option);
+ }
+ $msg->push($command);
+ $msg->push(MDPW_WORKER);
+
+ // Stack routing envelope to start of message
+ $msg->wrap($worker->address, "");
+
+ if ($this->verbose) {
+ printf("I: sending %s to worker %s",
+ $command, PHP_EOL);
+ echo $msg->__toString();
+ }
+
+ $msg->set_socket($this->socket)->send();
+ }
+
+ /**
+ * This worker is now waiting for work
+ *
+ * @param stdClass $worker
+ */
+ public function worker_waiting($worker)
+ {
+ // Queue to broker and service waiting lists
+ $this->waiting[] = $worker;
+ $worker->service->waiting[] = $worker;
+ $worker->expiry = microtime(true) + (HEARTBEAT_EXPIRY/1000);
+ $this->service_dispatch($worker->service, NULL);
+ }
+
+ /**
+ * Process a request coming from a client
+ *
+ * @param string $sender
+ * @param Zmsg $msg
+ */
+ public function client_process($sender, $msg)
+ {
+ $service_frame = $msg->pop();
+ $service = $this->service_require($service_frame);
+
+ // Set reply return address to client sender
+ $msg->wrap($sender, "");
+ if (substr($service_frame, 0, 4) == 'mmi.') {
+ $this->service_internal($service_frame, $msg);
+ } else {
+ $this->service_dispatch($service, $msg);
+ }
+ }
+}
272 examples/PHP/mdcliapi.php
View
@@ -1,152 +1,158 @@
<?php
/* =====================================================================
* mdcliapi.h
- *
+ *
* Majordomo Protocol Client API
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
* ---------------------------------------------------------------------
* Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
* Copyright other contributors as noted in the AUTHORS file.
- *
+ *
* This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
- *
+ *
* This is free software; you can redistribute it and/or modify it under
- * the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or (at
+ * the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or (at
* your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program. If not, see
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
* <http://www.gnu.org/licenses/>.
* =====================================================================
- *
+ *
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
-include_once "zmsg.php";
-include_once "mdp.php";
+include_once 'zmsg.php';
+include_once 'mdp.php';
-class MDCli {
- // Structure of our class
- // We access these properties only via class methods
- private $broker;
- private $context;
- private $client; // Socket to broker
- private $verbose; // Print activity to stdout
- private $timeout; // Request timeout
- private $retries; // Request retries
-
- /**
- * Constructor
- *
- * @param string $broker
- * @param boolean $verbose
- */
- public function __construct($broker, $verbose = false) {
- $this->broker = $broker;
- $this->context = new ZMQContext();
- $this->verbose = $verbose;
- $this->timeout = 2500; // msecs
- $this->retries = 3; // Before we abandon
- $this->connect_to_broker();
- }
-
- /**
- * Connect or reconnect to broker