Permalink
Browse files

Merge branch 'master' of https://github.com/ianbarber/zguide into ian…

…barber-master
  • Loading branch information...
2 parents ac81790 + 2152da1 commit a35f33ad88bc1550933613b9d5d14e16db92bca4 @hintjens hintjens committed Feb 27, 2011
Showing with 220 additions and 24 deletions.
  1. +143 −10 examples/PHP/asyncsrv.php
  2. +1 −1 examples/PHP/hwserver.php
  3. +68 −10 examples/PHP/suisnail.php
  4. +8 −3 examples/PHP/zmsg.php
View
@@ -1,13 +1,146 @@
-No-one has translated the asyncsrv example into PHP yet. Be the first to create
-asyncsrv in PHP and get one free Internet! If you're the author of the PHP
-binding, this is a great way to get people to use 0MQ in PHP.
+<?php
+/*
+ * Asynchronous client-to-server (XREQ to XREP)
+ *
+ * While this example runs in a single process, that is just to make
+ * it easier to start and stop the example. Each task has its own
+ * context and conceptually acts as a separate process.
+ * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
+ */
+include "zmsg.php";
-To submit a new translation email it to zeromq-dev@lists.zeromq.org. Please:
+/* ---------------------------------------------------------------------
+ * This is our client task
+ * It connects to the server, and then sends a request once per second
+ * It collects responses as they arrive, and it prints them out. We will
+ * run several client tasks in parallel, each with a different random ID.
+ */
+function client_task() {
+ $context = new ZMQContext();
+ $client = new ZMQSocket($context, ZMQ::SOCKET_XREQ);
+
+ // Generate printable identity for the client
+ $identity = sprintf ("%04X", rand(0, 0x10000));
+ $client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
+ $client->connect("tcp://localhost:5570");
+
+ $read = $write = array();
+ $poll = new ZMQPoll();
+ $poll->add($client, ZMQ::POLL_IN);
+
+ $request_nbr = 0;
+ while(true) {
+ // Tick once per second, pulling in arriving messages
+ for($centitick = 0; $centitick < 100; $centitick++) {
+ $events = $poll->poll($read, $write, 10000);
+ $zmsg = new Zmsg($client);
+ if($events) {
+ $zmsg->recv();
+ printf ("%s: %s%s", $identity, $zmsg->body(), PHP_EOL);
+ }
+ }
+ $zmsg = new Zmsg($client);
+ $zmsg->body_fmt("request #%d", ++$request_nbr)->send();
+ }
+}
-* Stick to identical functionality and naming used in examples so that readers
- can easily compare languages.
-* You MUST place your name as author in the examples so readers can contact you.
-* You MUST state in the email that you license your code under the MIT/X11
- license.
+/* ---------------------------------------------------------------------
+ * This is our server task
+ * It uses the multithreaded server model to deal requests out to a pool
+ * of workers and route replies back to clients. One worker can handle
+ * one request at a time but one client can talk to multiple workers at
+ * once.
+ */
+function server_task() {
+
+ // Launch pool of worker threads, precise number is not critical
+ for($thread_nbr = 0; $thread_nbr < 5; $thread_nbr++) {
+ $pid = pcntl_fork();
+ if($pid == 0) {
+ server_worker();
+ exit();
+ }
+ }
+
+ $context = new ZMQContext();
+
+ // Frontend socket talks to clients over TCP
+ $frontend = new ZMQSocket($context, ZMQ::SOCKET_XREP);
+ $frontend->bind("tcp://*:5570");
+
+ // Backend socket talks to workers over ipc
+ $backend = new ZMQSocket($context, ZMQ::SOCKET_XREQ);
+ $backend->bind("ipc://backend");
+
+ // Connect backend to frontend via a queue device
+ // We could do this:
+ // $device = new ZMQDevice(ZMQ::DEVICE_QUEUE, $frontend, $backend);
+ // But doing it ourselves means we can debug this more easily
+
+ $read = $write = array();
+ // Switch messages between frontend and backend
+ while(true) {
+ $poll = new ZMQPoll();
+ $poll->add($frontend, ZMQ::POLL_IN);
+ $poll->add($backend, ZMQ::POLL_IN);
+
+ $poll->poll($read, $write);
+ foreach($read as $socket) {
+ $zmsg = new Zmsg($socket);
+ $zmsg->recv();
+ if($socket === $frontend) {
+ //echo "Request from client:";
+ //echo $zmsg->__toString();
+ $zmsg->set_socket($backend)->send();
+ } else if($socket === $backend) {
+ //echo "Request from worker:";
+ //echo $zmsg->__toString();
+ $zmsg->set_socket($frontend)->send();
+ }
+ }
+ }
+}
-Subscribe to the email list at http://lists.zeromq.org/mailman/listinfo/zeromq-dev.
+function server_worker() {
+ $context = new ZMQContext();
+ $worker = new ZMQSocket($context, ZMQ::SOCKET_XREQ);
+ $worker->connect("ipc://backend");
+ $zmsg = new Zmsg($worker);
+
+ while(true) {
+ // The XREQ socket gives us the address envelope and message
+ $zmsg->recv();
+ assert($zmsg->parts() == 2);
+
+ // Send 0..4 replies back
+ $replies = rand(0,4);
+ for($reply = 0; $reply < $replies; $reply++) {
+ // Sleep for some fraction of a second
+ usleep(rand(0,1000) + 1);
+ $zmsg->send(Zmsg::NOCLEAR);
+ }
+
+ }
+}
+
+/* This main thread simply starts several clients, and a server, and then
+ * waits for the server to finish.
+ */
+function main() {
+ for($num_clients = 0; $num_clients < 3; $num_clients++) {
+ $pid = pcntl_fork();
+ if($pid == 0) {
+ client_task();
+ exit();
+ }
+ }
+
+ $pid = pcntl_fork();
+ if($pid == 0) {
+ server_task();
+ exit();
+ }
+
+}
+
+main();
@@ -15,7 +15,7 @@
while(true) {
// Wait for next request from client
$request = $responder->recv();
- printf ("Received request: [%s]\n", $request);
+ printf ("Received request: [%s]\n", $request);
// Do some 'work'
sleep (1);
View
@@ -1,13 +1,71 @@
-No-one has translated the suisnail example into PHP yet. Be the first to create
-suisnail in PHP and get one free Internet! If you're the author of the PHP
-binding, this is a great way to get people to use 0MQ in PHP.
+<?php
+/* Suicidal Snail
+ *
+ * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
+*/
-To submit a new translation email it to zeromq-dev@lists.zeromq.org. Please:
+/* ---------------------------------------------------------------------
+ * This is our subscriber
+ * It connects to the publisher and subscribes to everything. It
+ * sleeps for a short time between messages to simulate doing too
+ * much work. If a message is more than 1 second late, it croaks.
+ */
+define("MAX_ALLOWED_DELAY", 100); // msecs
-* Stick to identical functionality and naming used in examples so that readers
- can easily compare languages.
-* You MUST place your name as author in the examples so readers can contact you.
-* You MUST state in the email that you license your code under the MIT/X11
- license.
+function subscriber() {
+ $context = new ZMQContext();
+
+ // Subscribe to everything
+ $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
+ $subscriber->connect("tcp://localhost:5556");
+ $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
-Subscribe to the email list at http://lists.zeromq.org/mailman/listinfo/zeromq-dev.
+ // Get and process messages
+ while(true) {
+ $clock = $subscriber->recv();
+ // Suicide snail logic
+ if(microtime(true)*100 - $clock*100 > MAX_ALLOWED_DELAY) {
+ echo "E: subscriber cannot keep up, aborting", PHP_EOL;
+ break;
+ }
+
+ // Work for 1 msec plus some random additional time
+ usleep(1000 + rand(0, 1000));
+ }
+}
+
+
+/* ---------------------------------------------------------------------
+ * This is our server task
+ * It publishes a time-stamped message to its pub socket every 1ms.
+ */
+function publisher() {
+ $context = new ZMQContext();
+
+ // Prepare publisher
+ $publisher = new ZMQSocket($context, ZMQ::SOCKET_PUB);
+ $publisher->bind("tcp://*:5556");
+
+ while(true) {
+ // Send current clock (msecs) to subscribers
+ $publisher->send(microtime(true));
+ usleep(1000); // 1msec wait
+ }
+}
+
+
+/*
+ * This main thread simply starts a client, and a server, and then
+ * waits for the client to croak.
+ */
+$pid = pcntl_fork();
+if($pid == 0) {
+ publisher();
+ exit();
+}
+
+$pid = pcntl_fork();
+if($pid == 0) {
+ subscriber();
+ exit();
+}
View
@@ -31,12 +31,14 @@
*/
class Zmsg {
+ const NOCLEAR = 2;
+
/**
* Store the parts of the message
*
* @var array
*/
- private $_parts;
+ private $_parts = array();
/**
* Socket to send and receive via
@@ -114,7 +116,7 @@ public function recv() {
* @throws Exception if no socket present
* @return Zmsg
*/
- public function send() {
+ public function send($options = 0) {
if(!isset($this->_socket)) {
throw new Exception("No socket supplied");
}
@@ -124,7 +126,10 @@ public function send() {
$mode = $i++ == $count ? null : ZMQ::MODE_SNDMORE;
$this->_socket->send($part, $mode);
}
- unset($this->_parts);
+ if(!($options & self::NOCLEAR)) {
+ unset($this->_parts);
+ $this->_parts = array();
+ }
return $this;
}

0 comments on commit a35f33a

Please sign in to comment.