Permalink
Browse files

Initial checkin

  • Loading branch information...
ianbarber committed Feb 12, 2011
0 parents commit 8b87efc7757e57ebd9e11cf3be1bcdb5db817356
1 README
@@ -0,0 +1 @@
+Code from my talk on ZeroMQ for PHP conferences at PHPUK2011, DPC2011 and PHPTEK 2011
@@ -0,0 +1,13 @@
+<?php
+// cache client should mark extended date in memcached or similar
+$context = new ZMQContext();
+$socket = new ZMQSocket($context, ZMQ::SOCKET_PUB);
+$socket->connect("ipc:///tmp/usercache");
+$socket->connect("ipc:///tmp/datacache");
+$type = array('users', 'data');
+
+while(true) {
+ $socket->send($type[array_rand($type)], ZMQ::MODE_SNDMORE);
+ $socket->send(rand(0, 12));
+ sleep(rand(0,3));
+}
@@ -0,0 +1,12 @@
+<?php
+
+$context = new ZMQContext();
+$socket = new ZMQSocket($context, ZMQ::SOCKET_SUB);
+$socket->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "data");
+$socket->bind("ipc:///tmp/datacache");
+
+while(true) {
+ $cache = $socket->recv();
+ $request = $socket->recv();
+ echo "Clearing $cache $request\n";
+}
@@ -0,0 +1,12 @@
+<?php
+
+$context = new ZMQContext();
+$socket = new ZMQSocket($context, ZMQ::SOCKET_SUB);
+$socket->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "users");
+$socket->bind("ipc:///tmp/usercache");
+
+while(true) {
+ $cache = $socket->recv();
+ $request = $socket->recv();
+ echo "Clearing $cache $request\n";
+}
@@ -0,0 +1,19 @@
+<?php
+
+$context = new ZMQContext();
+$sub = new ZMQSocket($context, ZMQ::SOCKET_SUB);
+$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
+$sub->connect('tcp://localhost:5566');
+$poll = new ZMQPoll();
+$poll->add($sub, ZMQ::POLL_IN);
+$readable = $writeable = array();
+while(true) {
+ $events = $poll->poll($readable, $writeable, 5000000);
+ if($events > 0) {
+ echo "\n\n<script type='text/javascript'>parent.updateChat('" . str_replace("'", "\'", $sub->recv()) ."');</script>\n\n";
+ } else {
+ echo ".";
+ }
+ ob_flush();
+ flush();
+}
@@ -0,0 +1,84 @@
+<html>
+ <head>
+ <title>ZeroMQ Chat</title>
+ <style>
+ body {
+ padding: 20px;
+ background-color: #E5E5E5;
+ font-family: Verdana, Arial, Sans-Serif;
+ color: #555;
+ }
+
+ #chat {
+ display: none;
+ }
+ #chatwindow {
+ width: 700px;
+ height: 400px;
+ overflow: scroll;
+ margin: 10px auto;
+ background: white;
+ border: 1px solid black;
+ }
+
+ #chatwindow p {
+ margin: 0px 5px;
+ }
+
+ #chatform {
+ width: 700px;
+ margin: 0px auto;
+ }
+
+ #chatbox {
+ width: 620px;
+ }
+
+ #datasrc {
+ display: none;
+ }
+
+
+ </style>
+ <script lang="text/javascript" src="jquery-1.2.6.pack.js"></script>
+ <script lang="text/javascript">
+ function updateChat(response) {
+ $('#chatwindow').append('<p>' + response + '</p>');
+ }
+
+ $(document).ready(function(){
+ myname = '';
+
+ $("#nameform").submit(function(e) {
+ myname = $('#namebox').val();
+ $('#nameform').css('display', 'none');
+ $('#chat').css('display', 'block');
+ e.stopPropagation();
+ e.preventDefault();
+ $('body').append( '<iframe id="datasrc" src="chat.php?name=' + myname + '"></iframe>');
+ $.post('/chat/send.php', {'name': myname, 'message': 'm:joined'});
+ });
+ $('#chatform').submit(function(e) {
+ $.post('/chat/send.php', {'name': myname, 'message': $('#chatbox').val()});
+ e.stopPropagation();
+ e.preventDefault();
+ $('#chatbox').val('');
+ });
+ });
+ </script>
+ </head>
+ <body>
+ <form id="nameform">
+ <label>Enter Your Name: <input id="namebox" name="namebox" type="textarea" maxlength="30" value="" /></label>
+ <input type="submit" value="Join" />
+ </form>
+ <div id="chat">
+ <div id="chatwindow">
+ </div>
+ <form id="chatform">
+ <input id="chatbox" name="chatbox" type="textarea" value="" />
+ <input type="submit" value="Send" />
+ </form>
+ </div>
+ </body>
+</html>

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -0,0 +1,13 @@
+<?php
+
+$name = htmlspecialchars($_POST['name']);
+$message = htmlspecialchars($_POST['message']);
+
+$context = new ZMQContext();
+$send = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
+$send->connect('tcp://localhost:5567');
+if($message == 'm:joined') {
+ $send->send("<em>" . $name . " has joined</em>");
+} else {
+ $send->send($name . ': ' . $message);
+}
@@ -0,0 +1,13 @@
+<?php
+
+$ctx = new ZMQContext();
+$pub = $ctx->getSocket(ZMQ::SOCKET_PUB);
+$pub->bind('tcp://*:5566');
+$pull = $ctx->getSocket(ZMQ::SOCKET_PULL);
+$pull->bind('tcp://*:5567');
+
+while(true) {
+ $message = $pull->recv();
+ echo "Got ", $message, PHP_EOL;
+ $pub->send($message);
+}
@@ -0,0 +1,21 @@
+simple_handler = Handler(
+ send_spec='tcp://*:9997',
+ send_ident='ab206881-6f49-4276-9db1-1676bfae18b0',
+ recv_spec='tcp://*:9996', recv_ident=''
+)
+
+main = Server(
+ uuid="9e71cabf-6afb-4ee1-b550-7972245f7e0a",
+ access_log="/logs/access.log",
+ error_log="/logs/error.log",
+ chroot="./",
+ default_host="general.local",
+ name="test",
+ pid_file="/run/mongre2.pid",
+ port=6767,
+ hosts = [
+ Host(name="general.local", routes={'/test':simple_handler})
+ ]
+)
+settings = {"zeromq.threads": 1}
+servers = [main]
@@ -0,0 +1,18 @@
+<?php
+
+$ctx = new ZMQContext();
+$in = $ctx->getSocket(ZMQ::SOCKET_PULL);
+$in->connect('tcp://localhost:9997');
+
+$out = $ctx->getSocket(ZMQ::SOCKET_PUB);
+$out->connect('tcp://localhost:9996');
+
+$http = "HTTP/1.1 200 OK\r\nContent-Length: %s\r\n\r\n%s";
+
+while(true) {
+ $msg = $in->recv();
+ list($uuid, $id, $path, $rest) = explode(" ", $msg, 4);
+ $res = $uuid . " " . strlen($id) . ':' . $id . ", ";
+ $res .= sprintf($http, 6, "Hello!");
+ $out->send($res);
+}
@@ -0,0 +1,37 @@
+<?php
+namespace m2php;
+require 'm2conn.php';
+require 'm2req.php';
+require 'm2tools.php';
+$sender_id = "82209006-86FF-4982-B5EA-D1E29E55D481";
+
+$conn = new \m2php\Connection($sender_id, "tcp://127.0.0.1:9997", "tcp://127.0.0.1:9996");
+$ctx = new \ZMQContext();
+$sub = new \ZMQSocket($ctx, \ZMQ::SOCKET_SUB);
+$sub->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, '');
+$sub->connect('tcp://localhost:5566');
+$poll = new \ZMQPoll();
+$poll->add($sub, \ZMQ::POLL_IN);
+$poll->add($conn->reqs, \ZMQ::POLL_IN);
+$read = $write = $ids = array();
+$sender = '';
+$http_head = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nTransfer-Encoding: chunked\r\n\r\n";
+
+while (true) {
+ $ev = $poll->poll($read, $write);
+ foreach($read as $r) {
+ if($r === $sub) {
+ $msg = "<script type='text/javascript'>parent.updateChat('" . str_replace("'", "\'", $sub->recv()) . "');</script>\r\n";
+ $conn->send($sender, implode(' ', $ids), sprintf("%x\r\n%s", strlen($msg) , $msg));
+ } else {
+ $req = $conn->recv();
+ $sender = $req->sender;
+ if($req->is_disconnect()) {
+ unset($ids[$req->conn_id]);
+ } else {
+ $ids[$req->conn_id] = $req->conn_id;
+ $conn->send($sender, $req->conn_id, $http_head);
+ }
+ }
+ }
+}
@@ -0,0 +1,21 @@
+<?php
+
+$context = new ZMQContext();
+$socket = new ZMQSocket($context, ZMQ::SOCKET_PULL);
+$socket->bind("tcp://*:5555");
+
+$fh = fopen("php://stdin", 'r');
+
+$poll = new ZMQPoll();
+$poll->add($socket, ZMQ::POLL_IN);
+$poll->add($fh, ZMQ::POLL_IN);
+$readable = $writeable = array();
+
+while(true) {
+ $events = $poll->poll($readable, $writeable);
+ if($readable[0] === $socket) {
+ echo "ZMQ: " . $readable[0]->recv();
+ } else {
+ echo "STDIN: " . fgets($readable[0]);
+ }
+}
@@ -0,0 +1,10 @@
+<?php
+
+$context = new ZMQContext();
+$socket = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
+$socket->connect("tcp://localhost:5555");
+
+$fh = fopen("php://stdin", "r");
+while($data = fgets($fh)) {
+ $socket->send($data);
+}
@@ -0,0 +1,31 @@
+<?php
+
+$context = new ZMQContext();
+$frontend = new ZMQSocket($context, ZMQ::SOCKET_XREP);
+$backend = new ZMQSocket($context, ZMQ::SOCKET_XREQ);
+$frontend->bind('tcp://*:5454');
+$backend->bind('tcp://*:5455');
+
+$poll = new ZMQPoll();
+$poll->add($frontend, ZMQ::POLL_IN);
+$poll->add($backend, ZMQ::POLL_IN);
+$readable = $writeable = array();
+
+while(true) {
+ $events = $poll->poll($readable, $writeable);
+ foreach($readable as $socket) {
+ if($socket === $frontend) {
+ do {
+ $message = $frontend->recv();
+ $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
+ $backend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
+ } while($more);
+ } else if($socket === $backend) {
+ do {
+ $message = $backend->recv();
+ $more = $backend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
+ $frontend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
+ } while($more);
+ }
+ }
+}
@@ -0,0 +1,10 @@
+<?php
+
+$context = new ZMQContext();
+$server =$context->getSocket(ZMQ::SOCKET_REP);
+$server->connect("tcp://localhost:5455");
+while(true) {
+ $message = $server->recv();
+ echo "Sending $message World", PHP_EOL;
+ $server->send($message . " World");
+}
@@ -0,0 +1,10 @@
+import zmq
+
+context = zmq.Context()
+server = context.socket(zmq.REP)
+server.connect("tcp://localhost:5455")
+
+while True:
+ message = server.recv()
+ print "Sending", message, "World\n"
+ server.send(message + " World")
@@ -0,0 +1,10 @@
+<?php
+
+$ctx = new ZMQContext();
+$req =
+ new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
+$req->connect("tcp://localhost:5454");
+
+$req->send("Hello");
+echo $req->recv();
+echo PHP_EOL;
@@ -0,0 +1,10 @@
+<?php
+
+$context = new ZMQContext();
+$server =$context->getSocket(ZMQ::SOCKET_REP);
+$server->bind("tcp://*:5454");
+while(true) {
+ $message = $server->recv();
+ echo "Sending $message World", PHP_EOL;
+ $server->send($message . " World");
+}
@@ -0,0 +1,10 @@
+<?php
+
+$ctx = new ZMQContext();
+$req =
+ new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
+$req->connect("tcp://localhost:5454");
+
+$req->send("Hello");
+echo $req->recv();
+echo PHP_EOL;
Oops, something went wrong.

0 comments on commit 8b87efc

Please sign in to comment.