Permalink
Browse files

Merge branch 'master' of https://github.com/nick-jones/jp

  • Loading branch information...
2 parents 26f2c49 + faf873f commit 577ca669b46e665f34178697fb68951be1f9973e @fredemmott committed Jan 3, 2011
@@ -0,0 +1,35 @@
+#!/usr/bin/php
+<?php
+
+require_once('Thrift/Thrift.php');
+require_once('Thrift/protocol/TBinaryProtocol.php');
+require_once('Thrift/transport/TSocket.php');
+require_once('Thrift/transport/TBufferedTransport.php');
+require_once(dirname(__FILE__). '/../../gen-php/jp/JobPool.php');
+
+$socket = new TSocket('localhost', 9090);
+$transport = new TBufferedTransport($socket);
+$protocol = new TBinaryProtocol($transport);
+$pool = new JobPoolClient($protocol);
+$transport->open();
+
+while(true) {
+ try {
+ $job = $pool->acquire('text');
+ echo 'I\'m consuming a ' . $job->message . PHP_EOL;
+ $pool->purge('text', $job->id);
+ echo 'I consumed a ' . $job->message . PHP_EOL;
+ }
+ catch (EmptyPool $e) {
+ echo 'Pool is empty :(' . PHP_EOL;
+ break;
+ }
+ catch (Exception $e) {
+ echo $e;
+ exit;
+ }
+}
+
+$transport->close();
+
+?>
@@ -0,0 +1,27 @@
+#!/usr/bin/php
+<?php
+
+require_once('Thrift/Thrift.php');
+require_once('Thrift/protocol/TBinaryProtocol.php');
+require_once('Thrift/transport/TSocket.php');
+require_once('Thrift/transport/TBufferedTransport.php');
+require_once(dirname(__FILE__). '/../../gen-php/jp/JobPool.php');
+
+try {
+ $socket = new TSocket('localhost', 9090);
+ $transport = new TBufferedTransport($socket);
+ $protocol = new TBinaryProtocol($transport);
+ $pool = new JobPoolClient($protocol);
+
+ $transport->open();
+ echo 'Adding a pie...' . PHP_EOL;
+ $pool->add('text', 'pie');
+ echo 'I added a pie' . PHP_EOL;
+ $transport->close();
+}
+catch (Exception $e) {
+ echo $e;
+ exit;
+}
+
+?>
@@ -0,0 +1,23 @@
+#!/usr/bin/php
+<?php
+
+set_time_limit(0);
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+require_once('jp/consumer/Json.php');
+require_once('jp/worker/Interface.php');
+
+class JsonQueueWorker implements jp_worker_Interface {
+ /**
+ * @param array $item
+ * @return void
+ */
+ public function processItem($item) {
+ echo 'I consumed: ' . print_r($item, true);
+ return true;
+ }
+}
+
+$processor = new jp_consumer_Json('json', array('poll_interval' => 200), new JsonQueueWorker());
+$processor->run();
@@ -0,0 +1,13 @@
+#!/usr/bin/php
+<?php
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+require_once('jp/producer/Json.php');
+
+$producer = new jp_producer_Json('json');
+$doc = array('language' => 'php', 'api' => 'simple', 'format' => 'json');
+
+for($i = 0; $i < 100; $i++) {
+ $producer->add($doc);
+}
@@ -0,0 +1,19 @@
+#!/usr/bin/php
+<?php
+
+set_time_limit(0);
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+require_once('jp/consumer/Text.php');
+require_once('jp/worker/Interface.php');
+
+class TextQueueWorker implements jp_worker_Interface {
+ public function processItem($item) {
+ echo 'I consumed a ' . $item . PHP_EOL;
+ return true;
+ }
+}
+
+$consumer = new jp_consumer_Text('text', array('poll_interval' => 200), new TextQueueWorker());
+$consumer->run();
@@ -0,0 +1,12 @@
+#!/usr/bin/php
+<?php
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+require_once('jp/producer/Text.php');
+
+$producer = new jp_producer_Text('text');
+
+for($i = 0; $i < 100; $i++) {
+ $producer->add('simple pizza');
+}
@@ -0,0 +1,21 @@
+#!/usr/bin/php
+<?php
+
+set_time_limit(0);
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+$GLOBALS['THRIFT_ROOT'] = 'Thrift';
+require_once('jp/consumer/Thrift.php');
+require_once('jp/worker/Interface.php');
+require_once(dirname(__FILE__) . '/../gen-php/example/example_types.php');
+
+class ThriftQueueWorker implements jp_worker_Interface {
+ public function processItem($message) {
+ echo 'I consumed: ' . print_r($message, true);
+ return true;
+ }
+}
+
+$processor = new jp_consumer_Thrift('thrift', array('poll_interval' => 200), new ThriftQueueWorker(), 'ExampleData');
+$processor->run();
@@ -0,0 +1,18 @@
+#!/usr/bin/php
+<?php
+
+$jpRoot = dirname(__FILE__) . '/../../';
+set_include_path(get_include_path() . PATH_SEPARATOR . $jpRoot . 'lib/php' . PATH_SEPARATOR . $jpRoot . 'gen-php/jp');
+$GLOBALS['THRIFT_ROOT'] = 'Thrift';
+require_once('jp/producer/Thrift.php');
+require_once(dirname(__FILE__) . '/../gen-php/example/example_types.php');
+
+$producer = new jp_producer_Thrift('thrift');
+$doc = new ExampleData();
+$doc->language = 'php';
+$doc->api = 'simple';
+$doc->format = 'thrift';
+
+for($i = 0; $i < 100; $i++) {
+ $producer->add($doc);
+}
View
@@ -0,0 +1,54 @@
+<?php
+
+/**
+ * @throws InvalidArgumentException
+ * @package jp
+ */
+abstract class jp_Client {
+ /**
+ * @var array
+ */
+ protected $_options = array();
+
+ /**
+ * @var string
+ */
+ protected $_queue;
+
+ /**
+ * @var JobPoolClient|object
+ */
+ protected $_client;
+
+ /**
+ * @throws InvalidArgumentException
+ * @param string $queue
+ * @param array $options
+ * @return void
+ */
+ public function __construct($queue, $options = array()) {
+ if(!isset($options['host'])) $options['host'] = 'localhost';
+ if(!isset($options['port'])) $options['port'] = 9090;
+ if(!is_string($queue)) throw new InvalidArgumentException();
+
+ $this->_queue = $queue;
+ $this->_options = $options;
+
+ if(isset($options['client'])) {
+ $this->_client = $this->_options['client'];
+ }
+ else {
+ require_once 'Thrift/Thrift.php';
+ require_once 'Thrift/protocol/TBinaryProtocol.php';
+ require_once 'Thrift/transport/TSocket.php';
+ require_once 'Thrift/transport/TBufferedTransport.php';
+ require_once 'JobPool.php';
+
+ $socket = new TSocket($this->_options['host'], $this->_options['port']);
+ $transport = new TBufferedTransport($socket);
+ $protocol = new TBinaryProtocol($transport);
+ $this->_client = new JobPoolClient($protocol);
+ $transport->open();
+ }
+ }
+}
@@ -0,0 +1,62 @@
+<?php
+
+require_once 'jp/Client.php';
+
+/**
+ * @package jp.consumer
+ */
+abstract class jp_consumer_Abstract extends jp_Client {
+ /**
+ * @var jp_worker_Interface
+ */
+ protected $_worker;
+
+ /**
+ * @throws InvalidArgumentException
+ * @param string $queue
+ * @param array $options
+ * @param jp_worker_Abstract $worker
+ * @return void
+ */
+ public function __construct($queue, $options = array(), jp_worker_Interface $worker = null) {
+ if(!$worker) throw new InvalidArgumentException('Expecting worker');
+ if(!isset($options['poll_interval'])) $options['poll_interval'] = 1000000;
+ else $options['poll_interval'] = $options['poll_interval'] * 1000;
+ $this->_worker = $worker;
+ parent::__construct($queue, $options);
+ }
+
+ /**
+ * @return int
+ */
+ public function run() {
+ $i = 0;
+
+ try {
+ while(true) {
+ $this->consume();
+ if($this->_options['poll_interval'] > 0) usleep($this->_options['poll_interval']);
+ ++$i;
+ }
+ }
+ catch(EmptyPool $e) { }
+
+ return $i;
+ }
+
+ /**
+ * @return void
+ */
+ protected function consume() {
+ $job = $this->_client->acquire($this->_queue);
+ $result = $this->_worker->processItem($this->translate($job->message));
+ if($result) $this->_client->purge($this->_queue, $job->id);
+ }
+
+ /**
+ * @abstract
+ * @param string $message
+ * @return void
+ */
+ abstract protected function translate($message);
+}
@@ -0,0 +1,16 @@
+<?php
+
+require_once 'jp/consumer/Abstract.php';
+
+/**
+ * @package jp.consumer
+ */
+class jp_consumer_Json extends jp_consumer_Abstract {
+ /**
+ * @param string $message
+ * @return array
+ */
+ protected function translate($message) {
+ return json_decode($message, true);
+ }
+}
@@ -0,0 +1,16 @@
+<?php
+
+require_once 'jp/consumer/Abstract.php';
+
+/**
+ * @package jp.consumer
+ */
+class jp_consumer_Text extends jp_consumer_Abstract {
+ /**
+ * @param string $message
+ * @return string
+ */
+ protected function translate($message) {
+ return $message;
+ }
+}
@@ -0,0 +1,54 @@
+<?php
+
+require_once 'jp/consumer/Abstract.php';
+
+/**
+ * @throws InvalidArgumentException
+ * @package jp.consumer
+ */
+class jp_consumer_Thrift extends jp_consumer_Abstract {
+ /**
+ * @var TMemoryBuffer
+ */
+ protected $_transport;
+
+ /**
+ * @var TBinaryProtocol
+ */
+ protected $_protocol;
+
+ /**
+ * @var string
+ */
+ protected $_className;
+
+ /**
+ * @throws InvalidArgumentException
+ * @param string $queue
+ * @param array $options
+ * @param string $className
+ * @return void
+ */
+ public function __construct($queue, $options = array(), jp_worker_Interface $worker = null, $className = '') {
+ require_once 'Thrift/protocol/TBinaryProtocol.php';
+ require_once 'Thrift/transport/TMemoryBuffer.php';
+ if(empty($className)) throw new InvalidArgumentException();
+ $protocolFactory = new TBinaryProtocolFactory();
+ $this->_transport = new TMemoryBuffer();
+ $this->_protocol = $protocolFactory->getProtocol($this->_transport);
+ $this->_className = $className;
+ parent::__construct($queue, $options, $worker);
+ }
+
+ /**
+ * @param string $message
+ * @return object
+ */
+ protected function translate($message) {
+ $class = new ReflectionClass($this->_className);
+ $object = $class->newInstanceArgs();
+ $this->_transport->write($message);
+ $object->read($this->_protocol);
+ return $object;
+ }
+}
Oops, something went wrong.

0 comments on commit 577ca66

Please sign in to comment.