Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add amazon sqs support #85

Merged
merged 11 commits into from
Mar 16, 2017
Merged

Conversation

pwelter34
Copy link
Contributor

No description provided.

using Foundatio.Utility;
using Nito.AsyncEx;

namespace Foundatio.AWS.Queues {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All our other queues are in the Foundatio.Queues namespace

if (!String.IsNullOrEmpty(_queueUrl))
return;

using (await _lock.LockAsync(cancellationToken).ConfigureAwait(false)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an AnyContext() extension method for ConfigureAwait(false)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this one was still missed :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That one is not supported, it returns something not supported by the extension method

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have an extension method for that, maybe it's missing from the project?

MessageBody = await _serializer.SerializeToStringAsync(data).ConfigureAwait(false),
};

var response = await _client.Value.SendMessageAsync(message).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be retrying this? We retry in half of our current implementations (curious what your thoughts are) https://github.com/exceptionless/Foundatio/blob/master/src/Foundatio.Redis/Queues/RedisQueue.cs#L201

if (!await OnEnqueuingAsync(data).ConfigureAwait(false))
return null;

Interlocked.Increment(ref _enqueuedCount);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our other clients we do this after we publish the message.

}

protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken cancellationToken) {
var visibilityTimeout = Convert.ToInt32(_workItemTimeout.TotalSeconds);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to do conversions in the constructor instead of every time we dequeue it

try {
response = await _client.Value.ReceiveMessageAsync(request, cancel).ConfigureAwait(false);
}
catch (TaskCanceledException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StephenCleary recommended we catch the base OperationCanceledException exception for all cancellations.


protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) {
Task.Run(async () => {
while (!cancellationToken.IsCancellationRequested) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use the linked cancellation token here too, otherwise this task might never end (especially in unit tests) when the test is disposed

https://github.com/exceptionless/Foundatio/blob/master/src/Foundatio.Redis/Queues/RedisQueue.cs#L217

while (!cancellationToken.IsCancellationRequested) {
IQueueEntry<T> entry = null;
try {
entry = await DequeueImplAsync(cancellationToken).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to be breaking this apart into multiple try catches and checking the token to save work

https://github.com/exceptionless/Foundatio/blob/master/src/Foundatio.Redis/Queues/RedisQueue.cs#L226
https://github.com/exceptionless/Foundatio/blob/master/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs#L185

Is there any chance you could update the redis implementation (first try catch block should be catching OperationCanceledException)?

<ProjectReference Include="..\Foundatio.Tests\Foundatio.Tests.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="log4net" Version="2.0.8" />
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These references are brought into automatically via our tests.props

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you meant to bring in log4net

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws sdk looks for it via reflection. i get false errors if its missing.

@@ -28,6 +28,10 @@ public abstract class QueueTestBase : TestWithLoggingBase, IDisposable {
return null;
}

protected virtual Task CloseQueue(IQueue<SimpleWorkItem> queue) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if our other azure queues and rabbit queues are being cleaned up, probably not :. I'm trying to think in the tests where we create multiple queue instances if they would need to be cleaned up or not. I almost think they wouldn't as they should all have the same queue name but different instance ids.

// sqs doesn't support already canceled token, change timeout and token for sqs pattern
var waitTimeout = cancellationToken.IsCancellationRequested ? 0 : _readQueueTimeoutSeconds;

var cancel = cancellationToken.IsCancellationRequested
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being used anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

_client.Value.Dispose();

base.Dispose();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The disposed cancellation token isn't being cancelled here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@niemyjski
Copy link
Member

Looks good to me!!! Thanks for the contrib

@ejsmith
Copy link
Contributor

ejsmith commented Mar 16, 2017

I think the tests fail on this because it won't give it access to the secrets for SQS access.

@niemyjski niemyjski changed the base branch from master to release/v5.1.0 March 16, 2017 16:19
@niemyjski niemyjski merged commit 243a1a8 into FoundatioFx:release/v5.1.0 Mar 16, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants