Skip to content

Commit

Permalink
WIP: add message consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Sep 1, 2015
1 parent 1d20f58 commit 8e85df5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
35 changes: 35 additions & 0 deletions Adapter/Kinesis/Consumer.php
@@ -0,0 +1,35 @@
<?php

namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;


class Consumer
{
protected $shard;
protected $streamName;

/**
* Does nothing
*/
public function setMemoryLimit()
{
}

public function setRoutingKey($key)
{
$this->shard = $key;
}

public function consume($amount)
{
// @todo
}

/**
* @param string $queueName
*/
public function setQueueName($queueName)
{
$this->streamName = $queueName;
}
}
11 changes: 9 additions & 2 deletions Adapter/Kinesis/Driver.php
Expand Up @@ -9,6 +9,13 @@ class Driver extends ContainerAware implements DriverInterface
{
protected $debug;

public function getConsumer($queueName)
{
$consumer = $this->container->get('kaliop_queueing.kinesis.message_consumer');
$consumer->setQueueName($queueName);
return $consumer;
}

public function acceptMessage($message)
{
// @todo
Expand All @@ -25,9 +32,9 @@ public function decodeMessage($message)

/**
* @param string $queueName
* @return \Kaliop\QueueingBundle\Queue\MessageProducerInterface
* @return \Kaliop\QueueingBundle\Queue\ProducerInterface
*/
public function getMessageProducer($queueName)
public function getProducer($queueName)
{
$producer = $this->container->get('kaliop_queueing.kinesis.message_producer');
$producer->setQueueName($queueName);
Expand Down
4 changes: 2 additions & 2 deletions Adapter/Kinesis/Producer.php
Expand Up @@ -2,10 +2,10 @@

namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;

use Kaliop\QueueingBundle\Queue\MessageProducerInterface;
use Kaliop\QueueingBundle\Queue\ProducerInterface;
use Aws\Kinesis\KinesisClient;

class Producer implements MessageProducerInterface
class Producer implements ProducerInterface
{
/** @var \Aws\Kinesis\KinesisClient */
protected $client;
Expand Down
7 changes: 6 additions & 1 deletion Resources/config/services.yml
Expand Up @@ -2,6 +2,7 @@ parameters:
kaliop_queueing.driver.kinesis.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Driver
kaliop_queueing.kinesis.queue_manager.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\QueueManager
kaliop_queueing.kinesis.message_producer.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Producer
kaliop_queueing.kinesis.message_consumer.class: Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Consumer

services:
kaliop_queueing.driver.kinesis:
Expand All @@ -19,4 +20,8 @@ services:

kaliop_queueing.kinesis.message_producer:
class: %kaliop_queueing.kinesis.message_producer.class%
arguments: [ %kaliop_queueing_kinesis.default.client_config% ]
arguments: [ %kaliop_queueing_kinesis.default.client_config% ]

kaliop_queueing.kinesis.message_consumer:
class: %kaliop_queueing.kinesis.message_consumer.class%
#arguments: [ %kaliop_queueing_kinesis.default.client_config% ]

0 comments on commit 8e85df5

Please sign in to comment.