Skip to content

Commit

Permalink
Add queue mgmt support: list, info, delete
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Sep 1, 2015
1 parent d0febe1 commit 7198897
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
41 changes: 26 additions & 15 deletions Adapter/Kinesis/Producer.php
Expand Up @@ -43,29 +43,40 @@ public function setDebug($debug)
}

/**
* Publishes the message and does what he wants with the properties
* Publishes the message and does nothing with the properties
*
* @param string $msgBody
* @param string $routingKey
* @param array $additionalProperties
*/
public function publish($msgBody, $routingKey = '', $additionalProperties = array())
{
//try {
$result = $this->client->putRecord(array_merge(
array(
'StreamName' => $this->streamName,
'Data' => $msgBody,
'PartitionKey' => $routingKey
),
$this->getClientParams()
));
//} catch (\Exception $e) {
// throw new KinesisProxyException($e->getMessage(), $e->getCode(), $e);
//}
//return $result;
$result = $this->client->putRecord(array_merge(
array(
'StreamName' => $this->streamName,
'Data' => $msgBody,
'PartitionKey' => $routingKey
),
$this->getClientParams()
));
}

/**
* Allows callers to do whatever they want with the client - useful to the Queue Mgr
*
* @param string $method
* @param array $args
* @return mixed
*/
public function call($method, array $args = array())
{
return $this->client->$method(array_merge($args, $this->getClientParams()));
}

/**
* Prepares the extra parameters to be injected into calls made via the Kinesis Client
* @return array
*/
protected function getClientParams()
{
if ($this->debug !== null) {
Expand All @@ -76,7 +87,7 @@ protected function getClientParams()
}

/**
* Does nothing
* Does nothing - needed to implement the interface correctly
* @param string $contentType
*/
public function setContentType($contentType)
Expand Down
45 changes: 31 additions & 14 deletions Adapter/Kinesis/QueueManager.php
@@ -1,9 +1,4 @@
<?php
/**
* User: gaetano.giunta
* Date: 19/05/14
* Time: 19.08
*/

namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;

Expand All @@ -20,10 +15,12 @@

/**
* A class dedicated not really to sending messages to a queue, bur rather to sending control commands
*
* @todo add support for stream creation 5needs a nr. of shards)
*/
class QueueManager /*extends BaseMessageProducer*/ implements ContainerAwareInterface, QueueManagerInterface
class QueueManager implements ContainerAwareInterface, QueueManagerInterface
{
protected $queue;
protected $streamName;
protected $container;

public function setContainer(ContainerInterface $container = null)
Expand All @@ -36,20 +33,17 @@ public function setContainer(ContainerInterface $container = null)
*/
public function setQueueName($queue)
{
$this->queue = $queue;
$this->streamName = $queue;
}

public function listActions()
{
return array(/*'purge', 'delete', 'info', 'list'*/);
return array('info', 'list', 'delete');
}

public function executeAction($action)
{
/*switch ($action) {
case 'purge':
return $this->purgeQueue();
switch ($action) {
case 'delete':
return $this->deleteQueue();

Expand All @@ -61,6 +55,29 @@ public function executeAction($action)

default:
throw new InvalidArgumentException("Action $action not supported");
}*/
}
}

protected function deleteQueue()
{
$result = $this->getProducerService()->call('DeleteStream', array('StreamName' => $this->streamName));
return $result['@metadata'];
}

protected function queueInfo()
{
$result = $this->getProducerService()->call('DescribeStream', array('StreamName' => $this->streamName));
return $result->get('StreamDescription');
}

protected function listQueues()
{
$result = $this->getProducerService()->call('ListStreams');
return $result->get('StreamNames');
}

protected function getProducerService()
{
return $this->container->get('kaliop_queueing.kinesis.message_producer');
}
}

0 comments on commit 7198897

Please sign in to comment.