Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS: Revise stream element data model #1604

Merged
merged 10 commits into from
May 15, 2019

Conversation

RustedBones
Copy link
Contributor

Pull Request Checklist

Fixes

Fixes #625

Purpose

Improve SQS batch operations

Background Context

After the AWS SDK update, changes in the AWS models can make the API simpler and cleaner:

  • No need to propagate FifoIdentifiers anymore
  • Separation between request metadata (HTTP request info) and content metadata

@ennru ennru changed the title Split responseMetadata from content metadata SQS: Split responseMetadata from content metadata Mar 25, 2019
@ennru
Copy link
Member

ennru commented Mar 25, 2019

Thank you for modernising this with the latest possibilities from the AWS SDK.
As we now want to promise binary compatibility between releases, this PR failed the MiMa check. Do you see a way to keep the RC1 API around while improving this?

(run mimaReportBinaryIssues to get the MiMa report locally)

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked closer at your propsal and added the necessary MiMa exclusions, I agree that FifoMessageIdentifiers should go away.
SqsAckFlow.apply could be a non-API-breaking.

sqs/src/main/scala/akka/stream/alpakka/sqs/SqsModel.scala Outdated Show resolved Hide resolved
}
.mapAsync(settings.concurrentRequests) {
case (request, response) if !response.failed().isEmpty =>
.map {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to remove the unused Future-wrapping.

@ennru
Copy link
Member

ennru commented Apr 1, 2019

This PR requires a rebase to incorporate changes from #1606.

@RustedBones
Copy link
Contributor Author

RustedBones commented Apr 1, 2019

Thanks @ennru.
By going through the PR once more, I realize I got rid of the FifoMessageIdentifiers a bit too early. The goal of this class was to propagate the GroupId and DeduplicationId to downstream in case the user wants to do some special operations depending on those values (like for the message, we propagate it downstream though SQS only gives the md5 as a response).

In order to pass all information to downstream, we can encapsulate the SendMessageRequest into the SqsPublishResult. Then we can safely remove the FifoMessageIdentifiers and this makes it more symmetric like the SqsAckFlow which also contains the MessageAction.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great that you took another look.
I wonder if we wouldn't be able to get rid of the generics altogether if we'd have separate stream element types for the flat and for the grouped flows?

sqs/src/main/scala/akka/stream/alpakka/sqs/SqsModel.scala Outdated Show resolved Hide resolved
sqs/src/main/scala/akka/stream/alpakka/sqs/SqsModel.scala Outdated Show resolved Hide resolved
@ennru ennru mentioned this pull request Apr 3, 2019
@ennru
Copy link
Member

ennru commented Apr 3, 2019

Let's give this the time it needs and release Alpakka 1.0 anyway. I've created #1621 to mark the API as "may change".

@ennru
Copy link
Member

ennru commented Apr 26, 2019

Picking this up again.

To reduce the risk for "ToManyTypeParametersException" I'd like to propose to introduce specific subclasses of SqsPublishResult:

final class SqsPublishSingleResult @InternalApi private[sqs] (
    responseMetadata: SqsResponseMetadata,
    metadata: SendMessageResponse,
    request: SendMessageRequest
) extends SqsPublishResult[SendMessageResponse](responseMetadata, metadata, request)

final class SqsPublishBatchResult @InternalApi private[sqs] (
    responseMetadata: SqsResponseMetadata,
    metadata: SendMessageBatchResultEntry,
    request: SendMessageRequest
) extends SqsPublishResult[SendMessageBatchResultEntry](responseMetadata, metadata, request)

Names can be improved probably, but the API looks simpler with this.

WDYT?

@RustedBones
Copy link
Contributor Author

Are we risking a ToManyTypeParametersException since the constructor visibility is package?

If we do this we should also do the same with SqsAckResult with ChangeMessageVisibilityResponse, ChangeMessageVisibilityBatchResultEntry, DeleteMessageResponse, DeleteMessageBatchResultEntry and SqsAckResult[Nothing] (returned in a case of MessageAction.Ignore). That make all the type definitions a bit tedious IMHO.

@ennru
Copy link
Member

ennru commented Apr 30, 2019

Yes, you're right.

@RustedBones
Copy link
Contributor Author

Thinking about it again, it's probably better to give a sealed abstract class along with the possible implementations for the SQS model

Right now, the output type from the SqsAckFlow.grouped is a SqsAckResult[SdkPojo] which is contains too many potential implementations due to the SdkPojo (not only used for SQS) and would oblige the users to always provide a default case when pattern matching to be exhaustive. Same problem applies with SqsResponse in a smaller scale.

In addition, it would also be nice to provide unapply on the SQS model for the scala users

@ennru
Copy link
Member

ennru commented Apr 30, 2019

Yes, that is what I am looking for. SdkPojo is too broad and we have just two specialisations of it.

@RustedBones
Copy link
Contributor Author

RustedBones commented May 1, 2019

I've tried to get rid of the type parameter from the publish and ack results to give a clean API:

SqsPublishFlow.apply: Flow[SendMessageRequest, PublishResult, NotUsed]
SqsPublishFlow.grouped: Flow[SendMessageRequest, PublishResultEntry, NotUsed]

SqsAckFlow.apply: Flow[MessageAction, SqsAckResult, NotUsed]
SqsAckFlow.grouped: Flow[MessageAction, SqsAckResultEntry, NotUsed]

Ignore message action is now filtered by the ack flows and don't forward any message downstream

In all the SqsResult there is an accessor to the SqsResponseMetadata but this field is not taken in account for the equals and toString as AWS does it on its own models.

There is now no traces of the 'unbounded' SqsResponse and SdkPojo types.

The only drawback is on the SqsAckResult and SqsAckResultEntry which both contains a def result: Any when the user doesn't pattern-match/down cast to the sub-class implementation.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much better, the types are simple and self-explaining.

I'm a bit sceptic to filtering the Ignore messages, if that's what users need they can use filter. I foresee we'd like to add some pass-through support going forward and would need a message going through for that.

To leave out the field from equals makes sense, but you could add a little note on that.

Source<Message, NotUsed> source = Source.fromIterator(messages::iterator);

CompletionStage<List<SqsAckResult<SqsResponse>>> stage =
CompletionStage<List<SqsAckResult>> stage =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

sqs/src/main/scala/akka/stream/alpakka/sqs/SqsModel.scala Outdated Show resolved Hide resolved
@RustedBones
Copy link
Contributor Author

I've added Ignore to the SqsAck.
Instead of using Option for the result and requestMetadata, I return NotUsed and an empty metadata object.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're getting there...

@2m 2m self-assigned this May 15, 2019
Copy link
Member

@2m 2m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but I might be part of this. I'll squash it locally and merge it.

@ennru ennru added this to the 1.0.1 milestone May 15, 2019
@2m 2m force-pushed the sqs-batch-publish-result branch from a36d7c9 to 8c96122 Compare May 15, 2019 13:32
@ennru ennru changed the title SQS: Split responseMetadata from content metadata SQS: Revise stream element data model May 15, 2019
@ennru ennru merged commit cac1756 into akka:master May 15, 2019
@ennru
Copy link
Member

ennru commented May 15, 2019

Thank you for this long-coming improvement!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

No support for SQS Fifo queue?
3 participants