Skip to content

Commit

Permalink
First pass at a message producer
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Sep 1, 2015
1 parent 0ce0ab3 commit d0febe1
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 8 deletions.
19 changes: 13 additions & 6 deletions Adapter/Kinesis/Driver.php
Expand Up @@ -2,18 +2,16 @@

namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;

use Kaliop\QueueingBundle\Queue\Queue;
use Kaliop\QueueingBundle\Adapter\DriverInterface;
use Symfony\Component\DependencyInjection\ContainerAware;

use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class Driver extends ContainerAware implements DriverInterface
{
protected $debug;

public function acceptMessage($message)
{
// @todo
}

/**
Expand All @@ -22,6 +20,7 @@ public function acceptMessage($message)
*/
public function decodeMessage($message)
{
// @todo
}

/**
Expand All @@ -30,7 +29,10 @@ public function decodeMessage($message)
*/
public function getMessageProducer($queueName)
{
return '???'; $this->container->get('old_sound_rabbit_mq.' . $queueName . '_producer');
$producer = $this->container->get('kaliop_queueing.kinesis.message_producer');
$producer->setQueueName($queueName);
$producer->setDebug($this->debug);
return $producer;
}

/**
Expand All @@ -43,4 +45,9 @@ public function getQueueManager($queueName)
$mgr->setQueueName($queueName);
return $mgr;
}

public function setDebug($debug)
{
$this->debug = $debug;
}
}
85 changes: 85 additions & 0 deletions Adapter/Kinesis/Producer.php
@@ -0,0 +1,85 @@
<?php

namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;

use Kaliop\QueueingBundle\Queue\MessageProducerInterface;
use Aws\Kinesis\KinesisClient;

class Producer implements MessageProducerInterface
{
/** @var \Aws\Kinesis\KinesisClient */
protected $client;
protected $streamName;
protected $debug;

/**
* @param array $config - minimum seems to be: 'credentials', 'region', 'version'
* @see \Aws\AwsClient::__construct for the full list
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/configuration.html
*/
public function __construct(array $config)
{
$this->client = new KinesisClient($config);
}

/**
* @param string $queueName
* @todo test that we can successfully send messages to 2 queues using the same KinesisClient
*/
public function setQueueName($queueName)
{
$this->streamName = $queueName;
}

/**
* Note that this has less effect than passing a 'debug' option in constructor, as it will be
* only used by publish() from now on
*
* @param bool $debug use null for 'undefined'
*/
public function setDebug($debug)
{
$this->debug = $debug;
}

/**
* Publishes the message and does what he wants with the properties
*
* @param string $msgBody
* @param string $routingKey
* @param array $additionalProperties
*/
public function publish($msgBody, $routingKey = '', $additionalProperties = array())
{
//try {
$result = $this->client->putRecord(array_merge(
array(
'StreamName' => $this->streamName,
'Data' => $msgBody,
'PartitionKey' => $routingKey
),
$this->getClientParams()
));
//} catch (\Exception $e) {
// throw new KinesisProxyException($e->getMessage(), $e->getCode(), $e);
//}
//return $result;
}

protected function getClientParams()
{
if ($this->debug !== null) {
return array('@http' => array('debug' => $this->debug));
}

return array();
}

/**
* Does nothing
* @param string $contentType
*/
public function setContentType($contentType)
{
}
}
Expand Up @@ -21,5 +21,6 @@ public function load(array $configs, ContainerBuilder $container)
{
$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');
$loader->load('parameters.yml');
}
}
13 changes: 13 additions & 0 deletions Resources/config/parameters.yml
@@ -0,0 +1,13 @@
# to be maybe replaced with proper semantic config?

parameters:
kaliop_queueing_kinesis.default.client_config:
credentials:
key: ""
secret: ""
# @see http://docs.aws.amazon.com/aws-sdk-php/v2/api/class-Aws.Common.Enum.Region.html
region: "us-east-1"
# NB: It is recommended to fix a specific version for production
version: "latest"
# to get even more debug info that is available via command line switches, use:
#debug: true
7 changes: 6 additions & 1 deletion Resources/config/services.yml
@@ -1,6 +1,7 @@
parameters:
kaliop_queueing.driver.kinesis.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Driver
kaliop_queueing.kinesis.queue_manager.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\QueueManager
kaliop_queueing.kinesis.queue_manager.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\QueueManager
kaliop_queueing.kinesis.message_producer.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Producer

services:
kaliop_queueing.driver.kinesis:
Expand All @@ -15,3 +16,7 @@ services:
#parent: kaliop_queueing.message_producer
calls:
- [ setContainer, [ @service_container ] ]

kaliop_queueing.kinesis.message_producer:
class: %kaliop_queueing.kinesis.message_producer.class%
arguments: [ %kaliop_queueing_kinesis.default.client_config% ]
23 changes: 22 additions & 1 deletion readme.md
@@ -1,5 +1,26 @@
# Kaliop Queueing Bundle - Kinesis plugin

Adds support for AWS Kinesis to https://github.com/kaliop-uk/kueueingbundle
Adds support for AWS Kinesis to the Kaliop Queueing Bundle

See: http://aws.amazon.com/kinesis/ and https://github.com/kaliop-uk/kueueingbundle respectively.

It has been given its own bundle because it has higher requirements than the base Queueing Bundle

## Installation

1. Install the bundle.

3. Enable the KaliopQueueingPluginsKinesisBundle bundle in your kernel class registerBundles().

4. Clear all caches if not on a dev environment

5. If you do not have an AWS account, sign up for one at http://aws.amazon.com/
NB: note that there is no free tier for Kinesis. Pricing is described at: http://aws.amazon.com/kinesis/pricing/

## Usage

* Set up configuration according to your AWS account - see parameters.yml in this bundle

* To create a kinesis stream, use the web interface: https://console.aws.amazon.com/kinesis/home

* When running the kaliop_queueing:queuemessage, usage of the -k option to specify a shard is mandatory

0 comments on commit d0febe1

Please sign in to comment.