Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'dev' of github.com:/EventStore/EventStore into dev

  • Loading branch information...
commit 8a7c6eff7db6847a096237e8981aca4b4f5f60e4 2 parents b5c2eab + 5ad7572
@kpyatkivskyy kpyatkivskyy authored
Showing with 816 additions and 359 deletions.
  1. +49 −0 src/EventStore/EventStore.ClientAPI/Common/Utils/Tasks.cs
  2. +178 −0 src/EventStore/EventStore.ClientAPI/Connection/ClusterExplorer.cs
  3. +5 −39 src/EventStore/EventStore.ClientAPI/{Configure.cs → ConnectionSettings.cs}
  4. +4 −2 src/EventStore/EventStore.ClientAPI/EventStore.ClientAPI.csproj
  5. +0 −100 src/EventStore/EventStore.ClientAPI/EventStore.cs
  6. +133 −33 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
  7. +83 −0 src/EventStore/EventStore.ClientAPI/Messages/ClusterMessages.cs
  8. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
  9. +23 −12 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
  10. +7 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
  11. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
  12. +8 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
  13. +10 −5 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
  14. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
  15. +24 −12 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
  16. +13 −7 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
  17. +38 −15 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
  18. +2 −2 src/EventStore/EventStore.Core.Tests/Index/MemTableTests.cs
  19. +1 −1  src/EventStore/EventStore.Core.Tests/Services/Storage/ReadIndexTestScenario.cs
  20. +1 −1  ...Store.Core.Tests/Services/Storage/Transactions/when_rebuilding_index_for_partially_persisted_transaction.cs
  21. +5 −14 src/EventStore/EventStore.Core/DataStructures/ILRUCache.cs
  22. +29 −1 src/EventStore/EventStore.Core/DataStructures/LRUCache.cs
  23. +21 −0 src/EventStore/EventStore.Core/DataStructures/NoLRUCache.cs
  24. +2 −0  src/EventStore/EventStore.Core/EventStore.Core.csproj
  25. +6 −2 src/EventStore/EventStore.Core/Index/HashListMemTable.cs
  26. +87 −70 src/EventStore/EventStore.Core/Services/Storage/ReaderIndex/ReadIndex.cs
  27. +14 −0 src/EventStore/EventStore.Core/Services/Storage/ReaderIndex/StreamCacheInfo.cs
  28. +2 −2 src/EventStore/EventStore.Core/Services/Storage/StorageWriter.cs
  29. +1 −1  src/EventStore/EventStore.Core/SingleVNode.cs
  30. +2 −1  src/EventStore/EventStore.Padmin/Program.cs
  31. +2 −1  src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/ScenarioBase.cs
  32. +6 −0 src/EventStore/Projections.Dev.WindowsOnly.sln.DotSettings
