Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS SQS: Improve SqsAckResult and SqsPublishResult consistency #1336

Merged
merged 12 commits into from
Dec 13, 2018

Conversation

RustedBones
Copy link
Contributor

Fixes #1313

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered earlier why there just was the message body returned.
Wouldn't it be useful to return the whole message even with SqsPublishResult?

@RustedBones
Copy link
Contributor Author

You are right, I made a mistake deleting the message body from the SqsPublishResult.

By looking closer at the aws-java-sdk-sqs model, I realized that we can directly combine the SendMessageRequest with its SendMessageResult into a Message.

That would change the signature of a SqsPublishFlow to Flow[SendMessageRequest, Message, NotUsed]. SqsPublishResult would not be needed anymore and that makes the API more consistent with the reading side, also providing a Message.

The only difference is that the Message produced by the publish flow will have an undefined receiptHandle, which is ok to me.

@ennru
Copy link
Member

ennru commented Nov 27, 2018

Wouldn't that lose the information from SendMessageResult?

@RustedBones
Copy link
Contributor Author

RustedBones commented Nov 27, 2018

Indeed, the metadata should also be propagated.

Concerning the data itself, if the queue is not FIFO, Message contains all the necessary info.

If the queue is FIFO, there are additional identifiers: See here

The result contains the sequenceNumber which can't be stored in the message, and the request contains the messageDeduplicationId and the messageGroupId.

It was not clear to me if FIFO is supported in the alpakka connector: #625.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, you went from fixing a few things to a vast improvement overall.
With this changes does Fifo work now?

@RustedBones
Copy link
Contributor Author

@ennru I couldn't write integration tests for FIFO because elasticmq FIFO queues seem to behave like normal queues. No sequence id are generated at insert time.

I've tested the code on a real SQS FIFO queue.
This now works with both single inserts and batch mode.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking this even further, it really gets ready for Alpakka 1.0 now.

// override implicit val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard().build()

val future =
//#flow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful not to duplicate the snippet markers (//#flow).
Check the generated docs with docs/paradoxBrowse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip. I was using sbt docs/Local/paradox as stated in the docs without success.


import scala.compat.java8.OptionConverters._

object MessageFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great you added this!

@ennru
Copy link
Member

ennru commented Dec 12, 2018

ElasticMQ doesn't support FIFO.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Did you check for other duplicated snippet markers?

CompletionStage<Done> done =
Source.single(action)
// #ack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These #acks seem duplicated from line 47/52.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved the code down to have the setting tests before the stream tests.

@RustedBones
Copy link
Contributor Author

RustedBones commented Dec 13, 2018

I did. I tried also to get the snippet more uniform between java/scala.

Commit 9d6dd1e fixes #513

I would have put this in a separate PR but refactoring the tests triggered the bug.

@ennru ennru merged commit ed09a29 into akka:master Dec 13, 2018
@ennru
Copy link
Member

ennru commented Dec 13, 2018

Thank you, the SQS connector seems to be quite popular.

@ennru ennru added this to the 1.0-M2 milestone Dec 13, 2018
@RustedBones
Copy link
Contributor Author

Glad I can help :) Thank you for the good guidance

@RustedBones RustedBones deleted the sqs-ack branch March 24, 2019 16:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants