Permalink
Browse files

Adding a new generic batching system for requests and commands. BC: R…

…emoved Guzzle\Http\Plugin\BatchQueuePlugin BC: Removed Guzzle\Service\Command\CommandSet
  • Loading branch information...
1 parent 6ca6a53 commit 6062e4bdadde646c22eaba2f20c0150aad94479d @mtdowling mtdowling committed Jun 6, 2012
Showing with 88 additions and 128 deletions.
  1. +79 −0 BatchRequestTransfer.php
  2. +1 −1 Client.php
  3. +8 −0 ClientInterface.php
  4. +0 −127 Plugin/BatchQueuePlugin.php
View
@@ -0,0 +1,79 @@
+<?php
+
+namespace Guzzle\Http;
+
+use Guzzle\Common\Batch\BatchTransferInterface;
+use Guzzle\Common\Batch\BatchDivisorInterface;
+use Guzzle\Common\Exception\InvalidArgumentException;
+use Guzzle\Http\Message\RequestInterface;
+
+/**
+ * Batch transfer strategy used to efficiently transfer a batch of requests.
+ * This class is to be used with {@see Guzzle\Common\Batch\BatchInterface}
+ */
+class BatchRequestTransfer implements BatchTransferInterface, BatchDivisorInterface
+{
+ /**
+ * @var int Size of each command batch
+ */
+ protected $batchSize;
+
+ /**
+ * Constructor used to specify how large each batch should be
+ *
+ * @param int $batchSize Size of each batch
+ */
+ public function __construct($batchSize)
+ {
+ $this->batchSize = $batchSize;
+ }
+
+ /**
+ * Creates batches of requests by grouping requests by their associated
+ * curl multi object.
+ *
+ * {@inheritdoc}
+ */
+ public function createBatches(\SplQueue $queue)
+ {
+ // Create batches by curl multi object groups
+ $groups = new \SplObjectStorage();
+ foreach ($queue as $item) {
+ if (!$item instanceof RequestInterface) {
+ throw new InvalidArgumentException('All items must implement Guzzle\Http\Message\RequestInterface');
+ }
+ $multi = $item->getClient()->getCurlMulti();
+ if (!$groups->contains($multi)) {
+ $groups->attach($multi, new \ArrayObject(array($item)));
+ } else {
+ $groups[$multi]->append($item);
+ }
+ }
+
+ $batches = array();
+ foreach ($groups as $batch) {
+ $batches = array_merge($batches, array_chunk($groups[$batch]->getArrayCopy(), $this->batchSize));
+ }
+
+ return $batches;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function transfer(array $batch)
+ {
+ if (empty($batch)) {
+ return;
+ }
+
+ $multi = reset($batch)->getClient()->getCurlMulti();
+
+ // Prepare each request for their respective curl multi objects
+ foreach ($batch as $request) {
+ $multi->add($request);
+ }
+
+ $multi->send();
+ }
+}
View
@@ -461,7 +461,7 @@ public function options($uri = null)
public function send($requests)
{
$curlMulti = $this->getCurlMulti();
- $multipleRequests = is_array($requests);
+ $multipleRequests = !($requests instanceof RequestInterface);
if (!$multipleRequests) {
$requests = array($requests);
}
View
@@ -233,6 +233,14 @@ function send($requests);
function setCurlMulti(CurlMultiInterface $curlMulti);
/**
+ * Get the curl multi object to be used internally by the client for
+ * transferring requests.
+ *
+ * @return CurlMultiInterface
+ */
+ function getCurlMulti();
+
+ /**
* Set the request factory to use with the client when creating requests
*
* @param RequestFactoryInterface $factory Request factory
View
@@ -1,127 +0,0 @@
-<?php
-
-namespace Guzzle\Http\Plugin;
-
-use Guzzle\Common\Event;
-use Guzzle\Http\Message\RequestInterface;
-use Symfony\Component\EventDispatcher\EventSubscriberInterface;
-
-/**
- * Queues requests and sends them in parallel when a flush event is recievied.
- * You can call the flush() method on the plugin or emit a 'flush' event from
- * the client on which the plugin is attached.
- *
- * This plugin probably will not work well with plugins that implicitly
- * send requests (ExponentialBackoffPlugin, CachePlugin) or CommandSets.
- */
-class BatchQueuePlugin implements EventSubscriberInterface, \Countable
-{
- private $autoFlushCount;
- private $queue = array();
-
- /**
- * @param int $autoFlushCount Set to >0 to automatically flush
- * the queue when the number of requests is > $autoFlushCount
- */
- public function __construct($autoFlushCount = 0)
- {
- $this->autoFlushCount = $autoFlushCount;
- }
-
- /**
- * {@inheritdoc}
- */
- public static function getSubscribedEvents()
- {
- return array(
- 'client.create_request' => array('onRequestCreate', -255),
- 'request.before_send' => array('onRequestBeforeSend', 255),
- 'flush' => array('flush', -255)
- );
- }
-
- /**
- * Get a count of the requests in queue
- *
- * @return int
- */
- public function count()
- {
- return count($this->queue);
- }
-
- /**
- * Remove a request from the queue
- *
- * @param RequestInterface $request Request to remove
- *
- * @return BatchQueuePlugin
- */
- public function removeRequest(RequestInterface $request)
- {
- $this->queue = array_filter($this->queue, function($r) use ($request) {
- return $r !== $request;
- });
-
- return $this;
- }
-
- /**
- * Add request to the queue
- *
- * @param Event $event
- */
- public function onRequestCreate(Event $event)
- {
- $this->addRequest($event['request']);
- }
-
- /**
- * Add request to the queue
- *
- * @param RequestInterface $request Request to add
- *
- * @return BatchQueuePlugin
- */
- public function addRequest(RequestInterface $request)
- {
- $this->queue[] = $request;
- if ($this->autoFlushCount && count($this->queue) >= $this->autoFlushCount) {
- $this->flush();
- }
-
- return $this;
- }
-
- /**
- * Ensures that queued requests that get sent outside of the context
- * of the batch plugin get removed from the queue
- *
- * @param Event $event
- */
- public function onRequestBeforeSend(Event $event)
- {
- $this->removeRequest($event['request']);
- }
-
- /**
- * Flush the queue
- *
- * @param Event $event
- */
- public function flush()
- {
- $multis = array();
- // Prepare each request for their respective curl multi objects
- while ($request = array_shift($this->queue)) {
- $multi = $request->getClient()->getCurlMulti();
- $multi->add($request);
- if (!in_array($multi, $multis)) {
- $multis[] = $multi;
- }
- }
- foreach ($multis as $multi) {
- $multi->send();
- }
- }
-}

0 comments on commit 6062e4b

Please sign in to comment.