Permalink
Browse files

Merge branch 'dev' of github.com:/EventStore/EventStore into dev

  • Loading branch information...
2 parents 9fb3956 + 226a6d4 commit 0788973459b6a2a0c99bfa8c84f042c49049dad4 @TarasRoshko TarasRoshko committed Nov 12, 2012
Showing with 612 additions and 634 deletions.
  1. +21 −3 src/EventStore/EventStore.Common/Log/LogManager.cs
  2. +0 −140 src/EventStore/EventStore.Core.Tests/Bus/Helpers/BusTestHandlers.cs
  3. +2 −1 src/EventStore/EventStore.Core.Tests/Bus/{QueuedHandler → }/Helpers/Messages.cs
  4. +12 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/NoopConsumer.cs
  5. +14 −8 ...ntStore/EventStore.Core.Tests/Bus/Helpers/{BusTestBase.cs → QueuedHandlerTestWithNoopConsumer.cs}
  6. +27 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/QueuedHandlerTestWithWaitingConsumer.cs
  7. +16 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/TestHandler.cs
  8. +53 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/TestMultiHandler.cs
  9. +9 −31 ...Store/EventStore.Core.Tests/Bus/{QueuedHandler/Helpers/Consumers.cs → Helpers/WaitingConsumer.cs}
  10. +0 −79 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/Helpers/QueuedHandlerTestBase.cs
  11. +7 −5 src/EventStore/EventStore.Core.Tests/Bus/{QueuedHandler → }/queued_handler_should.cs
  12. +60 −46 src/EventStore/EventStore.Core.Tests/Bus/{when_publishing.cs → when_publishing_into_memory_bus.cs}
  13. +36 −39 ...entStore.Core.Tests/Bus/{QueuedHandler/when_publishing.cs → when_publishing_to_queued_handler.cs}
  14. +24 −25 ...andler/when_publishing_before_starting.cs → when_publishing_to_queued_handler_before_starting.cs}
  15. +10 −9 ...tore/EventStore.Core.Tests/Bus/{QueuedHandler/when_starting.cs → when_starting_queued_handler.cs}
  16. +29 −21 ...tore/EventStore.Core.Tests/Bus/{QueuedHandler/when_stopping.cs → when_stopping_queued_handler.cs}
  17. +72 −58 src/EventStore/EventStore.Core.Tests/Bus/{when_subscribing.cs → when_subscribing_to_memory_bus.cs}
  18. +79 −64 ...ntStore/EventStore.Core.Tests/Bus/{when_unsubscribing.cs → when_unsubscribing_from_memory_bus.cs}
  19. +15 −13 src/EventStore/EventStore.Core.Tests/EventStore.Core.Tests.csproj
  20. +0 −1 src/EventStore/EventStore.Core.Tests/Services/Storage/ReadIndexTestScenario.cs
  21. +1 −8 ....Tests/Services/Storage/Transactions/when_rebuilding_index_for_partially_persisted_transaction.cs
  22. +4 −3 src/EventStore/EventStore.Core/Bus/InMemoryBus.cs
  23. +4 −2 src/EventStore/EventStore.Core/Bus/QueuedHandler.cs
  24. +14 −10 src/EventStore/EventStore.Core/Index/TableIndex.cs
  25. +3 −0 src/EventStore/EventStore.Core/Messages/HttpClientMessageDto.cs
  26. +5 −9 src/EventStore/EventStore.Core/Services/Storage/StorageWriter.cs
  27. +21 −8 src/EventStore/EventStore.Core/Services/Transport/Http/AutoEventConverter.cs
  28. +4 −3 src/EventStore/EventStore.Projections.Core.Tests/Services/TestFixtureWithProjectionCoreService.cs
  29. +4 −4 ...Store/EventStore.Projections.Core.Tests/Services/core_projection/TestFixtureWithCoreProjection.cs
  30. +2 −3 ...Store/EventStore.Projections.Core.Tests/Services/core_projection/TestFixtureWithExistingEvents.cs
  31. +2 −2 ...entStore/EventStore.Projections.Core.Tests/Services/core_projection/when_starting_a_projection.cs
  32. +6 −6 ....Projections.Core.Tests/Services/projection_subscription/TestFixtureWithProjectionSubscription.cs
  33. +12 −12 ....Projections.Core.Tests/Services/projection_subscription/when_creating_projection_subscription.cs
  34. +3 −3 ...Core.Tests/Services/projections_manager/managed_projection/TestFixtureWithReadWriteDisaptchers.cs
  35. +1 −1 src/EventStore/EventStore.Projections.Core/Services/Management/ManagedProjection.cs
  36. +6 −1 src/EventStore/EventStore.Projections.Core/Services/Management/ProjectionManager.cs
  37. +4 −4 src/EventStore/EventStore.SingleNode/NLog.config
  38. +6 −3 src/EventStore/EventStore.TestClient/Commands/WriteFloodHttpProcessor.cs
  39. +4 −1 src/EventStore/EventStore.TestClient/Commands/WriteFloodProcessor.cs
  40. +6 −3 src/EventStore/EventStore.TestClient/Commands/WriteFloodWaitingHttpProcessor.cs
  41. +4 −1 src/EventStore/EventStore.TestClient/Commands/WriteFloodWaitingProcessor.cs
  42. +6 −3 src/EventStore/EventStore.TestClient/Commands/WriteLongTermHttpProcessor.cs
  43. +4 −1 src/EventStore/EventStore.TestClient/Commands/WriteLongTermProcessor.cs
