Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dead client producer from pub sub #1429

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Orleans/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,13 @@ public Message CreateResponseMessage()
return response;
}

public Message CreateRejectionResponse(RejectionTypes type, string info)
public Message CreateRejectionResponse(RejectionTypes type, string info, OrleansException ex = null)
{
var response = CreateResponseMessage();
response.Result = ResponseTypes.Rejection;
response.RejectionType = type;
response.RejectionInfo = info;
response.BodyObject = ex;
if (logger.IsVerbose) logger.Verbose("Creating {0} rejection with info '{1}' for {2} at:" + Environment.NewLine + "{3}", type, info, this, new System.Diagnostics.StackTrace(true));
return response;
}
Expand Down
10 changes: 7 additions & 3 deletions src/Orleans/Runtime/GrainReference.cs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,15 @@ private static void ResponseCallback(Message message, TaskCompletionSource<objec
return; // Ignore duplicates

default:
if (String.IsNullOrEmpty(message.RejectionInfo))
rejection = message.BodyObject as OrleansException;
if (rejection == null)
{
message.RejectionInfo = "Unable to send request - no rejection info available";
if (string.IsNullOrEmpty(message.RejectionInfo))
{
message.RejectionInfo = "Unable to send request - no rejection info available";
}
rejection = new OrleansException(message.RejectionInfo);
}
rejection = new OrleansException(message.RejectionInfo);
break;
}
response = Response.ExceptionResponse(rejection);
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Core/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void ReceiveMessage(Message message)
{
var str = String.Format("{0} {1}", rejectInfo ?? "", exc == null ? "" : exc.ToString());
MessagingStatisticsGroup.OnRejectedMessage(message);
Message rejection = message.CreateRejectionResponse(rejectType, str);
Message rejection = message.CreateRejectionResponse(rejectType, str, exc as OrleansException);
SendRejectionMessage(rejection);
}
else
Expand Down
121 changes: 62 additions & 59 deletions src/OrleansRuntime/Streams/PubSub/PubSubRendezvousGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,73 +160,76 @@ public async Task UnregisterProducer(StreamId streamId, IStreamProducerExtension
await WriteStateAsync();

int numProducers = State.Producers.Count;
if (numProducers > 0)
{
if (logger.IsVerbose)
logger.Info("Notifying {0} existing producer(s) about new consumer {1}. Producers={2}",
numProducers, streamConsumer, Utils.EnumerableToString(State.Producers));
if (numProducers <= 0)
return;

if (logger.IsVerbose)
logger.Info("Notifying {0} existing producer(s) about new consumer {1}. Producers={2}",
numProducers, streamConsumer, Utils.EnumerableToString(State.Producers));

// Notify producers about a new streamConsumer.
var tasks = new List<Task>();
var producers = State.Producers.ToList();
bool someProducersRemoved = false;
// Notify producers about a new streamConsumer.
var tasks = new List<Task>();
var producers = State.Producers.ToList();
int initialProducerCount = producers.Count;
foreach (var producerState in producers)
{
PubSubPublisherState producer = producerState; // Capture loop variable

foreach (var producerState in producers)
if (!IsActiveProducer(producer.Producer))
{
PubSubPublisherState producer = producerState; // Capture loop variable

if (!IsActiveProducer(producer.Producer))
{
// Producer is not active (could be stopping / shutting down) so skip
if (logger.IsVerbose) logger.Verbose("Producer {0} on stream {1} is not active - skipping.", producer, streamId);
continue;
}

Task addSubscriberPromise = producer.Producer.AddSubscriber(subscriptionId, streamId, streamConsumer, filter)
.ContinueWith(t =>
{
if (t.IsFaulted)
{
var exc = t.Exception.GetBaseException();
if (exc is GrainExtensionNotInstalledException)
{
logger.Warn((int) ErrorCode.Stream_ProducerIsDead,
"Producer {0} on stream {1} is no longer active - discarding.",
producer, streamId);

// This publisher has gone away, so we should cleanup pub-sub state.
bool removed = State.Producers.Remove(producer);
someProducersRemoved = true; // Re-save state changes at end
counterProducersRemoved.Increment();
counterProducersTotal.DecrementBy(removed ? 1 : 0);

// And ignore this error
}
else
{
throw exc;
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
tasks.Add(addSubscriberPromise);
// Producer is not active (could be stopping / shutting down) so skip
if (logger.IsVerbose) logger.Verbose("Producer {0} on stream {1} is not active - skipping.", producer, streamId);
continue;
}

Exception exception = null;
try
{
await Task.WhenAll(tasks);
}
catch (Exception exc)
{
exception = exc;
}
tasks.Add(NotifyProducer(producer, subscriptionId, streamId, streamConsumer, filter));
}

if (someProducersRemoved)
await WriteStateAsync();
Exception exception = null;
try
{
await Task.WhenAll(tasks);
}
catch (Exception exc)
{
exception = exc;
}

// if the number of producers has been changed, resave state.
if (State.Producers.Count != initialProducerCount)
await WriteStateAsync();

if (exception != null)
throw exception;
}

if (exception != null)
throw exception;
private async Task NotifyProducer(PubSubPublisherState producer, GuidId subscriptionId, StreamId streamId,
IStreamConsumerExtension streamConsumer, IStreamFilterPredicateWrapper filter)
{
try
{
await producer.Producer.AddSubscriber(subscriptionId, streamId, streamConsumer, filter);
}
catch (GrainExtensionNotInstalledException)
{
RemoveProducer(producer);
}
catch (ClientNotAvailableException)
{
RemoveProducer(producer);
}
}

private void RemoveProducer(PubSubPublisherState producer)
{
logger.Warn((int)ErrorCode.Stream_ProducerIsDead,
"Producer {0} on stream {1} is no longer active - permanently removing producer.",
producer, producer.Stream);

if (!State.Producers.Remove(producer)) return;

counterProducersRemoved.Increment();
counterProducersTotal.DecrementBy(1);
}

public async Task UnregisterConsumer(GuidId subscriptionId, StreamId streamId)
Expand Down
2 changes: 1 addition & 1 deletion test/Tester/StreamingTests/SMSClientStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override TestingSiloHost CreateSiloHost()
return testHost;
}

[Fact(Skip="Pending PR #1429"), TestCategory("BVT"), TestCategory("Streaming")]
[Fact, TestCategory("BVT"), TestCategory("Streaming")]
public async Task MSMStreamProducerOnDroppedClientTest()
{
logger.Info("************************ SMSDeactivationTest *********************************");
Expand Down