/
Producer.php
96 lines (85 loc) · 2.57 KB
/
Producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
<?php
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
use Kaliop\QueueingBundle\Queue\ProducerInterface;
use Aws\Kinesis\KinesisClient;
class Producer implements ProducerInterface
{
/** @var \Aws\Kinesis\KinesisClient */
protected $client;
protected $streamName;
protected $debug;
/**
* @param array $config - minimum seems to be: 'credentials', 'region', 'version'
* @see \Aws\AwsClient::__construct for the full list
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/configuration.html
*/
public function __construct(array $config)
{
$this->client = new KinesisClient($config);
}
/**
* @param string $queueName
* @todo test that we can successfully send messages to 2 queues using the same KinesisClient
*/
public function setQueueName($queueName)
{
$this->streamName = $queueName;
}
/**
* Note that this has less effect than passing a 'debug' option in constructor, as it will be
* only used by publish() from now on
*
* @param bool $debug use null for 'undefined'
*/
public function setDebug($debug)
{
$this->debug = $debug;
}
/**
* Publishes the message and does nothing with the properties
*
* @param string $msgBody
* @param string $routingKey
* @param array $additionalProperties
*/
public function publish($msgBody, $routingKey = '', $additionalProperties = array())
{
$result = $this->client->putRecord(array_merge(
array(
'StreamName' => $this->streamName,
'Data' => $msgBody,
'PartitionKey' => $routingKey
),
$this->getClientParams()
));
}
/**
* Allows callers to do whatever they want with the client - useful to the Queue Mgr
*
* @param string $method
* @param array $args
* @return mixed
*/
public function call($method, array $args = array())
{
return $this->client->$method(array_merge($args, $this->getClientParams()));
}
/**
* Prepares the extra parameters to be injected into calls made via the Kinesis Client
* @return array
*/
protected function getClientParams()
{
if ($this->debug !== null) {
return array('@http' => array('debug' => $this->debug));
}
return array();
}
/**
* Does nothing - needed to implement the interface correctly
* @param string $contentType
*/
public function setContentType($contentType)
{
}
}