View
24 src/EventStore/EventStore.Common/Log/LogManager.cs
@@ -27,23 +27,42 @@
//
using System;
using System.IO;
+using System.Text;
using EventStore.Common.Configuration;
using EventStore.Common.Utils;
+using NLog;
+using NLog.LayoutRenderers;
namespace EventStore.Common.Log
{
+ [LayoutRenderer("logsdir")]
+ public class NLogDirectoryLayoutRendered : LayoutRenderer
+ {
+ protected override void Append(StringBuilder builder, LogEventInfo logEvent)
+ {
+ builder.Append(LogManager._logsDirectory);
+ }
+ }
+
public static class LogManager
{
+
+ static LogManager()
+ {
+ NLog.Config.ConfigurationItemFactory.Default.LayoutRenderers.RegisterDefinition("logsdir", typeof(NLogDirectoryLayoutRendered));
+ }
+
private static readonly ILogger GlobalLogger = GetLogger("GLOBAL-LOGGER");
private static bool _initialized;
+ internal static string _logsDirectory;
public static string LogsDirectory
{
get
{
if (!_initialized)
throw new InvalidOperationException("Init method must be called");
- return Environment.GetEnvironmentVariable(Constants.EnvVarPrefix + Constants.EnvVarLogsSuffix);
+ return _logsDirectory;
}
}
@@ -72,8 +91,7 @@ public static void Init(string componentName, string logsDirectory)
private static void SetLogsDirectoryInternal(string logsDirectory)
{
- const string logsDirEnvVar = Constants.EnvVarPrefix + Constants.EnvVarLogsSuffix;
- Environment.SetEnvironmentVariable(logsDirEnvVar, logsDirectory, EnvironmentVariableTarget.Process);
+ _logsDirectory = logsDirectory;
}
private static void SetComponentName(string componentName)
View
140 src/EventStore/EventStore.Core.Tests/Bus/Helpers/BusTestHandlers.cs
@@ -1,140 +0,0 @@
-// Copyright (c) 2012, Event Store LLP
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// Neither the name of the Event Store LLP nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-using System.Collections.Generic;
-using EventStore.Core.Bus;
-using EventStore.Core.Messaging;
-
-namespace EventStore.Core.Tests.Bus.Helpers
-{
- public abstract class TestHandlerBase
- {
- public readonly List<Message> HandledMessages = new List<Message>();
-
- public bool DidntHandleAnyMessages()
- {
- return HandledMessages.Count == 0;
- }
- }
-
- public class TestMessageHandler<T> : IHandle<T> where T: Message
- {
- public readonly List<T> HandledMessages = new List<T>();
-
- public void Handle(T message)
- {
- HandledMessages.Add(message);
- }
- }
-
- public class TestHandler : TestHandlerBase, IHandle<TestMessage>
- {
- public void Handle(TestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
- public class TestHandler2 : TestHandlerBase, IHandle<TestMessage2>
- {
- public void Handle(TestMessage2 message)
- {
- HandledMessages.Add(message);
- }
- }
- public class TestHandler3 : TestHandlerBase, IHandle<TestMessage3>
- {
- public void Handle(TestMessage3 message)
- {
- HandledMessages.Add(message);
- }
- }
-
- public class ParentTestHandler : TestHandlerBase, IHandle<ParentTestMessage>
- {
- public void Handle(ParentTestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
- public class ChildTestHandler : TestHandlerBase, IHandle<ChildTestMessage>
- {
- public void Handle(ChildTestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
- public class GrandChildTestHandler : TestHandlerBase, IHandle<GrandChildTestMessage>
- {
- public void Handle(GrandChildTestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
-
- public class MultipleMessagesTestHandler : TestHandlerBase, IHandle<TestMessage>, IHandle<TestMessage2>, IHandle<TestMessage3>
- {
- public void Handle(TestMessage message)
- {
- HandledMessages.Add(message);
- }
-
- public void Handle(TestMessage2 message)
- {
- HandledMessages.Add(message);
- }
-
- public void Handle(TestMessage3 message)
- {
- HandledMessages.Add(message);
- }
- }
-
- public class SameMessageHandler1 : TestHandlerBase, IHandle<TestMessage>
- {
- public void Handle(TestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
- public class SameMessageHandler2 : TestHandlerBase, IHandle<TestMessage>
- {
- public void Handle(TestMessage message)
- {
- HandledMessages.Add(message);
- }
- }
-
- public class MessageWithIdHandler : TestHandlerBase, IHandle<TestMessageWithId>
- {
- public void Handle(TestMessageWithId message)
- {
- HandledMessages.Add(message);
- }
- }
-
-
-}
View
3 ...sts/Bus/QueuedHandler/Helpers/Messages.cs → ...tStore.Core.Tests/Bus/Helpers/Messages.cs
@@ -25,10 +25,11 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
using EventStore.Core.Messaging;
-namespace EventStore.Core.Tests.Bus.QueuedHandler.Helpers
+namespace EventStore.Core.Tests.Bus.Helpers
{
public class DeferredExecutionTestMessage : Message
{
View
12 src/EventStore/EventStore.Core.Tests/Bus/Helpers/NoopConsumer.cs
@@ -0,0 +1,12 @@
+using EventStore.Core.Bus;
+using EventStore.Core.Messaging;
+
+namespace EventStore.Core.Tests.Bus.Helpers
+{
+ public class NoopConsumer : IHandle<Message>
+ {
+ public void Handle(Message message)
+ {
+ }
+ }
+}
View
22 ...ore.Core.Tests/Bus/Helpers/BusTestBase.cs → ...pers/QueuedHandlerTestWithNoopConsumer.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2012, Event Store LLP
+// Copyright (c) 2012, Event Store LLP
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -25,25 +25,31 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using EventStore.Core.Bus;
+using EventStore.Core.Messaging;
using NUnit.Framework;
namespace EventStore.Core.Tests.Bus.Helpers
{
- public abstract class BusTestBase
+ public abstract class QueuedHandlerTestWithNoopConsumer
{
- protected InMemoryBus Bus;
+ protected QueuedHandler Queue;
+ protected IHandle<Message> Consumer;
[SetUp]
- protected virtual void SetUp()
+ public virtual void SetUp()
{
- Bus = new InMemoryBus("test_bus", watchSlowMsg: false);
+ Consumer = new NoopConsumer();
+ Queue = new QueuedHandler(Consumer, "test_name", watchSlowMsg: false, threadStopWaitTimeoutMs: 100);
}
[TearDown]
- protected virtual void TearDown()
+ public virtual void TearDown()
{
- Bus = null;
+ Queue.Stop();
+ Queue = null;
+ Consumer = null;
}
}
-}
+}
View
27 src/EventStore/EventStore.Core.Tests/Bus/Helpers/QueuedHandlerTestWithWaitingConsumer.cs
@@ -0,0 +1,27 @@
+using EventStore.Core.Bus;
+using NUnit.Framework;
+
+namespace EventStore.Core.Tests.Bus.Helpers
+{
+ public abstract class QueuedHandlerTestWithWaitingConsumer
+ {
+ protected QueuedHandler Queue;
+ protected WaitingConsumer Consumer;
+
+ [SetUp]
+ public virtual void SetUp()
+ {
+ Consumer = new WaitingConsumer(0);
+ Queue = new QueuedHandler(Consumer, "waiting_queue", watchSlowMsg: false, threadStopWaitTimeoutMs: 100);
+ }
+
+ [TearDown]
+ public virtual void TearDown()
+ {
+ Queue.Stop();
+ Queue = null;
+ Consumer.Dispose();
+ Consumer = null;
+ }
+ }
+}
View
16 src/EventStore/EventStore.Core.Tests/Bus/Helpers/TestHandler.cs
@@ -0,0 +1,16 @@
+using System.Collections.Generic;
+using EventStore.Core.Bus;
+using EventStore.Core.Messaging;
+
+namespace EventStore.Core.Tests.Bus.Helpers
+{
+ public class TestHandler<T> : IHandle<T> where T : Message
+ {
+ public readonly List<T> HandledMessages = new List<T>();
+
+ public void Handle(T message)
+ {
+ HandledMessages.Add(message);
+ }
+ }
+}
View
53 src/EventStore/EventStore.Core.Tests/Bus/Helpers/TestMultiHandler.cs
@@ -0,0 +1,53 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System.Collections.Generic;
+using EventStore.Core.Bus;
+using EventStore.Core.Messaging;
+
+namespace EventStore.Core.Tests.Bus.Helpers
+{
+ public class TestMultiHandler : IHandle<TestMessage>, IHandle<TestMessage2>, IHandle<TestMessage3>
+ {
+ public readonly List<Message> HandledMessages = new List<Message>();
+
+ public void Handle(TestMessage message)
+ {
+ HandledMessages.Add(message);
+ }
+
+ public void Handle(TestMessage2 message)
+ {
+ HandledMessages.Add(message);
+ }
+
+ public void Handle(TestMessage3 message)
+ {
+ HandledMessages.Add(message);
+ }
+ }
+}
View
40 ...ts/Bus/QueuedHandler/Helpers/Consumers.cs → ...Core.Tests/Bus/Helpers/WaitingConsumer.cs
@@ -25,43 +25,19 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading;
using EventStore.Core.Bus;
using EventStore.Core.Messaging;
-using EventStore.Core.Tests.Bus.Helpers;
-namespace EventStore.Core.Tests.Bus.QueuedHandler.Helpers
+namespace EventStore.Core.Tests.Bus.Helpers
{
- public class StupidConsumer : IHandle<Message>
- {
- public void Handle(Message message) { }
- }
-
- public abstract class WatchingConsumerBase : IHandle<Message>
+ public class WaitingConsumer : IHandle<Message>, IDisposable
{
public readonly List<Message> HandledMessages = new List<Message>();
-
- public virtual void Handle(Message message)
- {
- HandledMessages.Add(message);
- }
-
- protected static void ExecuteIf<TMessage>(Message msg, Action<TMessage> action) where TMessage : Message
- {
- var typedMsg = msg as TMessage;
- if (typedMsg != null)
- action(typedMsg);
- }
- }
-
- public class WatchingConsumer : WatchingConsumerBase { }
-
- public class WaitingConsumer : WatchingConsumerBase, IDisposable
- {
+
private readonly CountdownEvent _countdownEvent;
public WaitingConsumer(int initialCount)
@@ -79,11 +55,13 @@ public bool Wait(int ms = 100)
return _countdownEvent.Wait(ms);
}
- public override void Handle(Message message)
+ public void Handle(Message message)
{
- base.Handle(message);
+ HandledMessages.Add(message);
- ExecuteIf<DeferredExecutionTestMessage>(message, deffered => deffered.Execute());
+ var typedMsg = message as DeferredExecutionTestMessage;
+ if (typedMsg != null)
+ ((Action<DeferredExecutionTestMessage>) (deffered => deffered.Execute()))(typedMsg);
_countdownEvent.Signal();
}
View
79 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/Helpers/QueuedHandlerTestBase.cs
@@ -1,79 +0,0 @@
-// Copyright (c) 2012, Event Store LLP
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// Neither the name of the Event Store LLP nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-using EventStore.Core.Bus;
-using EventStore.Core.Messaging;
-using NUnit.Framework;
-
-namespace EventStore.Core.Tests.Bus.QueuedHandler.Helpers
-{
- public abstract class QueuedHandlerTestBase
- {
- protected Core.Bus.QueuedHandler _queue;
-
- [SetUp]
- public abstract void SetUp();
-
- [TearDown]
- public abstract void TearDown();
- }
-
- public abstract class QueuedHandlerTestWithStupidConsumer :QueuedHandlerTestBase
- {
- protected IHandle<Message> _consumer;
-
- public override void SetUp()
- {
- _consumer = new StupidConsumer();
- _queue = new Core.Bus.QueuedHandler(_consumer, "test_name", watchSlowMsg: false, threadStopWaitTimeoutMs: 100);
- }
-
- public override void TearDown()
- {
- _queue = null;
- _consumer = null;
- }
- }
-
- public class QueuedHandlerTestWithWaitingConsumer :QueuedHandlerTestBase
- {
- protected WaitingConsumer _consumer;
-
- public override void SetUp()
- {
- _consumer = new WaitingConsumer(0);
- _queue = new Core.Bus.QueuedHandler(_consumer,"waiting_queue",watchSlowMsg: false, threadStopWaitTimeoutMs:100);
- }
-
- public override void TearDown()
- {
- _queue = null;
- _consumer.Dispose();
- _consumer = null;
- }
- }
-}
View
12 ...us/QueuedHandler/queued_handler_should.cs → ...e.Core.Tests/Bus/queued_handler_should.cs
@@ -25,25 +25,27 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
+using EventStore.Core.Bus;
+using EventStore.Core.Tests.Bus.Helpers;
using NUnit.Framework;
-namespace EventStore.Core.Tests.Bus.QueuedHandler
+namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class queue_handler_should : QueuedHandlerTestWithStupidConsumer
+ public class queued_handler_should : QueuedHandlerTestWithNoopConsumer
{
[Test]
public void throw_if_handler_is_null()
{
- Assert.Throws<ArgumentNullException>(() => new Core.Bus.QueuedHandler(null, "throwing", watchSlowMsg: false));
+ Assert.Throws<ArgumentNullException>(() => new QueuedHandler(null, "throwing", watchSlowMsg: false));
}
[Test]
public void throw_if_name_is_null()
{
- Assert.Throws<ArgumentNullException>(() => new Core.Bus.QueuedHandler(_consumer, null, watchSlowMsg: false));
+ Assert.Throws<ArgumentNullException>(() => new QueuedHandler(Consumer, null, watchSlowMsg: false));
}
}
}
View
106 ...ntStore.Core.Tests/Bus/when_publishing.cs → ...ts/Bus/when_publishing_into_memory_bus.cs
@@ -26,80 +26,94 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
using System;
+using EventStore.Core.Bus;
using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Core.Tests.Common;
using NUnit.Framework;
namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_publishing : BusTestBase
+ public class when_publishing_into_memory_bus
{
+ private InMemoryBus _bus;
+
+ [SetUp]
+ public void SetUp()
+ {
+ _bus = new InMemoryBus("test_bus", watchSlowMsg: false);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _bus = null;
+ }
+
[Test]
public void null_message_app_should_throw()
{
- Assert.Throws<ArgumentNullException>(() => Bus.Publish(null));
+ Assert.Throws<ArgumentNullException>(() => _bus.Publish(null));
}
[Test]
public void unsubscribed_messages_noone_should_handle_it()
{
- var handler1 = new TestHandler();
- var handler2 = new TestHandler2();
- var handler3 = new TestHandler3();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage2>();
+ var handler3 = new TestHandler<TestMessage3>();
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
- Assert.That(handler1.DidntHandleAnyMessages() &&
- handler2.DidntHandleAnyMessages() &&
- handler3.DidntHandleAnyMessages());
+ Assert.That(handler1.HandledMessages.Count == 0
+ && handler2.HandledMessages.Count == 0
+ && handler3.HandledMessages.Count == 0);
}
[Test]
public void any_message_no_other_messages_should_be_published()
{
- var handler1 = new TestHandler();
- var handler2 = new TestHandler2();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage2>();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage2>(handler2);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage2>(handler2);
- Bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage());
- Assert.That(handler1.HandledMessages.ContainsSingle<TestMessage>() &&
- handler2.DidntHandleAnyMessages());
+ Assert.That(handler1.HandledMessages.ContainsSingle<TestMessage>() && handler2.HandledMessages.Count == 0);
}
[Test]
public void same_message_n_times_it_should_be_handled_n_times()
{
- var handler = new MessageWithIdHandler();
+ var handler = new TestHandler<TestMessageWithId>();
var message = new TestMessageWithId(11);
- Bus.Subscribe<TestMessageWithId>(handler);
+ _bus.Subscribe<TestMessageWithId>(handler);
- Bus.Publish(message);
- Bus.Publish(message);
- Bus.Publish(message);
+ _bus.Publish(message);
+ _bus.Publish(message);
+ _bus.Publish(message);
Assert.That(handler.HandledMessages.ContainsN<TestMessageWithId>(3, mes => mes.Id == 11));
}
[Test]
public void multiple_messages_of_same_type_they_all_should_be_delivered()
{
- var handler = new MessageWithIdHandler();
+ var handler = new TestHandler<TestMessageWithId>();
var message1 = new TestMessageWithId(1);
var message2 = new TestMessageWithId(2);
var message3 = new TestMessageWithId(3);
- Bus.Subscribe<TestMessageWithId>(handler);
+ _bus.Subscribe<TestMessageWithId>(handler);
- Bus.Publish(message1);
- Bus.Publish(message2);
- Bus.Publish(message3);
+ _bus.Publish(message1);
+ _bus.Publish(message2);
+ _bus.Publish(message3);
Assert.That(handler.HandledMessages.ContainsSingle<TestMessageWithId>(mes => mes.Id == 1));
Assert.That(handler.HandledMessages.ContainsSingle<TestMessageWithId>(mes => mes.Id == 2));
@@ -109,35 +123,35 @@ public void multiple_messages_of_same_type_they_all_should_be_delivered()
[Test]
public void message_of_child_type_then_all_subscribed_handlers_of_parent_type_should_handle_message()
{
- var parentHandler = new ParentTestHandler();
- Bus.Subscribe<ParentTestMessage>(parentHandler);
+ var parentHandler = new TestHandler<ParentTestMessage>();
+ _bus.Subscribe<ParentTestMessage>(parentHandler);
- Bus.Publish(new ChildTestMessage());
+ _bus.Publish(new ChildTestMessage());
Assert.That(parentHandler.HandledMessages.ContainsSingle<ChildTestMessage>());
}
[Test]
public void message_of_parent_type_then_no_subscribed_handlers_of_child_type_should_handle_message()
{
- var childHandler = new ChildTestHandler();
- Bus.Subscribe<ChildTestMessage>(childHandler);
+ var childHandler = new TestHandler<ChildTestMessage>();
+ _bus.Subscribe<ChildTestMessage>(childHandler);
- Bus.Publish(new ParentTestMessage());
+ _bus.Publish(new ParentTestMessage());
Assert.That(childHandler.HandledMessages.ContainsNo<ParentTestMessage>());
}
[Test]
public void message_of_grand_child_type_then_all_subscribed_handlers_of_base_types_should_handle_message()
{
- var parentHandler = new ParentTestHandler();
- var childHandler = new ChildTestHandler();
+ var parentHandler = new TestHandler<ParentTestMessage>();
+ var childHandler = new TestHandler<ChildTestMessage>();
- Bus.Subscribe<ParentTestMessage>(parentHandler);
- Bus.Subscribe<ChildTestMessage>(childHandler);
+ _bus.Subscribe<ParentTestMessage>(parentHandler);
+ _bus.Subscribe<ChildTestMessage>(childHandler);
- Bus.Publish(new GrandChildTestMessage());
+ _bus.Publish(new GrandChildTestMessage());
Assert.That(parentHandler.HandledMessages.ContainsSingle<GrandChildTestMessage>() &&
childHandler.HandledMessages.ContainsSingle<GrandChildTestMessage>());
@@ -146,15 +160,15 @@ public void message_of_grand_child_type_then_all_subscribed_handlers_of_base_typ
[Test]
public void message_of_grand_child_type_then_all_subscribed_handlers_of_parent_types_including_grand_child_handler_should_handle_message()
{
- var parentHandler = new ParentTestHandler();
- var childHandler = new ChildTestHandler();
- var grandChildHandler = new GrandChildTestHandler();
+ var parentHandler = new TestHandler<ParentTestMessage>();
+ var childHandler = new TestHandler<ChildTestMessage>();
+ var grandChildHandler = new TestHandler<GrandChildTestMessage>();
- Bus.Subscribe<ParentTestMessage>(parentHandler);
- Bus.Subscribe<ChildTestMessage>(childHandler);
- Bus.Subscribe<GrandChildTestMessage>(grandChildHandler);
+ _bus.Subscribe<ParentTestMessage>(parentHandler);
+ _bus.Subscribe<ChildTestMessage>(childHandler);
+ _bus.Subscribe<GrandChildTestMessage>(grandChildHandler);
- Bus.Publish(new GrandChildTestMessage());
+ _bus.Publish(new GrandChildTestMessage());
Assert.That(parentHandler.HandledMessages.ContainsSingle<GrandChildTestMessage>() &&
childHandler.HandledMessages.ContainsSingle<GrandChildTestMessage>() &&
View
75 ...ests/Bus/QueuedHandler/when_publishing.cs → .../Bus/when_publishing_to_queued_handler.cs
@@ -25,84 +25,81 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
-using System.Collections.Generic;
using System.Linq;
-using System.Text;
using System.Threading;
-using System.Threading.Tasks;
using EventStore.Core.Tests.Bus.Helpers;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
using EventStore.Core.Tests.Common;
using NUnit.Framework;
-namespace EventStore.Core.Tests.Bus.QueuedHandler
+namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_publishing : QueuedHandlerTestWithWaitingConsumer
+ public class when_publishing_to_queued_handler : QueuedHandlerTestWithWaitingConsumer
{
public override void SetUp()
{
base.SetUp();
- _queue.Start();
+ Queue.Start();
}
public override void TearDown()
{
- _consumer.Dispose();
- _queue.Stop();
+ Consumer.Dispose();
+ Queue.Stop();
base.TearDown();
}
[Test]
public void null_message_should_throw()
{
- Assert.Throws<ArgumentNullException>(() => _queue.Publish(null));
+ Assert.Throws<ArgumentNullException>(() => Queue.Publish(null));
}
[Test]
public void message_it_should_be_delivered_to_bus()
{
- _consumer.SetWaitingCount(1);
+ Consumer.SetWaitingCount(1);
- _queue.Publish(new TestMessage());
+ Queue.Publish(new TestMessage());
- _consumer.Wait();
- Assert.That(_consumer.HandledMessages.ContainsSingle<TestMessage>());
+ Consumer.Wait();
+ Assert.That(Consumer.HandledMessages.ContainsSingle<TestMessage>());
}
[Test]
public void multiple_messages_they_should_be_delivered_to_bus()
{
- _consumer.SetWaitingCount(2);
+ Consumer.SetWaitingCount(2);
- _queue.Publish(new TestMessage());
- _queue.Publish(new TestMessage2());
+ Queue.Publish(new TestMessage());
+ Queue.Publish(new TestMessage2());
- _consumer.Wait();
+ Consumer.Wait();
- Assert.That(_consumer.HandledMessages.ContainsSingle<TestMessage>() &&
- _consumer.HandledMessages.ContainsSingle<TestMessage2>());
+ Assert.That(Consumer.HandledMessages.ContainsSingle<TestMessage>() &&
+ Consumer.HandledMessages.ContainsSingle<TestMessage2>());
}
[Test]
public void messages_from_different_threads_they_should_be_all_executed_in_same_one()
{
- _consumer.SetWaitingCount(3);
+ Consumer.SetWaitingCount(3);
int msg1ThreadId = 0, msg2ThreadId = 1, msg3ThreadId = 2;
var threads = new[]
{
- new Thread(() => _queue.Publish(new DeferredExecutionTestMessage(() =>{msg1ThreadId = Thread.CurrentThread.ManagedThreadId;}))),
- new Thread(() => _queue.Publish(new DeferredExecutionTestMessage(() =>{msg2ThreadId = Thread.CurrentThread.ManagedThreadId;}))),
- new Thread(() => _queue.Publish(new DeferredExecutionTestMessage(() =>{msg3ThreadId = Thread.CurrentThread.ManagedThreadId;})))
+ new Thread(() => Queue.Publish(new DeferredExecutionTestMessage(() =>{msg1ThreadId = Thread.CurrentThread.ManagedThreadId;}))),
+ new Thread(() => Queue.Publish(new DeferredExecutionTestMessage(() =>{msg2ThreadId = Thread.CurrentThread.ManagedThreadId;}))),
+ new Thread(() => Queue.Publish(new DeferredExecutionTestMessage(() =>{msg3ThreadId = Thread.CurrentThread.ManagedThreadId;})))
};
foreach (var thread in threads)
thread.Start();
- bool executedOnTime = _consumer.Wait(500);
+ bool executedOnTime = Consumer.Wait(500);
if (!executedOnTime)
{
@@ -116,18 +113,18 @@ public void messages_from_different_threads_they_should_be_all_executed_in_same_
[Test]
public void messages_order_should_remain_the_same()
{
- _consumer.SetWaitingCount(6);
+ Consumer.SetWaitingCount(6);
- _queue.Publish(new TestMessageWithId(4));
- _queue.Publish(new TestMessageWithId(8));
- _queue.Publish(new TestMessageWithId(15));
- _queue.Publish(new TestMessageWithId(16));
- _queue.Publish(new TestMessageWithId(23));
- _queue.Publish(new TestMessageWithId(42));
+ Queue.Publish(new TestMessageWithId(4));
+ Queue.Publish(new TestMessageWithId(8));
+ Queue.Publish(new TestMessageWithId(15));
+ Queue.Publish(new TestMessageWithId(16));
+ Queue.Publish(new TestMessageWithId(23));
+ Queue.Publish(new TestMessageWithId(42));
- _consumer.Wait();
+ Consumer.Wait();
- var typedMessages = _consumer.HandledMessages.OfType<TestMessageWithId>().ToArray();
+ var typedMessages = Consumer.HandledMessages.OfType<TestMessageWithId>().ToArray();
Assert.That(typedMessages.Length == 6 &&
typedMessages[0].Id == 4 &&
typedMessages[1].Id == 8 &&
@@ -140,19 +137,19 @@ public void messages_order_should_remain_the_same()
[Test]
public void message_while_queue_has_hanged_it_should_not_be_executed()
{
- _consumer.SetWaitingCount(2);
+ Consumer.SetWaitingCount(2);
var waitHandle = new ManualResetEvent(false);
try
{
- _queue.Publish(new DeferredExecutionTestMessage(() => waitHandle.WaitOne()));
- _queue.Publish(new TestMessage());
+ Queue.Publish(new DeferredExecutionTestMessage(() => waitHandle.WaitOne()));
+ Queue.Publish(new TestMessage());
- Assert.That(_consumer.HandledMessages.ContainsNo<TestMessage>());
+ Assert.That(Consumer.HandledMessages.ContainsNo<TestMessage>());
}
finally
{
waitHandle.Set();
- _consumer.Wait();
+ Consumer.Wait();
waitHandle.Dispose();
}
}
View
49 ...andler/when_publishing_before_starting.cs → ...hing_to_queued_handler_before_starting.cs
@@ -25,76 +25,75 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-using System;
+
using EventStore.Core.Tests.Bus.Helpers;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
using EventStore.Core.Tests.Common;
using NUnit.Framework;
-namespace EventStore.Core.Tests.Bus.QueuedHandler
+namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_publishing_before_starting : QueuedHandlerTestWithWaitingConsumer
+ public class when_publishing_to_queued_handler_before_starting : QueuedHandlerTestWithWaitingConsumer
{
[Test]
public void should_not_throw()
{
- Assert.DoesNotThrow(() => _queue.Publish(new TestMessage()));
+ Assert.DoesNotThrow(() => Queue.Publish(new TestMessage()));
}
[Test]
public void should_not_forward_message_to_bus()
{
- _consumer.SetWaitingCount(1);
+ Consumer.SetWaitingCount(1);
- _queue.Publish(new TestMessage());
+ Queue.Publish(new TestMessage());
- _consumer.Wait(10);
+ Consumer.Wait(10);
- Assert.That(_consumer.HandledMessages.ContainsNo<TestMessage>());
+ Assert.That(Consumer.HandledMessages.ContainsNo<TestMessage>());
}
[Test]
public void and_then_starting_message_should_be_forwarded_to_bus()
{
- _consumer.SetWaitingCount(1);
+ Consumer.SetWaitingCount(1);
- _queue.Publish(new TestMessage());
+ Queue.Publish(new TestMessage());
try
{
- _queue.Start();
- _consumer.Wait();
+ Queue.Start();
+ Consumer.Wait();
}
finally
{
- _queue.Stop();
+ Queue.Stop();
}
- Assert.That(_consumer.HandledMessages.ContainsSingle<TestMessage>());
+ Assert.That(Consumer.HandledMessages.ContainsSingle<TestMessage>());
}
[Test]
public void multiple_messages_and_then_starting_messages_should_be_forwarded_to_bus()
{
- _consumer.SetWaitingCount(3);
+ Consumer.SetWaitingCount(3);
- _queue.Publish(new TestMessage());
- _queue.Publish(new TestMessage2());
- _queue.Publish(new TestMessage3());
+ Queue.Publish(new TestMessage());
+ Queue.Publish(new TestMessage2());
+ Queue.Publish(new TestMessage3());
try
{
- _queue.Start();
- _consumer.Wait();
+ Queue.Start();
+ Consumer.Wait();
}
finally
{
- _queue.Stop();
+ Queue.Stop();
}
- Assert.That(_consumer.HandledMessages.ContainsSingle<TestMessage>() &&
- _consumer.HandledMessages.ContainsSingle<TestMessage2>() &&
- _consumer.HandledMessages.ContainsSingle<TestMessage3>());
+ Assert.That(Consumer.HandledMessages.ContainsSingle<TestMessage>() &&
+ Consumer.HandledMessages.ContainsSingle<TestMessage2>() &&
+ Consumer.HandledMessages.ContainsSingle<TestMessage3>());
}
}
}
View
19 ....Tests/Bus/QueuedHandler/when_starting.cs → ...Tests/Bus/when_starting_queued_handler.cs
@@ -25,44 +25,45 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
+using EventStore.Core.Tests.Bus.Helpers;
using NUnit.Framework;
-namespace EventStore.Core.Tests.Bus.QueuedHandler
+namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_starting : QueuedHandlerTestWithStupidConsumer
+ public class when_starting_queued_handler : QueuedHandlerTestWithNoopConsumer
{
public override void SetUp()
{
base.SetUp();
- _queue.Start();
+ Queue.Start();
}
public override void TearDown()
{
- _queue.Stop();
+ Queue.Stop();
base.TearDown();
}
[Test]
public void gracefully_should_not_throw()
{
- Assert.Throws<InvalidOperationException>(() => _queue.Start());
+ Assert.Throws<InvalidOperationException>(() => Queue.Start());
}
[Test]
public void multiple_times_should_throw()
{
- Assert.Throws<InvalidOperationException>(() => _queue.Start());
+ Assert.Throws<InvalidOperationException>(() => Queue.Start());
}
[Test]
public void after_being_stopped_should_throw()
{
- _queue.Stop();
- Assert.Throws<InvalidOperationException>(() => _queue.Start());
+ Queue.Stop();
+ Assert.Throws<InvalidOperationException>(() => Queue.Start());
}
}
}
View
50 ....Tests/Bus/QueuedHandler/when_stopping.cs → ...Tests/Bus/when_stopping_queued_handler.cs
@@ -25,63 +25,71 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
using System;
using System.Threading;
+using EventStore.Core.Bus;
using EventStore.Core.Tests.Bus.Helpers;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
using NUnit.Framework;
-namespace EventStore.Core.Tests.Bus.QueuedHandler
+namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_stopping : QueuedHandlerTestWithStupidConsumer
+ public class when_stopping_queued_handler : QueuedHandlerTestWithNoopConsumer
{
[Test]
public void gracefully_should_not_throw()
{
- _queue.Start();
- Assert.DoesNotThrow(() => _queue.Stop());
+ Queue.Start();
+ Assert.DoesNotThrow(() => Queue.Stop());
}
[Test]
public void gracefully_and_queue_is_not_busy_should_not_take_much_time()
{
- _queue.Start();
+ Queue.Start();
- Action onSuccess = Assert.Pass;
- Action onTimeout = () => Assert.Fail("couldn't stop queue in time");
+ var wait = new ManualResetEventSlim(false);
+
+ ThreadPool.QueueUserWorkItem(_ =>
+ {
+ Queue.Stop();
+ wait.Set();
+ });
- TimeoutHelper.CallWithTimeout(() => _queue.Stop(), 100, onSuccess, onTimeout);
+ Assert.IsTrue(wait.Wait(100), "Couldn't stop queue in time.");
}
[Test]
public void second_time_should_not_throw()
{
- _queue.Start();
- _queue.Stop();
- Assert.DoesNotThrow(() => _queue.Stop());
+ Queue.Start();
+ Queue.Stop();
+ Assert.DoesNotThrow(() => Queue.Stop());
}
[Test]
public void second_time_should_not_take_much_time()
{
- _queue.Start();
- _queue.Stop();
+ Queue.Start();
+ Queue.Stop();
- Action onSuccess = Assert.Pass;
- Action onTimeout = () => Assert.Fail("couldn't stop queue in time");
+ var wait = new ManualResetEventSlim(false);
+
+ ThreadPool.QueueUserWorkItem(_ =>
+ {
+ Queue.Stop();
+ wait.Set();
+ });
- TimeoutHelper.CallWithTimeout(() => _queue.Stop(), 10, onSuccess, onTimeout);
+ Assert.IsTrue(wait.Wait(10), "Couldn't stop queue in time.");
}
[Test]
public void while_queue_is_busy_should_crash_with_timeout()
{
var consumer = new WaitingConsumer(1);
- var busyQueue = new Core.Bus.QueuedHandler(consumer,
- "busy_test_queue",
- watchSlowMsg: false,
- threadStopWaitTimeoutMs: 100);
+ var busyQueue = new QueuedHandler(consumer, "busy_test_queue", watchSlowMsg: false, threadStopWaitTimeoutMs: 100);
var waitHandle = new ManualResetEvent(false);
var handledEvent = new ManualResetEvent(false);
try
View
130 ...tStore.Core.Tests/Bus/when_subscribing.cs → ...sts/Bus/when_subscribing_to_memory_bus.cs
@@ -26,55 +26,69 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
using System;
-using EventStore.Core.Messaging;
+using EventStore.Core.Bus;
using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Core.Tests.Common;
using NUnit.Framework;
namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_subscribing : BusTestBase
+ public class when_subscribing_to_memory_bus
{
+ private InMemoryBus _bus;
+
+ [SetUp]
+ public void SetUp()
+ {
+ _bus = new InMemoryBus("test_bus", watchSlowMsg: false);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _bus = null;
+ }
+
[Test]
public void null_as_handler_app_should_throw_arg_null_exception()
{
- Assert.Throws<ArgumentNullException>(() => Bus.Subscribe<TestMessage>(null));
+ Assert.Throws<ArgumentNullException>(() => _bus.Subscribe<TestMessage>(null));
}
[Test]
public void but_not_publishing_messages_noone_should_handle_any_messages()
{
- var multiHandler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(multiHandler);
- Bus.Subscribe<TestMessage2>(multiHandler);
- Bus.Subscribe<TestMessage3>(multiHandler);
+ var multiHandler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(multiHandler);
+ _bus.Subscribe<TestMessage2>(multiHandler);
+ _bus.Subscribe<TestMessage3>(multiHandler);
- Assert.That(multiHandler.DidntHandleAnyMessages());
+ Assert.That(multiHandler.HandledMessages.Count == 0);
}
[Test]
public void one_handler_to_one_message_it_should_be_handled()
{
- var handler = new TestHandler();
- Bus.Subscribe<TestMessage>(handler);
+ var handler = new TestHandler<TestMessage>();
+ _bus.Subscribe<TestMessage>(handler);
- Bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage());
Assert.That(handler.HandledMessages.ContainsSingle<TestMessage>());
}
[Test]
public void one_handler_to_multiple_messages_they_all_should_be_handled()
{
- var multiHandler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(multiHandler);
- Bus.Subscribe<TestMessage2>(multiHandler);
- Bus.Subscribe<TestMessage3>(multiHandler);
+ var multiHandler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(multiHandler);
+ _bus.Subscribe<TestMessage2>(multiHandler);
+ _bus.Subscribe<TestMessage3>(multiHandler);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(multiHandler.HandledMessages.ContainsSingle<TestMessage>() &&
multiHandler.HandledMessages.ContainsSingle<TestMessage2>() &&
@@ -84,13 +98,13 @@ public void one_handler_to_multiple_messages_they_all_should_be_handled()
[Test]
public void one_handler_to_few_messages_then_only_subscribed_should_be_handled()
{
- var multiHandler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(multiHandler);
- Bus.Subscribe<TestMessage3>(multiHandler);
+ var multiHandler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(multiHandler);
+ _bus.Subscribe<TestMessage3>(multiHandler);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(multiHandler.HandledMessages.ContainsSingle<TestMessage>() &&
multiHandler.HandledMessages.ContainsNo<TestMessage2>() &&
@@ -100,13 +114,13 @@ public void one_handler_to_few_messages_then_only_subscribed_should_be_handled()
[Test]
public void multiple_handlers_to_one_message_then_each_handler_should_handle_message_once()
{
- var handler1 = new SameMessageHandler1();
- var handler2 = new SameMessageHandler2();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage>();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage>(handler2);
- Bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage());
Assert.That(handler1.HandledMessages.ContainsSingle<TestMessage>());
Assert.That(handler2.HandledMessages.ContainsSingle<TestMessage>());
@@ -115,22 +129,22 @@ public void multiple_handlers_to_one_message_then_each_handler_should_handle_mes
[Test]
public void multiple_handlers_to_multiple_messages_then_each_handler_should_handle_subscribed_messages()
{
- var handler1 = new MultipleMessagesTestHandler();
- var handler2 = new MultipleMessagesTestHandler();
- var handler3 = new MultipleMessagesTestHandler();
+ var handler1 = new TestMultiHandler();
+ var handler2 = new TestMultiHandler();
+ var handler3 = new TestMultiHandler();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage3>(handler1);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage3>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
- Bus.Subscribe<TestMessage2>(handler2);
+ _bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage2>(handler2);
- Bus.Subscribe<TestMessage2>(handler3);
- Bus.Subscribe<TestMessage3>(handler3);
+ _bus.Subscribe<TestMessage2>(handler3);
+ _bus.Subscribe<TestMessage3>(handler3);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(handler1.HandledMessages.ContainsSingle<TestMessage>() &&
handler1.HandledMessages.ContainsSingle<TestMessage3>() &&
@@ -145,23 +159,23 @@ public void multiple_handlers_to_multiple_messages_then_each_handler_should_hand
[Test]
public void multiple_handlers_to_multiple_messages_then_each_handler_should_handle_only_subscribed_messages()
{
- var handler1 = new MultipleMessagesTestHandler();
- var handler2 = new MultipleMessagesTestHandler();
- var handler3 = new MultipleMessagesTestHandler();
+ var handler1 = new TestMultiHandler();
+ var handler2 = new TestMultiHandler();
+ var handler3 = new TestMultiHandler();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage3>(handler1);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage3>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
- Bus.Subscribe<TestMessage2>(handler2);
+ _bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage2>(handler2);
- Bus.Subscribe<TestMessage2>(handler3);
- Bus.Subscribe<TestMessage3>(handler3);
+ _bus.Subscribe<TestMessage2>(handler3);
+ _bus.Subscribe<TestMessage3>(handler3);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(handler1.HandledMessages.ContainsSingle<TestMessage>() &&
@@ -180,12 +194,12 @@ public void multiple_handlers_to_multiple_messages_then_each_handler_should_hand
[Test]
public void same_handler_to_same_message_few_times_then_message_should_be_handled_only_once()
{
- var handler = new TestHandler();
- Bus.Subscribe<TestMessage>(handler);
- Bus.Subscribe<TestMessage>(handler);
- Bus.Subscribe<TestMessage>(handler);
+ var handler = new TestHandler<TestMessage>();
+ _bus.Subscribe<TestMessage>(handler);
+ _bus.Subscribe<TestMessage>(handler);
+ _bus.Subscribe<TestMessage>(handler);
- Bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage());
Assert.That(handler.HandledMessages.ContainsSingle<TestMessage>());
}
View
143 ...tore.Core.Tests/Bus/when_unsubscribing.cs → ...Bus/when_unsubscribing_from_memory_bus.cs
@@ -26,78 +26,93 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
using System;
+using EventStore.Core.Bus;
using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Core.Tests.Common;
using NUnit.Framework;
namespace EventStore.Core.Tests.Bus
{
[TestFixture]
- public class when_unsubscribing : BusTestBase
+ public class when_unsubscribing_from_memory_bus
{
+ private InMemoryBus _bus;
+
+ [SetUp]
+ public void SetUp()
+ {
+ _bus = new InMemoryBus("test_bus", watchSlowMsg: false);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _bus = null;
+ }
+
[Test]
public void null_as_handler_app_should_throw()
{
- Assert.Throws<ArgumentNullException>(() => Bus.Unsubscribe<TestMessage>(null));
+ Assert.Throws<ArgumentNullException>(() => _bus.Unsubscribe<TestMessage>(null));
}
[Test]
public void not_subscribed_handler_app_doesnt_throw()
{
- var handler = new TestHandler();
- Assert.DoesNotThrow(() => Bus.Unsubscribe<TestMessage>(handler));
+ var handler = new TestHandler<TestMessage>();
+ Assert.DoesNotThrow(() => _bus.Unsubscribe<TestMessage>(handler));
}
[Test]
public void same_handler_from_same_message_multiple_times_app_doesnt_throw()
{
- var handler = new TestHandler();
+ var handler = new TestHandler<TestMessage>();
Assert.DoesNotThrow(() =>
{
- Bus.Unsubscribe<TestMessage>(handler);
- Bus.Unsubscribe<TestMessage>(handler);
- Bus.Unsubscribe<TestMessage>(handler);
+ _bus.Unsubscribe<TestMessage>(handler);
+ _bus.Unsubscribe<TestMessage>(handler);
+ _bus.Unsubscribe<TestMessage>(handler);
});
}
[Test]
public void multihandler_from_single_message_app_doesnt_throw()
{
- var handler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(handler);
- Bus.Subscribe<TestMessage2>(handler);
- Bus.Subscribe<TestMessage3>(handler);
+ var handler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(handler);
+ _bus.Subscribe<TestMessage2>(handler);
+ _bus.Subscribe<TestMessage3>(handler);
- Assert.DoesNotThrow(() => Bus.Unsubscribe<TestMessage>(handler));
+ Assert.DoesNotThrow(() => _bus.Unsubscribe<TestMessage>(handler));
}
[Test]
public void handler_from_message_it_should_not_handle_this_message_anymore()
{
- var handler = new TestHandler();
- Bus.Subscribe<TestMessage>(handler);
+ var handler = new TestHandler<TestMessage>();
+ _bus.Subscribe<TestMessage>(handler);
- Bus.Unsubscribe<TestMessage>(handler);
- Bus.Publish(new TestMessage());
+ _bus.Unsubscribe<TestMessage>(handler);
+ _bus.Publish(new TestMessage());
Assert.That(handler.HandledMessages.IsEmpty());
}
[Test]
public void handler_from_multiple_messages_they_all_should_not_be_handled_anymore()
{
- var handler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(handler);
- Bus.Subscribe<TestMessage2>(handler);
- Bus.Subscribe<TestMessage3>(handler);
+ var handler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(handler);
+ _bus.Subscribe<TestMessage2>(handler);
+ _bus.Subscribe<TestMessage3>(handler);
- Bus.Unsubscribe<TestMessage>(handler);
- Bus.Unsubscribe<TestMessage2>(handler);
- Bus.Unsubscribe<TestMessage3>(handler);
+ _bus.Unsubscribe<TestMessage>(handler);
+ _bus.Unsubscribe<TestMessage2>(handler);
+ _bus.Unsubscribe<TestMessage3>(handler);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(handler.HandledMessages.ContainsNo<TestMessage>() &&
handler.HandledMessages.ContainsNo<TestMessage2>() &&
@@ -107,16 +122,16 @@ public void handler_from_multiple_messages_they_all_should_not_be_handled_anymor
[Test]
public void handler_from_message_it_should_not_handle_this_message_anymore_and_still_handle_other_messages()
{
- var handler = new MultipleMessagesTestHandler();
- Bus.Subscribe<TestMessage>(handler);
- Bus.Subscribe<TestMessage2>(handler);
- Bus.Subscribe<TestMessage3>(handler);
+ var handler = new TestMultiHandler();
+ _bus.Subscribe<TestMessage>(handler);
+ _bus.Subscribe<TestMessage2>(handler);
+ _bus.Subscribe<TestMessage3>(handler);
- Bus.Unsubscribe<TestMessage>(handler);
+ _bus.Unsubscribe<TestMessage>(handler);
- Bus.Publish(new TestMessage());
- Bus.Publish(new TestMessage2());
- Bus.Publish(new TestMessage3());
+ _bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage2());
+ _bus.Publish(new TestMessage3());
Assert.That(handler.HandledMessages.ContainsNo<TestMessage>() &&
handler.HandledMessages.ContainsSingle<TestMessage2>() &&
@@ -126,16 +141,16 @@ public void handler_from_message_it_should_not_handle_this_message_anymore_and_s
[Test]
public void one_handler_and_leaving_others_subscribed_only_others_should_handle_message()
{
- var handler1 = new TestHandler();
- var handler2 = new TestHandler();
- var handler3 = new TestHandler();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage>();
+ var handler3 = new TestHandler<TestMessage>();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
- Bus.Subscribe<TestMessage>(handler3);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage>(handler3);
- Bus.Unsubscribe(handler1);
- Bus.Publish(new TestMessage());
+ _bus.Unsubscribe(handler1);
+ _bus.Publish(new TestMessage());
Assert.That(handler1.HandledMessages.ContainsNo<TestMessage>() &&
handler2.HandledMessages.ContainsSingle<TestMessage>() &&
@@ -146,18 +161,18 @@ public void one_handler_and_leaving_others_subscribed_only_others_should_handle_
[Test]
public void all_handlers_from_message_noone_should_handle_message()
{
- var handler1 = new TestHandler();
- var handler2 = new TestHandler();
- var handler3 = new TestHandler();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage>();
+ var handler3 = new TestHandler<TestMessage>();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
- Bus.Subscribe<TestMessage>(handler3);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage>(handler3);
- Bus.Unsubscribe(handler1);
- Bus.Unsubscribe(handler2);
- Bus.Unsubscribe(handler3);
- Bus.Publish(new TestMessage());
+ _bus.Unsubscribe(handler1);
+ _bus.Unsubscribe(handler2);
+ _bus.Unsubscribe(handler3);
+ _bus.Publish(new TestMessage());
Assert.That(handler1.HandledMessages.ContainsNo<TestMessage>() &&
handler2.HandledMessages.ContainsNo<TestMessage>() &&
@@ -167,15 +182,15 @@ public void all_handlers_from_message_noone_should_handle_message()
[Test]
public void handlers_after_publishing_message_all_is_still_done_correctly()
{
- var handler1 = new TestHandler();
- var handler2 = new TestHandler();
- var handler3 = new TestHandler();
+ var handler1 = new TestHandler<TestMessage>();
+ var handler2 = new TestHandler<TestMessage>();
+ var handler3 = new TestHandler<TestMessage>();
- Bus.Subscribe<TestMessage>(handler1);
- Bus.Subscribe<TestMessage>(handler2);
- Bus.Subscribe<TestMessage>(handler3);
+ _bus.Subscribe<TestMessage>(handler1);
+ _bus.Subscribe<TestMessage>(handler2);
+ _bus.Subscribe<TestMessage>(handler3);
- Bus.Publish(new TestMessage());
+ _bus.Publish(new TestMessage());
handler1.HandledMessages.Clear();
handler2.HandledMessages.Clear();
handler3.HandledMessages.Clear();
@@ -185,10 +200,10 @@ public void handlers_after_publishing_message_all_is_still_done_correctly()
handler2.HandledMessages.ContainsNo<TestMessage>() &&
handler3.HandledMessages.ContainsNo<TestMessage>());
- Bus.Unsubscribe(handler1);
- Bus.Unsubscribe(handler2);
- Bus.Unsubscribe(handler3);
- Bus.Publish(new TestMessage());
+ _bus.Unsubscribe(handler1);
+ _bus.Unsubscribe(handler2);
+ _bus.Unsubscribe(handler3);
+ _bus.Publish(new TestMessage());
Assert.That(handler1.HandledMessages.ContainsNo<TestMessage>() &&
handler2.HandledMessages.ContainsNo<TestMessage>() &&
View
28 src/EventStore/EventStore.Core.Tests/EventStore.Core.Tests.csproj
@@ -66,21 +66,23 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
- <Compile Include="Bus\Helpers\BusTestBase.cs" />
<Compile Include="Bus\Helpers\BusTestMessages.cs" />
- <Compile Include="Bus\Helpers\BusTestHandlers.cs" />
+ <Compile Include="Bus\Helpers\NoopConsumer.cs" />
+ <Compile Include="Bus\Helpers\QueuedHandlerTestWithWaitingConsumer.cs" />
+ <Compile Include="Bus\Helpers\TestMultiHandler.cs" />
+ <Compile Include="Bus\Helpers\TestHandler.cs" />
<Compile Include="Bus\Helpers\TimeoutHelper.cs" />
- <Compile Include="Bus\QueuedHandler\Helpers\Consumers.cs" />
- <Compile Include="Bus\QueuedHandler\Helpers\Messages.cs" />
- <Compile Include="Bus\QueuedHandler\Helpers\QueuedHandlerTestBase.cs" />
- <Compile Include="Bus\QueuedHandler\queued_handler_should.cs" />
- <Compile Include="Bus\QueuedHandler\when_publishing.cs" />
- <Compile Include="Bus\QueuedHandler\when_publishing_before_starting.cs" />
- <Compile Include="Bus\QueuedHandler\when_starting.cs" />
- <Compile Include="Bus\QueuedHandler\when_stopping.cs" />
- <Compile Include="Bus\when_publishing.cs" />
- <Compile Include="Bus\when_subscribing.cs" />
- <Compile Include="Bus\when_unsubscribing.cs" />
+ <Compile Include="Bus\Helpers\WaitingConsumer.cs" />
+ <Compile Include="Bus\Helpers\Messages.cs" />
+ <Compile Include="Bus\Helpers\QueuedHandlerTestWithNoopConsumer.cs" />
+ <Compile Include="Bus\queued_handler_should.cs" />
+ <Compile Include="Bus\when_publishing_to_queued_handler.cs" />
+ <Compile Include="Bus\when_publishing_to_queued_handler_before_starting.cs" />
+ <Compile Include="Bus\when_starting_queued_handler.cs" />
+ <Compile Include="Bus\when_stopping_queued_handler.cs" />
+ <Compile Include="Bus\when_publishing_into_memory_bus.cs" />
+ <Compile Include="Bus\when_subscribing_to_memory_bus.cs" />
+ <Compile Include="Bus\when_unsubscribing_from_memory_bus.cs" />
<Compile Include="ClientAPI\AllEvents\read_all_events_backward_should.cs" />
<Compile Include="ClientAPI\AllEvents\read_all_events_forward_should.cs" />
<Compile Include="ClientAPI\AllEvents\subscribe_to_all_should.cs" />
View
1 src/EventStore/EventStore.Core.Tests/Services/Storage/ReadIndexTestScenario.cs
@@ -93,7 +93,6 @@ public override void TestFixtureSetUp()
TableIndex = new TableIndex(Path.Combine(PathName, "index"),
() => new HashListMemTable(MaxEntriesInMemTable * 2),
MaxEntriesInMemTable);
- TableIndex.Initialize();
var reader = new TFChunkReader(Db, Db.Config.WriterCheckpoint);
ReadIndex = new ReadIndex(new NoopPublisher(),
View
9 ...ervices/Storage/Transactions/when_rebuilding_index_for_partially_persisted_transaction.cs
@@ -40,7 +40,7 @@
namespace EventStore.Core.Tests.Services.Storage.Transactions
{
- [TestFixture, Ignore]
+ [TestFixture]
public class when_rebuilding_index_for_partially_persisted_transaction : ReadIndexTestScenario
{
public when_rebuilding_index_for_partially_persisted_transaction(): base(maxEntriesInMemTable: 10)
@@ -58,7 +58,6 @@ public override void TestFixtureSetUp()
TableIndex = new TableIndex(Path.Combine(PathName, "index"),
() => new HashListMemTable(maxSize: 2000),
maxSizeForMemory: MaxEntriesInMemTable);
- TableIndex.Initialize();
ReadIndex = new ReadIndex(new NoopPublisher(),
2,
@@ -70,12 +69,6 @@ public override void TestFixtureSetUp()
ReadIndex.Build();
}
- public override void TestFixtureTearDown()
- {
- Thread.Sleep(500); // give chance to IndexMap to dump files
- base.TestFixtureTearDown();
- }
-
protected override void WriteTestScenario()
{
var begin = WriteTransactionBegin("ES", ExpectedVersion.Any);
View
7 src/EventStore/EventStore.Core/Bus/InMemoryBus.cs
@@ -102,8 +102,6 @@ public void Publish(Message message)
private void DispatchByType(Message message)
{
- //Log.Debug("{0}: dispatching {1} message.", Name, message);
-
var type = message.GetType();
PublishByType(message, type);
do
@@ -118,8 +116,9 @@ private void PublishByType(Message message, Type type)
List<IMessageHandler> handlers;
if (_typeHash.TryGetValue(type, out handlers))
{
- foreach (var handler in handlers)
+ for (int i = 0, n = handlers.Count; i < n; ++i)
{
+ var handler = handlers[i];
if (_watchSlowMsg)
{
_slowMsgWatch.Restart();
@@ -136,7 +135,9 @@ private void PublishByType(Message message, Type type)
}
}
else
+ {
handler.TryHandle(message);
+ }
}
}
}
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandler.cs
@@ -63,7 +63,7 @@ public class QueuedHandler : IHandle<Message>, IPublisher, IMonitoredQueue, IThr
private Thread _thread;
private volatile bool _stop;
- private readonly ManualResetEvent _stopped = new ManualResetEvent(false);
+ private readonly ManualResetEventSlim _stopped = new ManualResetEventSlim(true);
private readonly int _threadStopWaitTimeoutMs;
// monitoring
@@ -117,12 +117,14 @@ public void Start()
_thread.IsBackground = true;
_thread.Name = _name;
_thread.Start();
+
+ _stopped.Reset();
}
public void Stop()
{
_stop = true;
- if (!_stopped.WaitOne(_threadStopWaitTimeoutMs))
+ if (!_stopped.Wait(_threadStopWaitTimeoutMs))
throw new TimeoutException(string.Format("Unable to stop thread '{0}'.", _name));
_queueMonitor.Unregister(this);
}
View
24 src/EventStore/EventStore.Core/Index/TableIndex.cs
@@ -62,7 +62,9 @@ public class TableIndex : ITableIndex
private long _prepareCheckpoint = -1;
private volatile bool _backgroundRunning;
- private readonly ManualResetEvent _backgroundRunningEvent = new ManualResetEvent(true);
+ private readonly ManualResetEventSlim _backgroundRunningEvent = new ManualResetEventSlim(true);
+
+ private bool _initialized;
public TableIndex(string directory,
Func<IMemTable> memTableFactory,
@@ -87,6 +89,12 @@ public class TableIndex : ITableIndex
public void Initialize()
{
//NOT THREAD SAFE (assumes one thread)
+
+ if (_initialized)
+ throw new IOException("TableIndex is already initialized.");
+
+ _initialized = true;
+
CreateIfDoesNotExist(_directory);
try
@@ -163,6 +171,7 @@ public void Add(long commitPos, uint stream, int version, long position)
_awaitingMemTables = newTables;
if (!_backgroundRunning)
{
+ _backgroundRunningEvent.Reset();
_backgroundRunning = true;
ThreadPool.QueueUserWorkItem(x => ReadOffQueue());
}
@@ -175,7 +184,6 @@ public void Add(long commitPos, uint stream, int version, long position)
private void ReadOffQueue()
{
- _backgroundRunningEvent.Reset();
try
{
while (true)
@@ -188,6 +196,7 @@ private void ReadOffQueue()
if (_awaitingMemTables.Count == 1)
{
_backgroundRunning = false;
+ _backgroundRunningEvent.Set();
return;
}
tableItem = _awaitingMemTables[_awaitingMemTables.Count - 1];
@@ -240,11 +249,6 @@ private void ReadOffQueue()
Log.ErrorException(exc, "Error in TableIndex.ReadOffQueue");
throw;
}
- finally
- {
- _backgroundRunning = false;
- _backgroundRunningEvent.Set();
- }
}
private void ReclaimMemoryIfNeeded(List<TableItem> awaitingMemTables)
@@ -442,11 +446,11 @@ private static int GetMaxOf(List<IEnumerator<IndexEntry>> enumerators)
public void ClearAll(bool removeFiles = true)
{
- _awaitingMemTables = new List<TableItem> { new TableItem(_memTableFactory(), -1, -1) };
-
- _backgroundRunningEvent.WaitOne(1000);
//this should also make sure that no background tasks are running anymore
+ if (!_backgroundRunningEvent.Wait(1000))
+ throw new TimeoutException("Could not finish background thread in reasonable time.");
+
if (_indexMap != null)
{
if (removeFiles)
View
3 src/EventStore/EventStore.Core/Messages/HttpClientMessageDto.cs
@@ -38,6 +38,7 @@ public static class HttpClientMessageDto
{
#region HTTP DTO
+ [XmlType(TypeName = "event")]
public class ClientEventText
{
public Guid EventId { get; set; }
@@ -75,6 +76,7 @@ public ClientEventText(Guid eventId, string eventType, byte[] data, byte[] metaD
}
}
+ [XmlRoot(ElementName = "write-events")]
public class WriteEventsText
{
public int ExpectedVersion { get; set; }
@@ -94,6 +96,7 @@ public WriteEventsText(int expectedVersion, ClientEventText[] events)
}
}
+ [XmlRoot(ElementName = "read-event-result")]
public class ReadEventCompletedText
{
public string EventStreamId { get; set; }
View
14 src/EventStore/EventStore.Core/Services/Storage/StorageWriter.cs
@@ -431,23 +431,19 @@ private bool ShouldCreateStreamFor(StorageMessage.IPreconditionedWriteMessage me
protected bool Flush()
{
- if (ShouldForceFlush())
+ var start = _watch.ElapsedTicks;
+ if (start - _lastFlush >= _flushDelay + 2 * MsPerTick || FlushMessagesInQueue == 0)
{
- var start = _watch.ElapsedTicks;
Writer.Flush();
- _flushDelay = _watch.ElapsedTicks - start;
- _lastFlush = _watch.ElapsedTicks;
+ var end = _watch.ElapsedTicks;
+ _flushDelay = end - start;
+ _lastFlush = end;
return true;
}
return false;
}
- private bool ShouldForceFlush()
- {
- return _watch.ElapsedTicks - _lastFlush >= _flushDelay + 2 * MsPerTick || FlushMessagesInQueue == 0;
- }
-
public void Dispose()
{
Writer.Flush();
View
29 src/EventStore/EventStore.Core/Services/Transport/Http/AutoEventConverter.cs
@@ -101,17 +101,30 @@ private static HttpClientMessageDto.WriteEventsText LoadFromXml(string xml)
{
try
{
- xml = xml.Replace("<Events>", string.Empty).Replace("</Events>", string.Empty)
- .Replace("<write-events>", "<write-events xmlns:json='http://james.newtonking.com/projects/json'>")
- .Replace("<event>", "<Events json:Array='true'>").Replace("</event>", "</Events>");
var doc = XDocument.Parse(xml);
- var version = doc.Descendants("ExpectedVersion").Single();
- version.Remove();
+ XNamespace jsonNs = "http://james.newtonking.com/projects/json";
+ XName jsonName = XNamespace.Xmlns + "json";
- var json = JsonConvert.SerializeXNode(doc, Formatting.None, true);
- var textEvents = JsonConvert.DeserializeObject<JObject>(json)["Events"].ToObject<HttpClientMessageDto.ClientEventText[]>();
- return new HttpClientMessageDto.WriteEventsText(int.Parse(version.Value), textEvents.ToArray());
+ doc.Root.SetAttributeValue(jsonName, jsonNs);
+
+ var expectedVersion = doc.Root.Element("ExpectedVersion");
+ var events = doc.Root.Descendants("event").ToArray();
+
+ foreach (var @event in events)
+ {
+ @event.Name = "Events";
+ @event.SetAttributeValue(jsonNs + "Array", "true");
+ }
+
+ doc.Root.ReplaceNodes(events);
+
+ foreach (var element in doc.Root.Descendants("Data").Concat(doc.Root.Descendants("Metadata")))
+ element.RemoveAttributes();
+
+ var json = JsonConvert.SerializeXNode(doc, Formatting.None, false);
+ var textEvents = JsonConvert.DeserializeObject<JObject>(json)["write-events"]["Events"].ToObject<HttpClientMessageDto.ClientEventText[]>();
+ return new HttpClientMessageDto.WriteEventsText(int.Parse(expectedVersion.Value), textEvents.ToArray());
}
catch (Exception e)
{
View
7 ...tStore/EventStore.Projections.Core.Tests/Services/TestFixtureWithProjectionCoreService.cs
@@ -30,7 +30,8 @@
using System.Collections.Generic;
using EventStore.Core.Bus;
using EventStore.Core.Data;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
+using EventStore.Core.Messaging;
+using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Core.TransactionLog.Checkpoint;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services;
@@ -88,14 +89,14 @@ public void Handle(CoreProjectionProcessingMessage.RestartRequested message)
}
}
- protected WatchingConsumer _consumer;
+ protected TestHandler<Message> _consumer;
protected InMemoryBus _bus;
protected ProjectionCoreService _service;
[SetUp]
public void Setup()
{
- _consumer = new WatchingConsumer();
+ _consumer = new TestHandler<Message>();
_bus = new InMemoryBus("temp");
_bus.Subscribe(_consumer);
_service = new ProjectionCoreService(_bus, _bus, 10, new InMemoryCheckpoint(1000));
View
8 ...entStore.Projections.Core.Tests/Services/core_projection/TestFixtureWithCoreProjection.cs
@@ -40,8 +40,8 @@ namespace EventStore.Projections.Core.Tests.Services.core_projection
public abstract class TestFixtureWithCoreProjection : TestFixtureWithExistingEvents
{
protected CoreProjection _coreProjection;
- protected TestMessageHandler<ProjectionSubscriptionManagement.Subscribe> _subscribeProjectionHandler;
- protected TestMessageHandler<ClientMessage.WriteEvents> _writeEventHandler;
+ protected TestHandler<ProjectionSubscriptionManagement.Subscribe> _subscribeProjectionHandler;
+ protected TestHandler<ClientMessage.WriteEvents> _writeEventHandler;
protected readonly string _lastSeenEvent = Guid.NewGuid().ToString("D");
protected Guid _firstWriteCorrelationId;
@@ -57,8 +57,8 @@ public abstract class TestFixtureWithCoreProjection : TestFixtureWithExistingEve
[SetUp]
public void setup()
{
- _subscribeProjectionHandler = new TestMessageHandler<ProjectionSubscriptionManagement.Subscribe>();
- _writeEventHandler = new TestMessageHandler<ClientMessage.WriteEvents>();
+ _subscribeProjectionHandler = new TestHandler<ProjectionSubscriptionManagement.Subscribe>();
+ _writeEventHandler = new TestHandler<ClientMessage.WriteEvents>();
_bus.Subscribe(_subscribeProjectionHandler);
_bus.Subscribe(_writeEventHandler);
View
5 ...entStore.Projections.Core.Tests/Services/core_projection/TestFixtureWithExistingEvents.cs
@@ -37,7 +37,6 @@
using EventStore.Core.Messaging;
using EventStore.Core.Services.Storage.ReaderIndex;
using EventStore.Core.Tests.Bus.Helpers;
-using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
using EventStore.Core.TransactionLog.LogRecords;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services;
@@ -51,7 +50,7 @@ public abstract class TestFixtureWithExistingEvents : TestFixtureWithReadWriteDi
IHandle<ClientMessage.WriteEvents>,
IHandle<ProjectionCoreServiceMessage.Tick>
{
- protected TestMessageHandler<ClientMessage.ReadStreamEventsBackward> _listEventsHandler;
+ protected TestHandler<ClientMessage.ReadStreamEventsBackward> _listEventsHandler;
protected readonly Dictionary<string, List<EventRecord>> _lastMessageReplies = new Dictionary<string, List<EventRecord>>();
@@ -120,7 +119,7 @@ public void setup1()
{
_ticksAreHandle