Skip to content

Commit

Permalink
Refactor in order to support the kinesis protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Sep 3, 2015
1 parent 4d7ebd2 commit 0ce8c89
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
6 changes: 3 additions & 3 deletions Adapter/RabbitMq/Message.php
Expand Up @@ -20,12 +20,12 @@ public function getBody()
}

/** @return string */
public function getContentEncoding()
public function getContentType()
{
return $this->amqpMessage->content_encoding;
//return $this->amqpMessage->content_encoding;
return $this->amqpMessage->get('content_type');
}


/**
* Check whether a property exists in the 'properties' dictionary
* ...to be determined:... or if present - in the 'delivery_info' dictionary.
Expand Down
14 changes: 14 additions & 0 deletions Queue/MessageConsumerInterface.php
@@ -0,0 +1,14 @@
<?php

namespace Kaliop\QueueingBundle\Queue;


interface MessageConsumerInterface
{
/**
* The function called by the 'consume messages' loop
*
* @param mixed $msg
*/
public function receive($msg);
}
3 changes: 2 additions & 1 deletion Queue/MessageInterface.php
Expand Up @@ -6,6 +6,7 @@
* Modeled after the AMQP message:
*
* - body
* - the content-type, allowing to deserialize the body
* - a bag of properties
*/
interface MessageInterface
Expand All @@ -23,7 +24,7 @@ public function getBody();
//public $is_truncated;

/** @return string */
public function getContentEncoding();
public function getContentType();

/** @return array */
// not needed so far
Expand Down
23 changes: 18 additions & 5 deletions Service/MessageConsumer.php
Expand Up @@ -8,13 +8,16 @@
use Kaliop\QueueingBundle\Event\EventsList;
use Kaliop\QueueingBundle\Event\MessageReceivedEvent;
use Kaliop\QueueingBundle\Queue\MessageInterface;
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
use Kaliop\QueueingBundle\Adapter\DriverInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* Base class for message consumers
* Base class for message consumers.
* It can consume messages of different types, by letting the registered drivers decode them.
* The only method that subclasses need to implement is consume().
*/
abstract class MessageConsumer implements ConsumerInterface
abstract class MessageConsumer implements ConsumerInterface, MessageConsumerInterface
{
protected $assumedContentType = null;
// NB: if you change this value in subclasses, take care about the security implications
Expand Down Expand Up @@ -93,6 +96,17 @@ protected function setAssumedContentType($type)
* @return mixed false to reject and requeue, any other value to acknowledge
*/
public function execute(AMQPMessage $msg)
{
$this->receive($msg);
}

/**
* This is the main entry point, called by driver-specific consumers
*
* @param mixed $msg
* @throws \Exception
*/
public function receive($msg)
{
$this->decodeAndConsume($this->getDriver($msg)->decodeMessage($msg));
}
Expand Down Expand Up @@ -154,9 +168,8 @@ protected function decodeAndConsume(MessageInterface $msg)
*/
protected function decodeMessageBody(MessageInterface $msg)
{
$properties = $msg->getProperties();
// do we accept this type? (nb: this is an optional property)
$type = @$properties['content_type'];
$type = $msg->getContentType();
if ($type == '' && $this->assumedContentType != '') {
$type = $this->assumedContentType;
}
Expand All @@ -165,7 +178,7 @@ protected function decodeMessageBody(MessageInterface $msg)
}

// then decode it
switch ($properties['content_type']) {
switch ($type) {
case 'application/json':
$data = json_decode($msg->getBody(), true);
if ($error = json_last_error()) {
Expand Down

0 comments on commit 0ce8c89

Please sign in to comment.