Skip to content

Commit

Permalink
Fixed receive context to use a concurrent bag vs. list to be thread s…
Browse files Browse the repository at this point in the history
…afe, solves issue #405
  • Loading branch information
phatboyg committed Nov 25, 2015
1 parent 8e5a3ea commit a235e2a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
83 changes: 83 additions & 0 deletions src/MassTransit.Tests/ExcessiveAsyncFault_Specs.cs
@@ -0,0 +1,83 @@
// Copyright 2007-2015 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Tests
{
namespace NoLog
{
using System;
using System.Linq;
using System.Threading.Tasks;
using MassTransit.Testing;
using NUnit.Framework;
using TestFramework;
using TestFramework.Messages;
using Util;


[TestFixture]
public class An_excessive_fault_storm :
InMemoryTestFixture
{
[Test, Explicit]
public async Task Should_not_explode_the_task_library()
{
var limit = 1000;
for (int i = 0; i < limit; i++)
{
await InputQueueSendEndpoint.Send(new PingMessage());
}

IReceivedMessage<Fault<PingMessage>>[] messages = _consumer.Received.Select<Fault<PingMessage>>().Take(limit).ToArray();

Assert.AreEqual(limit, messages.Length);

Assert.AreEqual(limit,
messages.Select(x => x.Context.Message.Exceptions[0].ExceptionType == TypeMetadataCache<IntentionalTestException>.ShortName).Count());

}

PingConsumer _consumer;

protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
{
_consumer = new PingConsumer(TestTimeout);

_consumer.Configure(configurator);

configurator.Consumer<MessageConsumer>();
}


class PingConsumer :
MultiTestConsumer
{
public PingConsumer(TimeSpan timeout)
: base(timeout)
{
Consume<Fault<PingMessage>>();
}
}


public class MessageConsumer : IConsumer<PingMessage>
{
public async Task Consume(ConsumeContext<PingMessage> context)
{
await Task.Delay(100);

throw new IntentionalTestException("Time for crunchin'");
}
}
}
}
}
1 change: 1 addition & 0 deletions src/MassTransit.Tests/MassTransit.Tests.csproj
Expand Up @@ -122,6 +122,7 @@
<Compile Include="Courier\TwoActivityEvent_Specs.cs" />
<Compile Include="Courier\UriArgument_Specs.cs" />
<Compile Include="EventPublish_Specs.cs" />
<Compile Include="ExcessiveAsyncFault_Specs.cs" />
<Compile Include="HeaderObject_Specs.cs" />
<Compile Include="HostInfo_Specs.cs" />
<Compile Include="Introspection_Specs.cs" />
Expand Down
23 changes: 3 additions & 20 deletions src/MassTransit.Tests/masstransit.tests.log4net.xml
@@ -1,19 +1,17 @@
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
<root>
<level value="WARN" />
<level value="NONE" />
<appender-ref ref="LogFileAppender" />
<appender-ref ref="console" />
</root>

<logger name="MassTransit">
<level value="WARN" />
<level value="OFF" />
</logger>

<logger name="MassTransit.Messages" additivity="false">
<level value="ERROR" />
<appender-ref ref="MessageLogAppender" />
<appender-ref ref="console" />
<level value="OFF" />
</logger>

<appender name="console" type="log4net.Appender.ConsoleAppender, log4net">
Expand All @@ -36,19 +34,4 @@
<conversionPattern value="%date{yyyy-MM-dd hh:mm:ss.fff} %5level %m%n" />
</layout>
</appender>

<appender name="MessageLogAppender" type="log4net.Appender.RollingFileAppender">
<file value="MassTransit.Tests.Messages-" />
<appendToFile value="true" />
<datePattern value="yyyyMMdd.lo\g" />
<rollingStyle value="Composite" />
<maxSizeRollBackups value="3" />
<maximumFileSize value="10MB" />
<preserveLogFileNameExtension value="true" />
<countDirection value="1" />
<param name="StaticLogFileName" value="false" />
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date{yyyy-MM-dd hh:mm:ss.fff} %5level %m%n" />
</layout>
</appender>
</log4net>
8 changes: 4 additions & 4 deletions src/MassTransit/Context/BaseReceiveContext.cs
Expand Up @@ -13,7 +13,7 @@
namespace MassTransit.Context
{
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net.Mime;
Expand All @@ -31,7 +31,7 @@ public abstract class BaseReceiveContext :
readonly Lazy<ContentType> _contentType;
readonly Lazy<Headers> _headers;
readonly PayloadCache _payloadCache;
readonly IList<Task> _pendingTasks;
readonly ConcurrentBag<Task> _pendingTasks;
readonly IReceiveObserver _receiveObserver;
readonly Stopwatch _receiveTimer;

Expand All @@ -51,13 +51,13 @@ protected BaseReceiveContext(Uri inputAddress, bool redelivered, IReceiveObserve

_contentType = new Lazy<ContentType>(GetContentType);

_pendingTasks = new List<Task>();
_pendingTasks = new ConcurrentBag<Task>();
}

protected abstract IHeaderProvider HeaderProvider { get; }
public bool IsDelivered { get; private set; }
public bool IsFaulted { get; private set; }
public Task CompleteTask => Task.WhenAll(_pendingTasks);
public Task CompleteTask => Task.WhenAll(_pendingTasks.ToArray());

public void AddPendingTask(Task task)
{
Expand Down

0 comments on commit a235e2a

Please sign in to comment.