View
49 src/EventStore/EventStore.ClientAPI/Common/Utils/Tasks.cs
@@ -0,0 +1,49 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System.Threading.Tasks;
+
+namespace EventStore.ClientAPI.Common.Utils
+{
+ internal static class Tasks
+ {
+ public static Task CreateCompleted()
+ {
+ var source = new TaskCompletionSource<object>();
+ source.SetResult(null);
+ return source.Task;
+ }
+
+ public static Task<TResult> CreateCompleted<TResult>(TResult result)
+ {
+ var source = new TaskCompletionSource<TResult>();
+ source.SetResult(result);
+ return source.Task;
+ }
+ }
+}
View
178 src/EventStore/EventStore.ClientAPI/Connection/ClusterExplorer.cs
@@ -0,0 +1,178 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System;
+using System.IO;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Xml.Serialization;
+using EventStore.ClientAPI.Common.Log;
+using EventStore.ClientAPI.Exceptions;
+using EventStore.ClientAPI.Messages;
+using EventStore.ClientAPI.SystemData;
+using EventStore.ClientAPI.Transport.Http;
+using HttpStatusCode = EventStore.ClientAPI.Transport.Http.HttpStatusCode;
+using System.Linq;
+
+namespace EventStore.ClientAPI.Connection
+{
+ internal class ClusterExplorer
+ {
+ private readonly ILogger _log;
+ private readonly HttpAsyncClient _client = new HttpAsyncClient();
+
+ private readonly bool _allowForwarding;
+ private readonly int _maxAttempts;
+ private readonly int _port;
+
+ public ClusterExplorer(bool allowForwarding, int maxAttempts, int port)
+ {
+ _log = LogManager.GetLogger();
+
+ _allowForwarding = allowForwarding;
+ _maxAttempts = maxAttempts;
+ _port = port;
+ }
+
+ public Task<EndpointsPair?> Resolve(string dns)
+ {
+ var resolve = Task.Factory.StartNew(() => Dns.GetHostAddresses(dns));
+ return resolve.ContinueWith(addresses => DiscoverCLuster(addresses.Result, _maxAttempts));
+ }
+
+ private EndpointsPair? DiscoverCLuster(IPAddress[] managers, int maxAttempts)
+ {
+ if (managers == null || managers.Length == 0)
+ throw new CannotEstablishConnectionException("DNS entry resolved in empty ip addresses list");
+
+ var info = GetClusterInfo(managers, maxAttempts);
+ if (info != null && info.Members != null && info.Members.Any())
+ {
+ var alive = info.Members.Where(m => m.IsAlive).ToArray();
+ if (!_allowForwarding)
+ {
+ _log.Info("Forwarding denied. Looking for master...");
+ var master = alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master);
+ if (master == null)
+ {
+ _log.Info("Master not found");
+ return null;
+ }
+ _log.Info("Master found on [{0}:{1}, {2}:{3}]", master.ExternalTcpIp, master.ExternalTcpPort, master.ExternalHttpIp, master.ExternalHttpPort);
+ return new EndpointsPair(new IPEndPoint(IPAddress.Parse(master.ExternalTcpIp), master.ExternalTcpPort),
+ new IPEndPoint(IPAddress.Parse(master.ExternalHttpIp), master.ExternalHttpPort));
+ }
+
+ var node = alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master) ??
+ alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Slave) ??
+ alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Clone);
+
+ if (node == null)
+ {
+ _log.Info("Unable to locate master, slave or clone node");
+ return null;
+ }
+ _log.Info("Best choise found, it's {0} on [{1}:{2}, {3}:{4}]",
+ node.State,
+ node.ExternalTcpIp,
+ node.ExternalTcpPort,
+ node.ExternalHttpIp,
+ node.ExternalHttpPort);
+ return new EndpointsPair(new IPEndPoint(IPAddress.Parse(node.ExternalTcpIp), node.ExternalTcpPort),
+ new IPEndPoint(IPAddress.Parse(node.ExternalHttpIp), node.ExternalHttpPort));
+ }
+
+ _log.Info("Failed to discover cluster. No information available");
+ return null;
+ }
+
+ private ClusterMessages.ClusterInfoDto GetClusterInfo(IPAddress[] managers, int maxAttempts)
+ {
+ var attempt = 0;
+ var random = new Random();
+ while (attempt < maxAttempts)
+ {
+ _log.Info("Discovering cluster. Attempt {0}...", attempt + 1);
+ var i = random.Next(0, managers.Length);
+ _log.Info("Picked [{0}]", managers[i]);
+ var info = ClusterInfoOrDefault(managers[i]);
+ if(info != null)
+ {
+ _log.Info("Going to select node based on info from [{0}]", managers[i]);
+ return info;
+ }
+
+ _log.Info("Failed to get cluster info from [{0}]. Attempt {1} of {2}", attempt + 1, maxAttempts);
+ attempt++;
+ Thread.Sleep(TimeSpan.FromSeconds(1));
+ }
+ return null;
+ }
+
+ private ClusterMessages.ClusterInfoDto ClusterInfoOrDefault(IPAddress manager)
+ {
+ ClusterMessages.ClusterInfoDto info = null;
+ var completed = new ManualResetEvent(false);
+
+ Action<HttpResponse> success = response =>
+ {
+ _log.Info("Got response from manager on [{0}]", manager);
+ if (response.HttpStatusCode != HttpStatusCode.OK)
+ {
+ _log.Info("Manager responded with {0} ({1})", response.HttpStatusCode, response.StatusDescription);
+ completed.Set();
+ return;
+ }
+ try
+ {
+ using (var reader = new StringReader(response.Body))
+ info = (ClusterMessages.ClusterInfoDto)new XmlSerializer(typeof(ClusterMessages.ClusterInfoDto)).Deserialize(reader);
+ }
+ catch (Exception e)
+ {
+ _log.Info(e, "Failed to get cluster info from manager on [{0}]. Deserialization error", manager);
+ }
+ completed.Set();
+ };
+
+ Action<Exception> error = e =>
+ {
+ _log.Info(e, "Failed to get cluster info from manager on [{0}]. Request failed");
+ completed.Set();
+ };
+
+ var url = new IPEndPoint(manager, _port).ToHttpUrl("/gossip?format=xml");
+ _log.Info("Sending gossip request to {0}...", url);
+ _client.Get(url, success, error);
+
+ completed.WaitOne();
+ return info;
+ }
+ }
+}
View
44 src/EventStore/EventStore.ClientAPI/Configure.cs → ...tStore/EventStore.ClientAPI/ConnectionSettings.cs
@@ -25,47 +25,13 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-using System;
-using System.Net;
namespace EventStore.ClientAPI
{
- public class Configure
+ public enum Mode
{
- internal IPAddress _address;
- internal int _port;
- internal string _name;
-
- private Configure(string name, IPAddress ipaddress, int port)
- {
- _name = name;
- _address = ipaddress;
- _port = port;
- }
-
- public static Configure AsDefault()
- {
- return new Configure("Default", IPAddress.Loopback, 1113);
- }
-
- public static Configure Named(string name)
- {
- return new Configure(name, IPAddress.Loopback, 1113);
- }
-
- public Configure ToServerNamed(string name)
- {
- var addresses = Dns.GetHostAddresses(name);
- if (addresses.Length == 0) throw new HostNotFoundException(name);
-
- return new Configure(_name, addresses[0], _port);
- }
- }
-
- public class HostNotFoundException : Exception
- {
- public HostNotFoundException(string name) : base("Host with name " + name + " not found")
- {
- }
+ FullConsistency,
+ ConsistentWrites,
+ ConsistentReads,
}
-}
+}
View
6 src/EventStore/EventStore.ClientAPI/EventStore.ClientAPI.csproj
@@ -85,6 +85,9 @@
<Compile Include="ClientOperations\StartTransactionOperation.cs" />
<Compile Include="ClientOperations\TransactionalWriteOperation.cs" />
<Compile Include="Common\Log\DefaultLogger.cs" />
+ <Compile Include="Common\Utils\Tasks.cs" />
+ <Compile Include="ConnectionSettings.cs" />
+ <Compile Include="Connection\ClusterExplorer.cs" />
<Compile Include="Connection\ProjectionsManager.cs" />
<Compile Include="Connection\Subscription.cs" />
<Compile Include="Connection\SubscriptionsChannel.cs" />
@@ -109,7 +112,6 @@
<Compile Include="Common\Utils\BytesFormatter.cs" />
<Compile Include="Common\ConcurrentCollections\ConcurrentQueue.cs" />
<Compile Include="Common\ConcurrentCollections\ConcurrentStack.cs" />
- <Compile Include="Configure.cs" />
<Compile Include="ClientOperations\DeleteStreamOperation.cs" />
<Compile Include="ClientOperations\IClientOperation.cs" />
<Compile Include="ClientOperations\ReadStreamEventsForwardOperation.cs" />
@@ -118,6 +120,7 @@
<Compile Include="IProjectionsManagement.cs" />
<Compile Include="AllEventsSlice.cs" />
<Compile Include="Messages\ClientMessagesExtensions.cs" />
+ <Compile Include="Messages\ClusterMessages.cs" />
<Compile Include="Position.cs" />
<Compile Include="RecordedEvent.cs" />
<Compile Include="StreamPosition.cs" />
@@ -151,7 +154,6 @@
<Compile Include="Transport.Tcp\TcpConnector.cs" />
<Compile Include="Transport.Tcp\TcpTypedConnection.cs" />
<Compile Include="EventStoreConnection.cs" />
- <Compile Include="EventStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Transport.Tcp\TcpClientConnector.cs" />
<Compile Include="Transport.Tcp\IMonitoredTcpConnection.cs" />
View
100 src/EventStore/EventStore.ClientAPI/EventStore.cs
@@ -1,100 +0,0 @@
-// Copyright (c) 2012, Event Store LLP
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// Neither the name of the Event Store LLP nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-using System.Collections.Generic;
-using System.Net;
-using System.Threading.Tasks;
-
-namespace EventStore.ClientAPI
-{
- public static class EventStore
- {
- private static string _name;
- private static EventStoreConnection _connection;
-
- public static void Configure(Configure args)
- {
- _name = args._name;
- _connection = new EventStoreConnection(new IPEndPoint(args._address, args._port));
- }
-
- public static EventStreamSlice ReadEventStream(string stream, int start, int count)
- {
- return _connection.ReadEventStreamForward(stream, start, count);
- }
-
- public static Task<EventStreamSlice> ReadEventStreamAsync(string stream, int start, int count)
- {
- return _connection.ReadEventStreamForwardAsync(stream, start, count);
- }
-
- public static void CreateStream(string stream, byte[] metadata)
- {
- _connection.CreateStream(stream, metadata);
- }
-
- public static Task CreateStreamAsync(string stream, byte[] metadata)
- {
- return _connection.CreateStreamAsync(stream, metadata);
- }
-
- public static void AppendToStream(string stream, int expectedVersion, IEnumerable<IEvent> events)
- {
- _connection.AppendToStream(stream, expectedVersion, events);
- }
-
- public static Task AppendToStreamAsync(string stream, int expectedVersion, IEnumerable<IEvent> events)
- {
- return _connection.AppendToStreamAsync(stream, expectedVersion, events);
- }
-
- public static void DeleteStream(string stream, int expectedVersion)
- {
- _connection.DeleteStream(stream, expectedVersion);
- }
-
- public static Task DeleteStreamAsync(string stream, int expectedVersion)
- {
- return _connection.DeleteStreamAsync(stream, expectedVersion);
- }
-
- public static void DeleteStream(string stream)
- {
- _connection.DeleteStream(stream, ExpectedVersion.Any);
- }
-
- public static Task DeleteStreamAsync(string stream)
- {
- return _connection.DeleteStreamAsync(stream, ExpectedVersion.Any);
- }
-
- public static void Close()
- {
- _connection.Close();
- }
- }
-}
View
166 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
@@ -36,6 +36,7 @@
using System.Threading.Tasks;
using EventStore.ClientAPI.ClientOperations;
using EventStore.ClientAPI.Common.Log;
+using EventStore.ClientAPI.Common.Utils;
using EventStore.ClientAPI.Connection;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.SystemData;
@@ -47,6 +48,8 @@ namespace EventStore.ClientAPI
{
public class EventStoreConnection : IProjectionsManagement, IDisposable
{
+ public IProjectionsManagement Projections { get { return this; } }
+
private readonly ILogger _log;
private const int MaxQueueSize = 5000;
@@ -66,6 +69,7 @@ public class EventStoreConnection : IProjectionsManagement, IDisposable
private readonly TcpConnector _connector;
private TcpTypedConnection _connection;
private readonly object _connectionLock = new object();
+ private volatile bool _isActive;
private readonly SubscriptionsChannel _subscriptionsChannel;
private readonly ProjectionsManager _projectionsManager;
@@ -83,32 +87,15 @@ public class EventStoreConnection : IProjectionsManagement, IDisposable
private readonly Stopwatch _timeoutCheckStopwatch = new Stopwatch();
private int _reconnectionsCount;
- private readonly Thread _worker;
+ private Thread _worker;
private volatile bool _stopping;
- public IProjectionsManagement Projections
+ private EventStoreConnection(bool allowForwarding,
+ int maxConcurrentRequests,
+ int maxAttemptsForOperation,
+ int maxReconnections,
+ ILogger logger)
{
- get
- {
- return this;
- }
- }
-
- public EventStoreConnection(IPEndPoint tcpEndPoint,
- IPEndPoint httpEndpoint = null,
- bool allowForwarding = true,
- int maxConcurrentRequests = 5000,
- int maxAttemptsForOperation = 10,
- int maxReconnections = 10,
- ILogger logger = null)
- {
- Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
- Ensure.Positive(maxConcurrentRequests, "maxConcurrentRequests");
- Ensure.Nonnegative(maxAttemptsForOperation, "maxAttemptsForOperation");
- Ensure.Nonnegative(maxReconnections, "maxReconnections");
-
- _tcpEndPoint = tcpEndPoint;
- _httpEndPoint = httpEndpoint ?? new IPEndPoint(_tcpEndPoint.Address, _tcpEndPoint.Port + 1000);
_allowForwarding = allowForwarding;
_maxConcurrentItems = maxConcurrentRequests;
_maxAttempts = maxAttemptsForOperation;
@@ -120,17 +107,101 @@ public IProjectionsManagement Projections
_connector = new TcpConnector();
_subscriptionsChannel = new SubscriptionsChannel(_connector);
_projectionsManager = new ProjectionsManager();
+ }
+
+ public static EventStoreConnection Create()
+ {
+ return new EventStoreConnection(allowForwarding: true,
+ maxConcurrentRequests: 5000,
+ maxAttemptsForOperation: 10,
+ maxReconnections: 10,
+ logger: null);
+ }
- _lastReconnectionTimestamp = DateTime.UtcNow;
- _connection = _connector.CreateTcpConnection(_tcpEndPoint, OnPackageReceived, OnConnectionEstablished, OnConnectionClosed);
- _timeoutCheckStopwatch.Start();
+ public static EventStoreConnection Create(bool allowForwarding = true,
+ int maxConcurrentRequests = 5000,
+ int maxAttemptsForOperation = 10,
+ int maxReconnections = 10,
+ ILogger logger = null)
+ {
+ Ensure.Positive(maxConcurrentRequests, "maxConcurrentRequests");
+ Ensure.Positive(maxAttemptsForOperation, "maxAttemptsForOperation");
+ Ensure.Nonnegative(maxReconnections, "maxReconnections");
- _worker = new Thread(MainLoop)
+ return new EventStoreConnection(allowForwarding,
+ maxConcurrentRequests,
+ maxAttemptsForOperation,
+ maxReconnections,
+ logger);
+ }
+
+ public void Connect(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint = null)
+ {
+ Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
+ var task = ConnectAsync(tcpEndPoint, httpEndPoint);
+ task.Wait();
+ }
+
+ public Task ConnectAsync(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint = null)
+ {
+ Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
+ return EstablishConnectionAsync(tcpEndPoint, httpEndPoint ?? new IPEndPoint(tcpEndPoint.Address, tcpEndPoint.Port + 1000));
+ }
+
+ public void Connect(string clusterDns, int maxAttempts = 10, int port = 30777)
+ {
+ Ensure.NotNullOrEmpty(clusterDns, "clusterDns");
+ Ensure.Positive(maxAttempts, "maxAttempts");
+ Ensure.Nonnegative(port, "port");
+
+ var task = ConnectAsync(clusterDns, maxAttempts, port);
+ task.Wait();
+ }
+
+ public Task ConnectAsync(string clusterDns, int maxAttempts = 10, int port = 30777)
+ {
+ Ensure.NotNullOrEmpty(clusterDns, "clusterDns");
+ Ensure.Positive(maxAttempts, "maxAttempts");
+ Ensure.Nonnegative(port, "port");
+
+ var explorer = new ClusterExplorer(_allowForwarding, maxAttempts, port);
+ return explorer.Resolve(clusterDns)
+ .ContinueWith(t =>
+ {
+ var pair = t.Result;
+ if (!pair.HasValue)
+ throw new CannotEstablishConnectionException("Failed to find node to connect");
+
+ return EstablishConnectionAsync(pair.Value.TcpEndPoint, pair.Value.HttpEndPoint);
+ });
+ }
+
+ private Task EstablishConnectionAsync(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint)
+ {
+ lock (_connectionLock)
{
- IsBackground = true,
- Name = "Worker thread"
- };
- _worker.Start();
+ if (_isActive)
+ throw new InvalidOperationException("EventStoreConnection is already active");
+ _isActive = true;
+
+ _tcpEndPoint = tcpEndPoint;
+ _httpEndPoint = httpEndPoint;
+
+ _lastReconnectionTimestamp = DateTime.UtcNow;
+ _connection = _connector.CreateTcpConnection(_tcpEndPoint, OnPackageReceived, OnConnectionEstablished, OnConnectionClosed);
+ _timeoutCheckStopwatch.Start();
+
+ _worker = new Thread(MainLoop) {IsBackground = true, Name = "Worker thread"};
+ _worker.Start();
+
+ return Tasks.CreateCompleted();
+ }
+ }
+
+ private void EnsureActive()
+ {
+ if (!_isActive)
+ throw new InvalidOperationException("EventStoreConnection is not active");
}
public void Close()
@@ -162,6 +233,7 @@ void IDisposable.Dispose()
public void CreateStream(string stream, byte[] metadata)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var task = CreateStreamAsync(stream, metadata);
task.Wait();
@@ -170,6 +242,7 @@ public void CreateStream(string stream, byte[] metadata)
public Task CreateStreamAsync(string stream, byte[] metadata)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var source = new TaskCompletionSource<object>();
var operation = new CreateStreamOperation(source, Guid.NewGuid(), _allowForwarding, stream, metadata);
@@ -181,6 +254,7 @@ public Task CreateStreamAsync(string stream, byte[] metadata)
public void DeleteStream(string stream, int expectedVersion)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var task = DeleteStreamAsync(stream, expectedVersion);
task.Wait();
@@ -189,6 +263,7 @@ public void DeleteStream(string stream, int expectedVersion)
public Task DeleteStreamAsync(string stream, int expectedVersion)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var source = new TaskCompletionSource<object>();
var operation = new DeleteStreamOperation(source, Guid.NewGuid(), _allowForwarding, stream, expectedVersion);
@@ -201,6 +276,7 @@ public void AppendToStream(string stream, int expectedVersion, IEnumerable<IEven
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.NotNull(events, "events");
+ EnsureActive();
var task = AppendToStreamAsync(stream, expectedVersion, events);
task.Wait();
@@ -210,6 +286,7 @@ public Task AppendToStreamAsync(string stream, int expectedVersion, IEnumerable<
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.NotNull(events, "events");
+ EnsureActive();
var source = new TaskCompletionSource<object>();
var operation = new AppendToStreamOperation(source, Guid.NewGuid(), _allowForwarding, stream, expectedVersion, events);
@@ -221,6 +298,7 @@ public Task AppendToStreamAsync(string stream, int expectedVersion, IEnumerable<
public EventStoreTransaction StartTransaction(string stream, int expectedVersion)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var task = StartTransactionAsync(stream, expectedVersion);
task.Wait();
@@ -230,6 +308,7 @@ public EventStoreTransaction StartTransaction(string stream, int expectedVersion
public Task<EventStoreTransaction> StartTransactionAsync(string stream, int expectedVersion)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var source = new TaskCompletionSource<EventStoreTransaction>();
var operation = new StartTransactionOperation(source, Guid.NewGuid(), _allowForwarding, stream, expectedVersion);
@@ -242,6 +321,7 @@ public void TransactionalWrite(long transactionId, string stream, IEnumerable<IE
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.NotNull(events, "events");
+ EnsureActive();
var task = TransactionalWriteAsync(transactionId, stream, events);
task.Wait();
@@ -251,6 +331,7 @@ public Task TransactionalWriteAsync(long transactionId, string stream, IEnumerab
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.NotNull(events, "events");
+ EnsureActive();
var source = new TaskCompletionSource<object>();
var operation = new TransactionalWriteOperation(source, Guid.NewGuid(), _allowForwarding, transactionId, stream, events);
@@ -262,6 +343,7 @@ public Task TransactionalWriteAsync(long transactionId, string stream, IEnumerab
public void CommitTransaction(long transactionId, string stream)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var task = CommitTransactionAsync(transactionId, stream);
task.Wait();
@@ -270,6 +352,7 @@ public void CommitTransaction(long transactionId, string stream)
public Task CommitTransactionAsync(long transactionId, string stream)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
var source = new TaskCompletionSource<object>();
var operation = new CommitTransactionOperation(source, Guid.NewGuid(), _allowForwarding, transactionId, stream);
@@ -283,6 +366,7 @@ public EventStreamSlice ReadEventStreamForward(string stream, int start, int cou
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.Nonnegative(start, "start");
Ensure.Positive(count, "count");
+ EnsureActive();
var task = ReadEventStreamForwardAsync(stream, start, count);
task.Wait();
@@ -295,6 +379,7 @@ public Task<EventStreamSlice> ReadEventStreamForwardAsync(string stream, int sta
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.Nonnegative(start, "start");
Ensure.Positive(count, "count");
+ EnsureActive();
var source = new TaskCompletionSource<EventStreamSlice>();
var operation = new ReadStreamEventsForwardOperation(source, Guid.NewGuid(), stream, start, count, true);
@@ -307,6 +392,7 @@ public EventStreamSlice ReadEventStreamBackward(string stream, int start, int co
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.Positive(count, "count");
+ EnsureActive();
var task = ReadEventStreamBackwardAsync(stream, start, count);
task.Wait();
@@ -318,6 +404,7 @@ public Task<EventStreamSlice> ReadEventStreamBackwardAsync(string stream, int st
{
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.Positive(count, "count");
+ EnsureActive();
var source = new TaskCompletionSource<EventStreamSlice>();
var operation = new ReadStreamEventsBackwardOperation(source, Guid.NewGuid(), stream, start, count, true);
@@ -330,6 +417,7 @@ public AllEventsSlice ReadAllEventsForward(Position position, int maxCount)
{
Ensure.NotNull(position, "position");
Ensure.Positive(maxCount, "maxCount");
+ EnsureActive();
var task = ReadAllEventsForwardAsync(position, maxCount);
task.Wait();
@@ -340,6 +428,7 @@ public Task<AllEventsSlice> ReadAllEventsForwardAsync(Position position, int max
{
Ensure.NotNull(position, "position");
Ensure.Positive(maxCount, "maxCount");
+ EnsureActive();
var source = new TaskCompletionSource<AllEventsSlice>();
var operation = new ReadAllEventsForwardOperation(source, Guid.NewGuid(), position, maxCount, true);
@@ -352,6 +441,7 @@ public AllEventsSlice ReadAllEventsBackward(Position position, int maxCount)
{
Ensure.NotNull(position, "position");
Ensure.Positive(maxCount, "maxCount");
+ EnsureActive();
var task = ReadAllEventsBackwardAsync(position, maxCount);
task.Wait();
@@ -362,6 +452,7 @@ public Task<AllEventsSlice> ReadAllEventsBackwardAsync(Position position, int ma
{
Ensure.NotNull(position, "position");
Ensure.Positive(maxCount, "maxCount");
+ EnsureActive();
var source = new TaskCompletionSource<AllEventsSlice>();
var operation = new ReadAllEventsBackwardOperation(source, Guid.NewGuid(), position, maxCount, true);
@@ -375,36 +466,45 @@ public Task SubscribeAsync(string stream, Action<RecordedEvent, Position> eventA
Ensure.NotNullOrEmpty(stream, "stream");
Ensure.NotNull(eventAppeared, "eventAppeared");
Ensure.NotNull(subscriptionDropped, "subscriptionDropped");
+ EnsureActive();
lock(_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
return _subscriptionsChannel.Subscribe(stream, eventAppeared, subscriptionDropped);
}
- public void Unsubscribe(string stream)
+ public Task UnsubscribeAsync(string stream)
{
Ensure.NotNullOrEmpty(stream, "stream");
+ EnsureActive();
lock (_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
_subscriptionsChannel.Unsubscribe(stream);
+
+ return Tasks.CreateCompleted();
}
public Task SubscribeToAllStreamsAsync(Action<RecordedEvent, Position> eventAppeared, Action subscriptionDropped)
{
Ensure.NotNull(eventAppeared, "eventAppeared");
Ensure.NotNull(subscriptionDropped, "subscriptionDropped");
+ EnsureActive();
lock (_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
return _subscriptionsChannel.SubscribeToAllStreams(eventAppeared, subscriptionDropped);
}
- public void UnsubscribeFromAllStreams()
+ public Task UnsubscribeFromAllStreamsAsync()
{
+ EnsureActive();
+
lock (_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
_subscriptionsChannel.UnsubscribeFromAllStreams();
+
+ return Tasks.CreateCompleted();
}
void IProjectionsManagement.Enable(string name)
View
83 src/EventStore/EventStore.ClientAPI/Messages/ClusterMessages.cs
@@ -0,0 +1,83 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+using System;
+
+namespace EventStore.ClientAPI.Messages
+{
+ public class ClusterMessages
+ {
+ public class ClusterInfoDto
+ {
+ public MemberInfoDto[] Members { get; set; }
+
+ public ClusterInfoDto()
+ {
+ }
+
+ public ClusterInfoDto(MemberInfoDto[] members)
+ {
+ Members = members;
+ }
+ }
+
+ public class MemberInfoDto
+ {
+ public DateTime TimeStamp { get; set; }
+ public VNodeState State { get; set; }
+ public bool IsAlive { get; set; }
+
+ public string InternalTcpIp { get; set; }
+ public int InternalTcpPort { get; set; }
+
+ public string ExternalTcpIp { get; set; }
+ public int ExternalTcpPort { get; set; }
+
+ public string InternalHttpIp { get; set; }
+ public int InternalHttpPort { get; set; }
+
+ public string ExternalHttpIp { get; set; }
+ public int ExternalHttpPort { get; set; }
+
+ public long WriterCheckpoint { get; set; }
+ public long ChaserCheckpoint { get; set; }
+ }
+
+ public enum VNodeState
+ {
+ Initializing,
+ Unknown,
+ CatchingUp,
+ Clone,
+ Slave,
+ Master,
+ Manager,
+ ShuttingDown,
+ Shutdown
+ }
+ }
+}
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
@@ -54,8 +54,9 @@ public void TearDown()
public void return_empty_slice_if_asked_to_read_from_start()
{
const string stream = "read_all_events_backward_should_return_empty_slice_if_asked_to_read_from_start";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -69,8 +70,9 @@ public void return_empty_slice_if_asked_to_read_from_start()
[Test]
public void return_empty_slice_if_no_events_present()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var all = new List<RecordedEvent>();
var position = Position.End;
AllEventsSlice slice;
@@ -89,8 +91,9 @@ public void return_empty_slice_if_no_events_present()
public void return_partial_slice_if_not_enough_events()
{
const string stream = "read_all_events_backward_should_return_partial_slice_if_not_enough_events";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -110,8 +113,9 @@ public void return_partial_slice_if_not_enough_events()
public void return_events_in_reversed_order_compared_to_written()
{
const string stream = "read_all_events_backward_should_return_events_in_reversed_order_compared_to_written";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
var create = store.CreateStreamAsync(stream, new byte[0]);
@@ -131,8 +135,9 @@ public void return_events_in_reversed_order_compared_to_written()
public void read_stream_created_events_as_well()
{
const string stream = "read_all_events_backward_should_read_system_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -151,8 +156,9 @@ public void read_stream_created_events_as_well()
public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
{
const string stream = "read_all_events_backward_should_be_able_to_read_all_one_by_one_and_return_empty_slice_at_last";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -179,8 +185,9 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
public void be_able_to_read_events_slice_at_time()
{
const string stream = "read_all_events_backward_should_be_able_to_read_events_slice_at_time";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -207,8 +214,9 @@ public void be_able_to_read_events_slice_at_time()
public void not_return_events_from_deleted_streams()
{
const string stream = "read_all_events_backward_should_not_return_events_from_deleted_streams";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -245,8 +253,9 @@ public void not_return_events_from_deleted_streams()
public void not_return_stream_deleted_records()
{
const string stream = "read_all_events_backward_should_not_return_stream_deleted_records";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -268,8 +277,9 @@ public void not_return_stream_deleted_records()
public void return_no_records_if_stream_created_than_deleted()
{
const string stream = "read_all_events_backward_should_return_no_records_if_stream_created_than_deleted";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
35 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
@@ -57,8 +57,9 @@ public void TearDown()
public void return_empty_slice_if_asked_to_read_from_end()
{
const string stream = "read_all_events_forward_should_return_empty_slice_if_asked_to_read_from_end";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -77,8 +78,9 @@ public void return_empty_slice_if_asked_to_read_from_end()
[Test]
public void return_empty_slice_if_no_events_present()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var all = new List<RecordedEvent>();
var position = Position.Start;
AllEventsSlice slice;
@@ -97,8 +99,9 @@ public void return_empty_slice_if_no_events_present()
public void return_events_in_same_order_as_written()
{
const string stream = "read_all_events_forward_should_return_events_in_same_order_as_written";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
@@ -125,8 +128,9 @@ public void return_events_in_same_order_as_written()
public void read_stream_created_events_as_well()
{
const string stream = "read_all_events_forward_should_read_system_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -145,8 +149,9 @@ public void read_stream_created_events_as_well()
public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
{
const string stream = "read_all_events_forward_should_be_able_to_read_all_one_by_one_and_return_empty_slice_at_last";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -173,8 +178,9 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
public void be_able_to_read_events_slice_at_time()
{
const string stream = "read_all_events_forward_should_be_able_to_read_events_slice_at_time";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -201,8 +207,9 @@ public void be_able_to_read_events_slice_at_time()
public void return_partial_slice_if_not_enough_events()
{
const string stream = "read_all_events_forward_should_return_partial_slice_if_not_enough_events";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -222,8 +229,9 @@ public void return_partial_slice_if_not_enough_events()
public void not_return_events_from_deleted_streams()
{
const string stream = "read_all_events_forward_should_not_return_events_from_deleted_streams";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -260,8 +268,9 @@ public void not_return_events_from_deleted_streams()
public void not_return_stream_deleted_records()
{
const string stream = "read_all_events_forward_should_not_return_stream_deleted_records";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -283,8 +292,9 @@ public void not_return_stream_deleted_records()
public void return_no_records_if_stream_created_than_deleted()
{
const string stream = "read_all_events_forward_should_return_no_records_if_stream_created_than_deleted";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -309,8 +319,9 @@ public void return_no_records_if_stream_created_than_deleted()
public void recover_from_dropped_subscription_state_using_last_known_position()
{
const string stream = "read_all_events_forward_should_recover_from_dropped_subscription_state_using_last_known_position";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var catched = new List<RecordedEvent>();
Position lastKnonwPosition = null;
var dropped = new AutoResetEvent(false);
@@ -332,7 +343,7 @@ public void recover_from_dropped_subscription_state_using_last_known_position()
var write = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, testEvents);
Assert.That(write.Wait(Timeout));
- store.Unsubscribe(stream);
+ store.UnsubscribeAsync(stream);
Assert.That(dropped.WaitOne(Timeout));
var write2 = store.AppendToStreamAsync(stream, testEvents.Length, testEvents);
View
11 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
@@ -55,8 +55,9 @@ public void TearDown()
public void allow_multiple_subscriptions()
{
const string stream = "subscribe_to_all_should_allow_multiple_subscriptions";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(2);
@@ -76,8 +77,9 @@ public void allow_multiple_subscriptions()
[Test]
public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(2);
@@ -87,7 +89,7 @@ public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
- store.UnsubscribeFromAllStreams();
+ store.UnsubscribeFromAllStreamsAsync();
Assert.That(dropped.Wait(Timeout));
}
}
@@ -96,8 +98,9 @@ public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
public void catch_created_and_deleted_events_as_well()
{
const string stream = "subscribe_to_all_should_catch_created_and_deleted_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(1);
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
@@ -41,8 +41,9 @@ internal class append_to_stream
public void should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.NoStream, new[] {new TestEvent()});
Assert.DoesNotThrow(append.Wait);
@@ -57,8 +58,9 @@ public void should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_n
public void should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.Any, new[] { new TestEvent() });
Assert.DoesNotThrow(append.Wait);
@@ -73,8 +75,9 @@ public void should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exi
public void should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, new[] { new TestEvent() });
Assert.That(() => append.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<WrongExpectedVersionException>());
}
@@ -85,8 +88,9 @@ public void should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_do
public void should_fail_writing_with_correct_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_correct_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -103,8 +107,9 @@ public void should_fail_writing_with_correct_exp_ver_to_deleted_stream()
public void should_fail_writing_with_any_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_any_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -121,8 +126,9 @@ public void should_fail_writing_with_any_exp_ver_to_deleted_stream()
public void should_fail_writing_with_invalid_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_invalid_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -139,8 +145,9 @@ public void should_fail_writing_with_invalid_exp_ver_to_deleted_stream()
public void should_append_with_correct_exp_ver_to_existing_stream()
{
const string stream = "should_append_with_correct_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -154,8 +161,9 @@ public void should_append_with_correct_exp_ver_to_existing_stream()
public void should_append_with_any_exp_ver_to_existing_stream()
{
const string stream = "should_append_with_any_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -169,8 +177,9 @@ public void should_append_with_any_exp_ver_to_existing_stream()
public void should_fail_appending_with_wrong_exp_ver_to_existing_stream()
{
const string stream = "should_fail_appending_with_wrong_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -184,8 +193,9 @@ public void should_fail_appending_with_wrong_exp_ver_to_existing_stream()
public void can_append_multiple_events_at_once()
{
const string stream = "can_append_multiple_events_at_once";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
12 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
@@ -40,8 +40,9 @@ internal class creating_stream
public void which_does_not_exist_should_be_successfull()
{
const string stream = "which_does_not_exist_should_be_successfull";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
@@ -52,8 +53,9 @@ public void which_does_not_exist_should_be_successfull()
public void which_supposed_to_be_system_should_succees__but_on_your_own_risk()
{
const string stream = "$which_supposed_to_be_system_should_succees__but_on_your_own_risk";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
@@ -64,8 +66,9 @@ public void which_supposed_to_be_system_should_succees__but_on_your_own_risk()
public void which_already_exists_should_fail()
{
const string stream = "which_already_exists_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var initialCreate = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(initialCreate.Wait);
@@ -80,8 +83,9 @@ public void which_already_exists_should_fail()
public void which_was_deleted_should_fail()
{
const string stream = "which_was_deleted_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
15 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
@@ -40,8 +40,9 @@ internal class deleting_stream
public void which_already_exists_should_success_when_passed_empty_stream_expected_version()
{
const string stream = "which_already_exists_should_success_when_passed_empty_stream_expected_version";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -55,8 +56,9 @@ public void which_already_exists_should_success_when_passed_empty_stream_expecte
public void which_already_exists_should_success_when_passed_any_for_expected_version()
{
const string stream = "which_already_exists_should_success_when_passed_any_for_expected_version";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -70,8 +72,9 @@ public void which_already_exists_should_success_when_passed_any_for_expected_ver
public void with_invalid_expected_version_should_fail()
{
const string stream = "with_invalid_expected_version_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -85,8 +88,9 @@ public void with_invalid_expected_version_should_fail()
public void which_does_not_exist_should_fail()
{
const string stream = "which_does_not_exist_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.Any);
Assert.Inconclusive();
//Assert.That(() => delete.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<WrongExpectedVersionException>());
@@ -98,8 +102,9 @@ public void which_does_not_exist_should_fail()
public void which_was_allready_deleted_should_fail()
{
const string stream = "which_was_allready_deleted_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
@@ -41,8 +41,9 @@ internal class read_event_stream_backward_should
public void throw_if_count_le_zero()
{
const string stream = "read_event_stream_backward_should_throw_if_count_le_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamBackwardAsync(stream, 0, 0));
}
}
@@ -52,8 +53,9 @@ public void throw_if_count_le_zero()
public void throw_if_no_stream()
{
const string stream = "read_event_stream_backward_should_throw_if_no_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var read = store.ReadEventStreamBackwardAsync(stream, StreamPosition.End, 1);
Assert.That(() => read.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<StreamDoesNotExistException>());
}
@@ -64,8 +66,9 @@ public void throw_if_no_stream()
public void throw_if_stream_deleted()
{
const string stream = "read_event_stream_backward_should_throw_if_stream_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -81,8 +84,9 @@ public void throw_if_stream_deleted()
public void return_single_event_when_called_on_empty_stream()
{
const string stream = "read_event_stream_backward_should_return_single_event_when_called_on_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -98,8 +102,9 @@ public void return_single_event_when_called_on_empty_stream()
public void return_partial_slice_if_no_enough_events_in_stream()
{
const string stream = "read_event_stream_backward_should_return_partial_slice_if_no_enough_events_in_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -119,8 +124,9 @@ public void return_partial_slice_if_no_enough_events_in_stream()
public void return_events_reversed_compared_to_written()
{
const string stream = "read_event_stream_backward_should_return_events_reversed_compared_to_written";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -140,8 +146,9 @@ public void return_events_reversed_compared_to_written()
public void be_able_to_read_single_event_from_arbitrary_position()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_single_event_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -161,8 +168,9 @@ public void be_able_to_read_single_event_from_arbitrary_position()
public void be_able_to_read_first_event()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_first_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -182,8 +190,9 @@ public void be_able_to_read_first_event()
public void be_able_to_read_last_event()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_last_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -203,8 +212,9 @@ public void be_able_to_read_last_event()
public void be_able_to_read_slice_from_arbitrary_position()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_slice_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
36 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
@@ -40,8 +40,9 @@ internal class read_event_stream_forward_should
public void throw_if_count_le_zero()
{
const string stream = "read_event_stream_forward_should_throw_if_count_le_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamForwardAsync(stream, 0, 0));
}
}
@@ -51,8 +52,9 @@ public void throw_if_count_le_zero()
public void throw_if_start_lt_zero()
{
const string stream = "read_event_stream_forward_should_throw_if_start_lt_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamForwardAsync(stream, -1, 1));
}
}
@@ -62,8 +64,9 @@ public void throw_if_start_lt_zero()
public void throw_if_no_stream()
{
const string stream = "read_event_stream_forward_should_throw_if_no_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var read = store.ReadEventStreamForwardAsync(stream, 0, 1);
Assert.That(() => read.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<StreamDoesNotExistException>());
}
@@ -74,8 +77,9 @@ public void throw_if_no_stream()
public void throw_if_stream_deleted()
{
const string stream = "read_event_stream_forward_should_throw_if_stream_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -91,8 +95,9 @@ public void throw_if_stream_deleted()
public void return_single_event_when_called_on_empty_stream()
{
const string stream = "read_event_stream_forward_should_return_single_event_when_called_on_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -108,8 +113,9 @@ public void return_single_event_when_called_on_empty_stream()
public void return_empty_slice_when_called_on_empty_stream_starting_at_position_1()
{
const string stream = "read_event_stream_forward_should_return_empty_slice_when_called_on_empty_stream_starting_at_position_1";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -125,8 +131,9 @@ public void return_empty_slice_when_called_on_empty_stream_starting_at_position_
public void return_empty_slice_when_called_on_non_existing_range()
{
const string stream = "read_event_stream_forward_should_return_empty_slice_when_called_on_non_existing_range";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -145,8 +152,9 @@ public void return_empty_slice_when_called_on_non_existing_range()
public void return_partial_slice_if_no_enough_events_in_stream()
{
const string stream = "read_event_stream_forward_should_return_partial_slice_if_no_enough_events_in_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -165,8 +173,9 @@ public void return_partial_slice_if_no_enough_events_in_stream()
public void return_partial_slice_when_got_int_max_value_as_maxcount()
{
const string stream = "read_event_stream_forward_should_return_partial_slice_when_got_int_max_value_as_maxcount";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -185,8 +194,9 @@ public void return_partial_slice_when_got_int_max_value_as_maxcount()
public void return_events_in_same_order_as_written()
{
const string stream = "read_event_stream_forward_should_return_events_in_same_order_as_written";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -206,8 +216,9 @@ public void return_events_in_same_order_as_written()
public void be_able_to_read_single_event_from_arbitrary_position()
{
const string stream = "read_event_stream_forward_should_be_able_to_read_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -227,8 +238,9 @@ public void be_able_to_read_single_event_from_arbitrary_position()
public void be_able_to_read_slice_from_arbitrary_position()
{
const string stream = "read_event_stream_forward_should_be_able_to_read_slice_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
20 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
@@ -41,8 +41,9 @@ internal class subscribe_should
public void be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_event()
{
const string stream = "subscribe_should_be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -62,8 +63,9 @@ public void be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_e
public void allow_multiple_subscriptions_to_same_stream()
{
const string stream = "subscribe_should_allow_multiple_subscriptions_to_same_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(2);
@@ -84,8 +86,9 @@ public void allow_multiple_subscriptions_to_same_stream()
public void call_dropped_callback_after_unsubscribe_method_call()
{
const string stream = "subscribe_should_call_dropped_callback_after_unsubscribe_method_call";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -95,7 +98,7 @@ public void call_dropped_callback_after_unsubscribe_method_call()
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
Assert.That(!appeared.Wait(50));
- store.Unsubscribe(stream);
+ store.UnsubscribeAsync(stream);
Assert.That(dropped.Wait(Timeout));
}
}
@@ -104,8 +107,9 @@ public void call_dropped_callback_after_unsubscribe_method_call()
public void subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks()
{
const string stream = "subscribe_should_subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -127,8 +131,9 @@ public void subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks(
public void not_call_dropped_if_stream_was_deleted()
{
const string stream = "subscribe_should_not_call_dropped_if_stream_was_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -154,8 +159,9 @@ public void not_call_dropped_if_stream_was_deleted()
public void catch_created_and_deleted_events_as_well()
{
const string stream = "subscribe_should_catch_created_and_deleted_events_as_well";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(1);
View
53 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
@@ -45,8 +45,9 @@ internal class transaction
public void should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit()
{
const string stream = "should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.NoStream);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] {new TestEvent()});
@@ -61,8 +62,9 @@ public void should_start_on_non_existing_stream_with_correct_exp_ver_and_create_
public void should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit()
{
const string stream = "should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.Any);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] {new TestEvent()});
@@ -77,8 +79,9 @@ public void should_start_on_non_existing_stream_with_exp_ver_any_and_create_stre
public void should_fail_to_commit_non_existing_stream_with_wrong_exp_ver()
{
const string stream = "should_fail_to_commit_non_existing_stream_with_wrong_exp_ver";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] { new TestEvent() });
@@ -93,8 +96,9 @@ public void should_fail_to_commit_non_existing_stream_with_wrong_exp_ver()
public void should_create_stream_if_commits_no_events_to_empty_stream()
{
const string stream = "should_create_stream_if_commits_no_events_to_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.NoStream);
Assert.DoesNotThrow(start.Wait);
var commit = store.CommitTransactionAsync(start.Result.TransactionId, start.Result.Stream);
@@ -112,8 +116,9 @@ public void should_create_stream_if_commits_no_events_to_empty_stream()
public void should_validate_expectations_on_commit()
{
const string stream = "should_validate_expectations_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, 100500);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] { new TestEvent() });
@@ -136,14 +141,18 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
var totalPlainWrites = 500;
//excplicitly creating stream
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
//500 events during transaction
ThreadPool.QueueUserWorkItem(_ =>
{
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var transaction = store.StartTransaction(stream, ExpectedVersion.Any);
var writes = new List<Task>();
for (int i = 0; i < totalTranWrites; i++)
@@ -166,8 +175,9 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
//500 events to same stream in parallel
ThreadPool.QueueUserWorkItem(_ =>
{
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var writes = new List<Task>();
for (int i = 0; i < totalPlainWrites; i++)
{
@@ -189,8 +199,9 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
writesToSameStreamCompleted.WaitOne();
//check all written
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var slice = store.ReadEventStreamForward(stream, 0, totalTranWrites + totalPlainWrites + 1);
Assert.That(slice.Events.Length, Is.EqualTo(totalTranWrites + totalPlainWrites + 1));
@@ -204,11 +215,15 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
public void should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad()
{
const string stream = "should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
@@ -228,11 +243,15 @@ public void should_fail_to_commit_if_started_with_correct_ver_but_committing_wit
public void should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver()
{
const string stream = "should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, 1);
Assert.DoesNotThrow(start.Wait);
@@ -252,11 +271,15 @@ public void should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_w
public void should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted()
{
const string stream = "should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
View
4 src/EventStore/EventStore.Core.Tests/Index/MemTableTests.cs
@@ -124,14 +124,14 @@ public void existing_item_is_found()
Assert.IsTrue(MemTable.TryGetOneValue(0x11, 0x01, out position));
}
- [Test]
+ [Test, Ignore("Should MemTable check for duplicates?..")]
public void duplicate_key_does_not_throw_exception()
{
MemTable.Add(0x11, 0x01, 0xffff);
Assert.DoesNotThrow(() => MemTable.Add(0x11, 0x01, 0xffff));
}
- [Test]
+ [Test, Ignore("Should MemTable check for duplicates?..")]
public void duplicate_key_does_not_make_any_harm()
{
MemTable.Add(0x11, 0x01, 0xffff);
View
2  src/EventStore/EventStore.Core.Tests/Services/Storage/ReadIndexTestScenario.cs
@@ -101,7 +101,7 @@ public override void TestFixtureSetUp()
() => reader,
TableIndex,
new ByLengthHasher(),
- new NoLRUCache<string, StreamMetadata>());
+ new NoLRUCache<string, StreamCacheInfo>());
ReadIndex.Build();
View
2  ...re.Core.Tests/Services/Storage/Transactions/when_rebuilding_index_for_partially_persisted_transaction.cs
@@ -65,7 +65,7 @@ public override void TestFixtureSetUp()
() => new TFChunkReader(Db, WriterChecksum),
TableIndex,
new ByLengthHasher(),
- new NoLRUCache<string, StreamMetadata>());
+ new NoLRUCache<string, StreamCacheInfo>());
ReadIndex.Build();
}
View
19 src/EventStore/EventStore.Core/DataStructures/ILRUCache.cs
@@ -25,24 +25,15 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
+
+using System;
+
namespace EventStore.Core.DataStructures
{
- public interface ILRUCache<in TKey, TValue>
+ public interface ILRUCache<TKey, TValue>
{
bool TryGet(TKey key, out TValue value);
void Put(TKey key, TValue value);
- }
-
- public class NoLRUCache<TKey, TValue>: ILRUCache<TKey, TValue>
- {
- public bool TryGet(TKey key, out TValue value)
- {
- value = default(TValue);
- return false;
- }
-
- public void Put(TKey key, TValue value)
- {
- }
+ void Put(TKey key, Func<TKey, TValue> addFactory, Func<TKey, TValue, TValue> updateFactory);
}
}
View
30 src/EventStore/EventStore.Core/DataStructures/LRUCache.cs
@@ -27,6 +27,7 @@
//
using System;
using System.Collections.Generic;
+using EventStore.Common.Utils;
namespace EventStore.Core.DataStructures
{
@@ -94,7 +95,34 @@ public void Put(TKey key, TValue value)
_orderList.AddLast(node);
}
}
-
+
+ public void Put(TKey key, Func<TKey, TValue> addFactory, Func<TKey, TValue, TValue> updateFactory)
+ {
+ Ensure.NotNull(addFactory, "addFactory");
+ Ensure.NotNull(updateFactory, "updateFactory");
+
+ lock (_lock)
+ {
+ LinkedListNode<LRUItem> node;
+ if (!_items.TryGetValue(key, out node))
+ {
+ node = GetNode();
+ node.Value.Key = key;
+ node.Value.Value = addFactory(key);
+
+ EnsureCapacity();
+
+ _items.Add(key, node);
+ }
+ else
+ {
+ node.Value.Value = updateFactory(key, node.Value.Value);
+ _orderList.Remove(node);
+ }
+ _orderList.AddLast(node);
+ }
+ }
+
private void EnsureCapacity()