-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kinesis Firehose adapter and tests #42
Conversation
Consider using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments and things to change.
src/Adapter/FirehoseAdapter.php
Outdated
{ | ||
array_walk($messages, function (MessageInterface &$message, $id) { | ||
$metadata = $message->getMetadata(); | ||
$message = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this has id, body and attributes ala sqs instead of just body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, I thought that the expected behaviour was to pass the body and metadata through in the message, but release now that's just a specific SQS thing. I've simplified this to just pass through the body as the top-level message.
src/Adapter/FirehoseAdapter.php
Outdated
* @param string $deliveryStreamName | ||
* @param array $options | ||
*/ | ||
public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$options
doesn't do anything.
src/Adapter/FirehoseAdapter.php
Outdated
public function enqueue(array $messages) | ||
{ | ||
$failed = []; | ||
$batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably don't do this currently. But we should allow this field to be overloaded.
an option in the constructor options, or argument or getter/setter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've add this as an option and added an options accessor method.
class FirehoseIntegrationTest extends TestCase | ||
{ | ||
/** @var string */ | ||
private $queueName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property $queueName
is not used and could be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
public function setUp() | ||
{ | ||
$this->deliveryStreamName = 'delivery_stream_foo'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property deliveryStreamName
does not exist. Did you maybe forget to declare it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now declared
'Records' => $requestRecords, | ||
]; | ||
|
||
$results = $this->client->putRecordBatch($request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a top level failed result or does it always return a RequestResponses
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-firehose-2015-08-04.html#putrecordbatch
There's always a RequestResponses
key in the result object. There is also a FailedPutCount
element at the top-level in the response, but to know which messages have failed we'll need to inspect the RequestResponses
object anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am just thinking about what happens if the service is down, I assume guzzle would throw a ServerException
on this call. As long as we are happy passing that back to the user, should be fine. Or we could try{}catch{}
and wrap it in a FailedEnqueueException
.
So the user of this can then just handle failed enqueue exceptions and re-queue happily.
Is kinesis firehose a queue? |
The fact that we're having to implement Fair/unfair? |
I'd argue Firehose is a queue application, it's just that it handles consuming messages itself (into S3 or Redshift). In one of our other applications, we would be using it in a functionally equivalent way to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually changing my status:
can you add some tests for failing responses.
Adding an adapter to send messages to a Kinesis Firehose stream. This adapter only supports the
enqueue
function (as that's the only thing you can do to a Kinesis Firehose delivery stream).