Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/401' into 401
Browse files Browse the repository at this point in the history
Conflicts:
	source/Conference/Conference.Web.Public/Global.asax.cs
	source/Infrastructure/Azure/Infrastructure.Azure/Messaging/EventBus.cs
	source/WorkerRoleCommandProcessor/ConferenceProcessor.cs
  • Loading branch information
kzu committed May 16, 2012
2 parents 3e2e3cd + 4e5ecbe commit d9b7411
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 26 deletions.
6 changes: 3 additions & 3 deletions source/Conference/Conference.Web.Public/Global.asax.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

namespace Conference.Web.Public
{
using System.Web;
using System.Data.Entity;
using System.Web.Mvc;
using System.Web.Routing;
using Conference.Common.Entity;
using Infrastructure;
using Infrastructure.BlobStorage;
using Infrastructure.Messaging;
using Infrastructure.Serialization;
Expand All @@ -30,13 +32,11 @@ namespace Conference.Web.Public
using Infrastructure.Sql.Messaging;
using Infrastructure.Sql.Messaging.Implementation;
#else
using Infrastructure.Azure;
using System.Web;
using Infrastructure.Azure.Messaging;
using Infrastructure;
#endif

public class MvcApplication : System.Web.HttpApplication
public class MvcApplication : HttpApplication
{
private IUnityContainer container;

Expand Down
2 changes: 1 addition & 1 deletion source/Conference/Conference.Web/Global.asax.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ namespace Conference.Web.Admin
using System.Web.Mvc;
using System.Web.Routing;
using Conference.Common.Entity;
using Infrastructure;
using Infrastructure.Messaging;
using Infrastructure.Serialization;
#if LOCAL
using Infrastructure.Sql.Messaging;
using Infrastructure.Sql.Messaging.Implementation;
#else
using System.Web;
using Infrastructure.Azure;
using Infrastructure.Azure.Messaging;
using Infrastructure;
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class given_a_topic_and_subscription : given_messaging_settings, IDisposa
{
public given_a_topic_and_subscription()
{
this.Topic = "cqrsjourney-" + Guid.NewGuid().ToString();
this.Subscription = "cqrsjourney-" + Guid.NewGuid().ToString();
this.Topic = "cqrsjourney-test-" + Guid.NewGuid().ToString();
this.Subscription = "test-" + Guid.NewGuid().ToString();

// Creates the topic too.
this.Settings.CreateSubscription(this.Topic, this.Subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
namespace Infrastructure.Azure.IntegrationTests.SessionSubscriptionReceiverIntegration
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Infrastructure.Azure.Messaging;
using Microsoft.ServiceBus.Messaging;
Expand Down Expand Up @@ -49,20 +52,25 @@ public void when_receiver_created_then_ignores_error_on_recreating_subscription(
}
}

public class given_a_receiver : given_messaging_settings
public class given_a_receiver : given_messaging_settings, IDisposable
{
private string Topic;
private string Subscription;

public given_a_receiver()
{
this.Topic = "cqrsjourney-" + Guid.NewGuid().ToString();
this.Subscription = "cqrsjourney-" + Guid.NewGuid().ToString();
this.Topic = "cqrsjourney-test-" + Guid.NewGuid().ToString();
this.Subscription = "test-" + Guid.NewGuid().ToString();

// Creates the topic too.
this.Settings.CreateSubscription(new SubscriptionDescription(this.Topic, this.Subscription) { RequiresSession = true });
}

public void Dispose()
{
this.Settings.TryDeleteTopic(this.Topic);
}

[Fact]
public void when_sending_message_with_session_then_session_receiver_gets_it()
{
Expand All @@ -81,6 +89,7 @@ public void when_sending_message_with_session_then_session_receiver_gets_it()
receiver.MessageReceived += (s, e) =>
{
received = e.Message.GetBody<string>();
e.Message.Complete();
signal.Set();
};

Expand All @@ -94,13 +103,94 @@ public void when_sending_message_with_session_then_session_receiver_gets_it()
}

[Fact]
public void when_starting_twice_then_ignores_second_request()
public void when_sending_message_with_session_then_session_receiver_gets_both_messages_fast()
{
var client = this.Settings.CreateSubscriptionClient(this.Topic, this.Subscription);
var sender = this.Settings.CreateTopicClient(this.Topic);
var signal = new AutoResetEvent(false);
var body1 = Guid.NewGuid().ToString();
var body2 = Guid.NewGuid().ToString();
var stopWatch = new Stopwatch();

var receiver = new SessionSubscriptionReceiver(this.Settings, this.Topic, this.Subscription);

sender.Send(new BrokeredMessage(body1) { SessionId = "foo" });
sender.Send(new BrokeredMessage(body2) { SessionId = "bar" });

var received = new ConcurrentBag<string>();

receiver.MessageReceived += (s, e) =>
{
received.Add(e.Message.GetBody<string>());
e.Message.Complete();
signal.Set();
};

receiver.Start();

signal.WaitOne();
stopWatch.Start();
signal.WaitOne();
stopWatch.Stop();

receiver.Stop();

Assert.Contains(body1, received);
Assert.Contains(body2, received);
Assert.InRange(stopWatch.Elapsed, TimeSpan.Zero, TimeSpan.FromSeconds(2));
}

[Fact]
public void when_sending_message_with_same_session_at_different_times_then_session_receiver_gets_all()
{
var client = this.Settings.CreateSubscriptionClient(this.Topic, this.Subscription);
var sender = this.Settings.CreateTopicClient(this.Topic);
var signal = new AutoResetEvent(false);
var body1 = Guid.NewGuid().ToString();
var body2 = Guid.NewGuid().ToString();
var body3 = Guid.NewGuid().ToString();

var receiver = new SessionSubscriptionReceiver(this.Settings, this.Topic, this.Subscription);
var stopWatch = new Stopwatch();
var received = new ConcurrentBag<string>();

receiver.MessageReceived += (s, e) =>
{
received.Add(e.Message.GetBody<string>());
e.Message.Complete();
signal.Set();
};

receiver.Start();

sender.Send(new BrokeredMessage(body1) { SessionId = "foo" });
stopWatch.Start();
signal.WaitOne();

sender.Send(new BrokeredMessage(body2) { SessionId = "bar" });
signal.WaitOne();

sender.Send(new BrokeredMessage(body3) { SessionId = "foo" });
signal.WaitOne();
stopWatch.Stop();

receiver.Stop();

Assert.Contains(body1, received);
Assert.Contains(body2, received);
Assert.Contains(body3, received);
Assert.InRange(stopWatch.Elapsed, TimeSpan.Zero, TimeSpan.FromSeconds(10));
}

[Fact]
public void when_starting_twice_then_ignores_second_request()
{
var receiver = new SessionSubscriptionReceiver(this.Settings, this.Topic, this.Subscription);

receiver.Start();
receiver.Start();

receiver.Stop();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,11 @@ private void ProcessNewPartition(CancellationToken cancellationToken)
private static BrokeredMessage BuildMessage(IEventRecord record)
{
string version = record.RowKey.Substring(RowKeyPrefixIndex);
// TODO: should add SessionID to guarantee ordering.
// Receiver must be prepared to accept sessions.
return new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(record.Payload)), true)
{
MessageId = record.PartitionKey + "_" + version,
//SessionId = record.PartitionKey,
SessionId = record.SourceId,
// TODO: match with how StandardMetadataProvider adds this metadata.
Properties =
{
{ "Version", version },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private BrokeredMessage BuildMessage(IEvent @event)
stream.Position = 0;

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();

foreach (var pair in this.metadata.GetMetadata(@event))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,12 @@ protected virtual void Dispose(bool disposing)
/// </summary>
private void ReceiveMessages(CancellationToken cancellationToken)
{
MessageSession session = null;

while (!cancellationToken.IsCancellationRequested)
{
MessageSession session;
try
{
session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(() => this.client.AcceptMessageSession(TimeSpan.FromSeconds(10)));
session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
}
catch (Exception e)
{
Expand All @@ -175,19 +174,16 @@ private void ReceiveMessages(CancellationToken cancellationToken)
continue;
}

BrokeredMessage message = null;

while (!cancellationToken.IsCancellationRequested)
{
BrokeredMessage message = null;
try
{
try
{
// NOTE: we don't long-poll more than a few seconds as
// we're already on a background thread and we want to
// allow other threads/processes/machines to potentially
// receive messages too.
message = this.receiveRetryPolicy.ExecuteAction<BrokeredMessage>(() => session.Receive(TimeSpan.FromSeconds(10)));
// Long polling is used when accepting session and not here. If there are no messages left in session we continue.
message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
}
catch (Exception e)
{
Expand All @@ -213,10 +209,19 @@ private void ReceiveMessages(CancellationToken cancellationToken)
}
}

if (session != null)
{
this.receiveRetryPolicy.ExecuteAction(() => session.Close());
}
this.receiveRetryPolicy.ExecuteAction(() => session.Close());
}
}

private MessageSession DoAcceptMessageSession()
{
try
{
return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
}
catch (TimeoutException)
{
return null;
}
}

Expand Down

0 comments on commit d9b7411

Please sign in to comment.