Skip to content

Commit

Permalink
Merge pull request #1429 from jason-bragg/RemoveClientProducerFromPubSub
Browse files Browse the repository at this point in the history
Remove dead client producer from pub sub
  • Loading branch information
Gabriel Kliot committed Apr 5, 2016
2 parents f399eac + b869d0c commit ec071b9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 65 deletions.
3 changes: 2 additions & 1 deletion src/Orleans/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,13 @@ public Message CreateResponseMessage()
return response;
}

public Message CreateRejectionResponse(RejectionTypes type, string info)
public Message CreateRejectionResponse(RejectionTypes type, string info, OrleansException ex = null)
{
var response = CreateResponseMessage();
response.Result = ResponseTypes.Rejection;
response.RejectionType = type;
response.RejectionInfo = info;
response.BodyObject = ex;
if (logger.IsVerbose) logger.Verbose("Creating {0} rejection with info '{1}' for {2} at:" + Environment.NewLine + "{3}", type, info, this, new System.Diagnostics.StackTrace(true));
return response;
}
Expand Down
10 changes: 7 additions & 3 deletions src/Orleans/Runtime/GrainReference.cs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,15 @@ private static void ResponseCallback(Message message, TaskCompletionSource<objec
return; // Ignore duplicates

default:
if (String.IsNullOrEmpty(message.RejectionInfo))
rejection = message.BodyObject as OrleansException;
if (rejection == null)
{
message.RejectionInfo = "Unable to send request - no rejection info available";
if (string.IsNullOrEmpty(message.RejectionInfo))
{
message.RejectionInfo = "Unable to send request - no rejection info available";
}
rejection = new OrleansException(message.RejectionInfo);
}
rejection = new OrleansException(message.RejectionInfo);
break;
}
response = Response.ExceptionResponse(rejection);
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Core/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void ReceiveMessage(Message message)
{
var str = String.Format("{0} {1}", rejectInfo ?? "", exc == null ? "" : exc.ToString());
MessagingStatisticsGroup.OnRejectedMessage(message);
Message rejection = message.CreateRejectionResponse(rejectType, str);
Message rejection = message.CreateRejectionResponse(rejectType, str, exc as OrleansException);
SendRejectionMessage(rejection);
}
else
Expand Down
121 changes: 62 additions & 59 deletions src/OrleansRuntime/Streams/PubSub/PubSubRendezvousGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,73 +160,76 @@ public async Task UnregisterProducer(StreamId streamId, IStreamProducerExtension
await WriteStateAsync();

int numProducers = State.Producers.Count;
if (numProducers > 0)
{
if (logger.IsVerbose)
logger.Info("Notifying {0} existing producer(s) about new consumer {1}. Producers={2}",
numProducers, streamConsumer, Utils.EnumerableToString(State.Producers));
if (numProducers <= 0)
return;

if (logger.IsVerbose)
logger.Info("Notifying {0} existing producer(s) about new consumer {1}. Producers={2}",
numProducers, streamConsumer, Utils.EnumerableToString(State.Producers));

// Notify producers about a new streamConsumer.
var tasks = new List<Task>();
var producers = State.Producers.ToList();
bool someProducersRemoved = false;
// Notify producers about a new streamConsumer.
var tasks = new List<Task>();
var producers = State.Producers.ToList();
int initialProducerCount = producers.Count;
foreach (var producerState in producers)
{
PubSubPublisherState producer = producerState; // Capture loop variable

foreach (var producerState in producers)
if (!IsActiveProducer(producer.Producer))
{
PubSubPublisherState producer = producerState; // Capture loop variable

if (!IsActiveProducer(producer.Producer))
{
// Producer is not active (could be stopping / shutting down) so skip
if (logger.IsVerbose) logger.Verbose("Producer {0} on stream {1} is not active - skipping.", producer, streamId);
continue;
}

Task addSubscriberPromise = producer.Producer.AddSubscriber(subscriptionId, streamId, streamConsumer, filter)
.ContinueWith(t =>
{
if (t.IsFaulted)
{
var exc = t.Exception.GetBaseException();
if (exc is GrainExtensionNotInstalledException)
{
logger.Warn((int) ErrorCode.Stream_ProducerIsDead,
"Producer {0} on stream {1} is no longer active - discarding.",
producer, streamId);
// This publisher has gone away, so we should cleanup pub-sub state.
bool removed = State.Producers.Remove(producer);
someProducersRemoved = true; // Re-save state changes at end
counterProducersRemoved.Increment();
counterProducersTotal.DecrementBy(removed ? 1 : 0);
// And ignore this error
}
else
{
throw exc;
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
tasks.Add(addSubscriberPromise);
// Producer is not active (could be stopping / shutting down) so skip
if (logger.IsVerbose) logger.Verbose("Producer {0} on stream {1} is not active - skipping.", producer, streamId);
continue;
}

Exception exception = null;
try
{
await Task.WhenAll(tasks);
}
catch (Exception exc)
{
exception = exc;
}
tasks.Add(NotifyProducer(producer, subscriptionId, streamId, streamConsumer, filter));
}

if (someProducersRemoved)
await WriteStateAsync();
Exception exception = null;
try
{
await Task.WhenAll(tasks);
}
catch (Exception exc)
{
exception = exc;
}

// if the number of producers has been changed, resave state.
if (State.Producers.Count != initialProducerCount)
await WriteStateAsync();

if (exception != null)
throw exception;
}

if (exception != null)
throw exception;
private async Task NotifyProducer(PubSubPublisherState producer, GuidId subscriptionId, StreamId streamId,
IStreamConsumerExtension streamConsumer, IStreamFilterPredicateWrapper filter)
{
try
{
await producer.Producer.AddSubscriber(subscriptionId, streamId, streamConsumer, filter);
}
catch (GrainExtensionNotInstalledException)
{
RemoveProducer(producer);
}
catch (ClientNotAvailableException)
{
RemoveProducer(producer);
}
}

private void RemoveProducer(PubSubPublisherState producer)
{
logger.Warn((int)ErrorCode.Stream_ProducerIsDead,
"Producer {0} on stream {1} is no longer active - permanently removing producer.",
producer, producer.Stream);

if (!State.Producers.Remove(producer)) return;

counterProducersRemoved.Increment();
counterProducersTotal.DecrementBy(1);
}

public async Task UnregisterConsumer(GuidId subscriptionId, StreamId streamId)
Expand Down
2 changes: 1 addition & 1 deletion test/Tester/StreamingTests/SMSClientStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override TestingSiloHost CreateSiloHost()
return testHost;
}

[Fact(Skip="Pending PR #1429"), TestCategory("BVT"), TestCategory("Streaming")]
[Fact, TestCategory("BVT"), TestCategory("Streaming")]
public async Task MSMStreamProducerOnDroppedClientTest()
{
logger.Info("************************ SMSDeactivationTest *********************************");
Expand Down

0 comments on commit ec071b9

Please sign in to comment.