Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

eventstoreconnection changes

  • Loading branch information...
commit 9fb39566b8522cedd53413b1f973806edd9c63ea 1 parent 5b22da2
@TarasRoshko TarasRoshko authored
Showing with 387 additions and 313 deletions.
  1. +49 −0 src/EventStore/EventStore.ClientAPI/Common/Utils/Tasks.cs
  2. +81 −46 src/EventStore/EventStore.ClientAPI/Connection/ClusterExplorer.cs
  3. +5 −39 src/EventStore/EventStore.ClientAPI/{Configure.cs → ConnectionSettings.cs}
  4. +2 −2 src/EventStore/EventStore.ClientAPI/EventStore.ClientAPI.csproj
  5. +0 −100 src/EventStore/EventStore.ClientAPI/EventStore.cs
  6. +65 −36 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
  7. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
  8. +23 −12 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
  9. +7 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
  10. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
  11. +8 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
  12. +10 −5 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
  13. +20 −10 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
  14. +24 −12 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
  15. +13 −7 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
  16. +38 −15 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
  17. +2 −1  src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/ScenarioBase.cs
View
49 src/EventStore/EventStore.ClientAPI/Common/Utils/Tasks.cs
@@ -0,0 +1,49 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System.Threading.Tasks;
+
+namespace EventStore.ClientAPI.Common.Utils
+{
+ internal static class Tasks
+ {
+ public static Task CreateCompleted()
+ {
+ var source = new TaskCompletionSource<object>();
+ source.SetResult(null);
+ return source.Task;
+ }
+
+ public static Task<TResult> CreateCompleted<TResult>(TResult result)
+ {
+ var source = new TaskCompletionSource<TResult>();
+ source.SetResult(result);
+ return source.Task;
+ }
+ }
+}
View
127 src/EventStore/EventStore.ClientAPI/Connection/ClusterExplorer.cs
@@ -1,5 +1,32 @@
-using System;
-using System.Collections.Generic;
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+using System;
using System.IO;
using System.Net;
using System.Threading;
@@ -21,35 +48,37 @@ internal class ClusterExplorer
private readonly HttpAsyncClient _client = new HttpAsyncClient();
private readonly bool _allowForwarding;
+ private readonly int _maxAttempts;
private readonly int _port;
- public ClusterExplorer(bool allowForwarding, int port)
+ public ClusterExplorer(bool allowForwarding, int maxAttempts, int port)
{
_log = LogManager.GetLogger();
_allowForwarding = allowForwarding;
+ _maxAttempts = maxAttempts;
_port = port;
}
public Task<EndpointsPair?> Resolve(string dns)
{
var resolve = Task.Factory.StartNew(() => Dns.GetHostAddresses(dns));
- return resolve.ContinueWith(addresses => DiscoverCLuster(addresses.Result));
+ return resolve.ContinueWith(addresses => DiscoverCLuster(addresses.Result, _maxAttempts));
}
- private EndpointsPair? DiscoverCLuster(IPAddress[] managers)
+ private EndpointsPair? DiscoverCLuster(IPAddress[] managers, int maxAttempts)
{
if (managers == null || managers.Length == 0)
- throw new CannotEstablishConnectionException("DNS entry resolved to empty ip addresses list");
+ throw new CannotEstablishConnectionException("DNS entry resolved in empty ip addresses list");
- var clusterInfo = GetClusterInfo(managers);
- if (clusterInfo != null && clusterInfo.Members != null && clusterInfo.Members.Any())
+ var info = GetClusterInfo(managers, maxAttempts);
+ if (info != null && info.Members != null && info.Members.Any())
{
- var aliveMembers = clusterInfo.Members.Where(m => m.IsAlive);
+ var alive = info.Members.Where(m => m.IsAlive).ToArray();
if (!_allowForwarding)
{
_log.Info("Forwarding denied. Looking for master...");
- var master = aliveMembers.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master);
+ var master = alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master);
if (master == null)
{
_log.Info("Master not found");
@@ -60,17 +89,15 @@ public ClusterExplorer(bool allowForwarding, int port)
new IPEndPoint(IPAddress.Parse(master.ExternalHttpIp), master.ExternalHttpPort));
}
- var node = ((aliveMembers.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master) ??
- aliveMembers.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Slave)) ??
- aliveMembers.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Clone)) ??
- aliveMembers.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.CatchingUp);
+ var node = alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Master) ??
+ alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Slave) ??
+ alive.FirstOrDefault(m => m.State == ClusterMessages.VNodeState.Clone);
if (node == null)
{
- _log.Info("Unable to locate master or slave or clone or catching up node");
+ _log.Info("Unable to locate master, slave or clone node");
return null;
}
-
_log.Info("Best choise found, it's {0} on [{1}:{2}, {3}:{4}]",
node.State,
node.ExternalTcpIp,
@@ -85,59 +112,67 @@ public ClusterExplorer(bool allowForwarding, int port)
return null;
}
- private ClusterMessages.ClusterInfoDto GetClusterInfo(IPAddress[] managers)
+ private ClusterMessages.ClusterInfoDto GetClusterInfo(IPAddress[] managers, int maxAttempts)
{
- var allInfo = new List<ClusterMessages.ClusterInfoDto>(managers.Length);
- var requestCompleted = new CountdownEvent(managers.Length);
+ var attempt = 0;
+ var random = new Random();
+ while (attempt < maxAttempts)
+ {
+ _log.Info("Discovering cluster. Attempt {0}...", attempt + 1);
+ var i = random.Next(0, managers.Length);
+ _log.Info("Picked [{0}]", managers[i]);
+ var info = ClusterInfoOrDefault(managers[i]);
+ if(info != null)
+ {
+ _log.Info("Going to select node based on info from [{0}]", managers[i]);
+ return info;
+ }
+
+ _log.Info("Failed to get cluster info from [{0}]. Attempt {1} of {2}", attempt + 1, maxAttempts);
+ attempt++;
+ Thread.Sleep(TimeSpan.FromSeconds(1));
+ }
+ return null;
+ }
+
+ private ClusterMessages.ClusterInfoDto ClusterInfoOrDefault(IPAddress manager)
+ {
+ ClusterMessages.ClusterInfoDto info = null;
+ var completed = new ManualResetEvent(false);
Action<HttpResponse> success = response =>
{
- _log.Info("Got response from manager");
+ _log.Info("Got response from manager on [{0}]", manager);
if (response.HttpStatusCode != HttpStatusCode.OK)
{
_log.Info("Manager responded with {0} ({1})", response.HttpStatusCode, response.StatusDescription);
- requestCompleted.Signal();
+ completed.Set();
return;
}
try
{
using (var reader = new StringReader(response.Body))
- allInfo.Add((ClusterMessages.ClusterInfoDto)new XmlSerializer(typeof(ClusterMessages.ClusterInfoDto)).Deserialize(reader));
+ info = (ClusterMessages.ClusterInfoDto)new XmlSerializer(typeof(ClusterMessages.ClusterInfoDto)).Deserialize(reader);
}
catch (Exception e)
{
- _log.Info(e, "Failed to get cluster info from manager. Deserialization error");
+ _log.Info(e, "Failed to get cluster info from manager on [{0}]. Deserialization error", manager);
}
-
- requestCompleted.Signal();
+ completed.Set();
};
Action<Exception> error = e =>
{
- _log.Info(e, "Failed to get cluster info from manager");
- requestCompleted.Signal();
+ _log.Info(e, "Failed to get cluster info from manager on [{0}]. Request failed");
+ completed.Set();
};
- foreach (var manager in managers)
- {
- var url = new IPEndPoint(manager, _port).ToHttpUrl("/gossip?format=xml");
- _log.Info("Sending gossip request to {0}...", url);
- _client.Get(url, success, error);
- }
-
- requestCompleted.Wait();
-
- _log.Info("Aggregating info about cluster...");
- return allInfo.Any() ? allInfo.Aggregate(Accumulte) : null;
- }
+ var url = new IPEndPoint(manager, _port).ToHttpUrl("/gossip?format=xml");
+ _log.Info("Sending gossip request to {0}...", url);
+ _client.Get(url, success, error);
- private ClusterMessages.ClusterInfoDto Accumulte(ClusterMessages.ClusterInfoDto info1, ClusterMessages.ClusterInfoDto info2)
- {
- var members = info1.Members.Concat(info2.Members)
- .ToLookup(x => new IPEndPoint(IPAddress.Parse(x.InternalHttpIp), x.InternalHttpPort))
- .Select(x => x.OrderByDescending(y => y.TimeStamp).First())
- .ToArray();
- return new ClusterMessages.ClusterInfoDto(members);
+ completed.WaitOne();
+ return info;
}
}
}
View
44 src/EventStore/EventStore.ClientAPI/Configure.cs → ...tStore/EventStore.ClientAPI/ConnectionSettings.cs
@@ -25,47 +25,13 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-using System;
-using System.Net;
namespace EventStore.ClientAPI
{
- public class Configure
+ public enum Mode
{
- internal IPAddress _address;
- internal int _port;
- internal string _name;
-
- private Configure(string name, IPAddress ipaddress, int port)
- {
- _name = name;
- _address = ipaddress;
- _port = port;
- }
-
- public static Configure AsDefault()
- {
- return new Configure("Default", IPAddress.Loopback, 1113);
- }
-
- public static Configure Named(string name)
- {
- return new Configure(name, IPAddress.Loopback, 1113);
- }
-
- public Configure ToServerNamed(string name)
- {
- var addresses = Dns.GetHostAddresses(name);
- if (addresses.Length == 0) throw new HostNotFoundException(name);
-
- return new Configure(_name, addresses[0], _port);
- }
- }
-
- public class HostNotFoundException : Exception
- {
- public HostNotFoundException(string name) : base("Host with name " + name + " not found")
- {
- }
+ FullConsistency,
+ ConsistentWrites,
+ ConsistentReads,
}
-}
+}
View
4 src/EventStore/EventStore.ClientAPI/EventStore.ClientAPI.csproj
@@ -85,6 +85,8 @@
<Compile Include="ClientOperations\StartTransactionOperation.cs" />
<Compile Include="ClientOperations\TransactionalWriteOperation.cs" />
<Compile Include="Common\Log\DefaultLogger.cs" />
+ <Compile Include="Common\Utils\Tasks.cs" />
+ <Compile Include="ConnectionSettings.cs" />
<Compile Include="Connection\ClusterExplorer.cs" />
<Compile Include="Connection\ProjectionsManager.cs" />
<Compile Include="Connection\Subscription.cs" />
@@ -110,7 +112,6 @@
<Compile Include="Common\Utils\BytesFormatter.cs" />
<Compile Include="Common\ConcurrentCollections\ConcurrentQueue.cs" />
<Compile Include="Common\ConcurrentCollections\ConcurrentStack.cs" />
- <Compile Include="Configure.cs" />
<Compile Include="ClientOperations\DeleteStreamOperation.cs" />
<Compile Include="ClientOperations\IClientOperation.cs" />
<Compile Include="ClientOperations\ReadStreamEventsForwardOperation.cs" />
@@ -153,7 +154,6 @@
<Compile Include="Transport.Tcp\TcpConnector.cs" />
<Compile Include="Transport.Tcp\TcpTypedConnection.cs" />
<Compile Include="EventStoreConnection.cs" />
- <Compile Include="EventStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Transport.Tcp\TcpClientConnector.cs" />
<Compile Include="Transport.Tcp\IMonitoredTcpConnection.cs" />
View
100 src/EventStore/EventStore.ClientAPI/EventStore.cs
@@ -1,100 +0,0 @@
-// Copyright (c) 2012, Event Store LLP
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// Neither the name of the Event Store LLP nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-using System.Collections.Generic;
-using System.Net;
-using System.Threading.Tasks;
-
-namespace EventStore.ClientAPI
-{
- public static class EventStore
- {
- private static string _name;
- private static EventStoreConnection _connection;
-
- public static void Configure(Configure args)
- {
- _name = args._name;
- _connection = new EventStoreConnection(new IPEndPoint(args._address, args._port));
- }
-
- public static EventStreamSlice ReadEventStream(string stream, int start, int count)
- {
- return _connection.ReadEventStreamForward(stream, start, count);
- }
-
- public static Task<EventStreamSlice> ReadEventStreamAsync(string stream, int start, int count)
- {
- return _connection.ReadEventStreamForwardAsync(stream, start, count);
- }
-
- public static void CreateStream(string stream, byte[] metadata)
- {
- _connection.CreateStream(stream, metadata);
- }
-
- public static Task CreateStreamAsync(string stream, byte[] metadata)
- {
- return _connection.CreateStreamAsync(stream, metadata);
- }
-
- public static void AppendToStream(string stream, int expectedVersion, IEnumerable<IEvent> events)
- {
- _connection.AppendToStream(stream, expectedVersion, events);
- }
-
- public static Task AppendToStreamAsync(string stream, int expectedVersion, IEnumerable<IEvent> events)
- {
- return _connection.AppendToStreamAsync(stream, expectedVersion, events);
- }
-
- public static void DeleteStream(string stream, int expectedVersion)
- {
- _connection.DeleteStream(stream, expectedVersion);
- }
-
- public static Task DeleteStreamAsync(string stream, int expectedVersion)
- {
- return _connection.DeleteStreamAsync(stream, expectedVersion);
- }
-
- public static void DeleteStream(string stream)
- {
- _connection.DeleteStream(stream, ExpectedVersion.Any);
- }
-
- public static Task DeleteStreamAsync(string stream)
- {
- return _connection.DeleteStreamAsync(stream, ExpectedVersion.Any);
- }
-
- public static void Close()
- {
- _connection.Close();
- }
- }
-}
View
101 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
@@ -36,6 +36,7 @@
using System.Threading.Tasks;
using EventStore.ClientAPI.ClientOperations;
using EventStore.ClientAPI.Common.Log;
+using EventStore.ClientAPI.Common.Utils;
using EventStore.ClientAPI.Connection;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.SystemData;
@@ -89,18 +90,12 @@ public class EventStoreConnection : IProjectionsManagement, IDisposable
private Thread _worker;
private volatile bool _stopping;
- public EventStoreConnection(IPEndPoint tcpEndPoint = null,
- IPEndPoint httpEndPoint = null,
- bool allowForwarding = true,
- int maxConcurrentRequests = 5000,
- int maxAttemptsForOperation = 10,
- int maxReconnections = 10,
- ILogger logger = null)
+ private EventStoreConnection(bool allowForwarding,
+ int maxConcurrentRequests,
+ int maxAttemptsForOperation,
+ int maxReconnections,
+ ILogger logger)
{
- Ensure.Positive(maxConcurrentRequests, "maxConcurrentRequests");
- Ensure.Nonnegative(maxAttemptsForOperation, "maxAttemptsForOperation");
- Ensure.Nonnegative(maxReconnections, "maxReconnections");
-
_allowForwarding = allowForwarding;
_maxConcurrentItems = maxConcurrentRequests;
_maxAttempts = maxAttemptsForOperation;
@@ -112,48 +107,76 @@ public class EventStoreConnection : IProjectionsManagement, IDisposable
_connector = new TcpConnector();
_subscriptionsChannel = new SubscriptionsChannel(_connector);
_projectionsManager = new ProjectionsManager();
+ }
- if (tcpEndPoint != null)
- {
- if (httpEndPoint != null)
- Connect(tcpEndPoint, httpEndPoint);
- else
- Connect(tcpEndPoint);
- }
- _httpEndPoint = httpEndPoint;
+ public static EventStoreConnection Create()
+ {
+ return new EventStoreConnection(allowForwarding: true,
+ maxConcurrentRequests: 5000,
+ maxAttemptsForOperation: 10,
+ maxReconnections: 10,
+ logger: null);
}
- public void Connect(IPEndPoint tcpEndPoint)
+ public static EventStoreConnection Create(bool allowForwarding = true,
+ int maxConcurrentRequests = 5000,
+ int maxAttemptsForOperation = 10,
+ int maxReconnections = 10,
+ ILogger logger = null)
{
- Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
- Connect(tcpEndPoint, _httpEndPoint ?? new IPEndPoint(tcpEndPoint.Address, tcpEndPoint.Port + 1000));
+ Ensure.Positive(maxConcurrentRequests, "maxConcurrentRequests");
+ Ensure.Positive(maxAttemptsForOperation, "maxAttemptsForOperation");
+ Ensure.Nonnegative(maxReconnections, "maxReconnections");
+
+ return new EventStoreConnection(allowForwarding,
+ maxConcurrentRequests,
+ maxAttemptsForOperation,
+ maxReconnections,
+ logger);
}
- public void Connect(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint)
+ public void Connect(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint = null)
{
Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
- Ensure.NotNull(httpEndPoint, "httpEndPoint");
+ var task = ConnectAsync(tcpEndPoint, httpEndPoint);
+ task.Wait();
+ }
- InitiateConnection(tcpEndPoint, httpEndPoint);
+ public Task ConnectAsync(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint = null)
+ {
+ Ensure.NotNull(tcpEndPoint, "tcpEndPoint");
+ return EstablishConnectionAsync(tcpEndPoint, httpEndPoint ?? new IPEndPoint(tcpEndPoint.Address, tcpEndPoint.Port + 1000));
}
- public void Connect(string clusterDns, int port = 30777)
+ public void Connect(string clusterDns, int maxAttempts = 10, int port = 30777)
{
Ensure.NotNullOrEmpty(clusterDns, "clusterDns");
+ Ensure.Positive(maxAttempts, "maxAttempts");
Ensure.Nonnegative(port, "port");
- var explorer = new ClusterExplorer(_allowForwarding, port);
- var resolve = explorer.Resolve(clusterDns);
- resolve.Wait();
+ var task = ConnectAsync(clusterDns, maxAttempts, port);
+ task.Wait();
+ }
+
+ public Task ConnectAsync(string clusterDns, int maxAttempts = 10, int port = 30777)
+ {
+ Ensure.NotNullOrEmpty(clusterDns, "clusterDns");
+ Ensure.Positive(maxAttempts, "maxAttempts");
+ Ensure.Nonnegative(port, "port");
- var endpointsPair = resolve.Result;
- if(!endpointsPair.HasValue)
- throw new CannotEstablishConnectionException("Failed to find node to connect");
+ var explorer = new ClusterExplorer(_allowForwarding, maxAttempts, port);
+ return explorer.Resolve(clusterDns)
+ .ContinueWith(t =>
+ {
+ var pair = t.Result;
+ if (!pair.HasValue)
+ throw new CannotEstablishConnectionException("Failed to find node to connect");
- InitiateConnection(endpointsPair.Value.TcpEndPoint, endpointsPair.Value.HttpEndPoint);
+ return EstablishConnectionAsync(pair.Value.TcpEndPoint, pair.Value.HttpEndPoint);
+ });
}
- private void InitiateConnection(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint)
+ private Task EstablishConnectionAsync(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint)
{
lock (_connectionLock)
{
@@ -170,6 +193,8 @@ private void InitiateConnection(IPEndPoint tcpEndPoint, IPEndPoint httpEndPoint)
_worker = new Thread(MainLoop) {IsBackground = true, Name = "Worker thread"};
_worker.Start();
+
+ return Tasks.CreateCompleted();
}
}
@@ -448,7 +473,7 @@ public Task SubscribeAsync(string stream, Action<RecordedEvent, Position> eventA
return _subscriptionsChannel.Subscribe(stream, eventAppeared, subscriptionDropped);
}
- public void Unsubscribe(string stream)
+ public Task UnsubscribeAsync(string stream)
{
Ensure.NotNullOrEmpty(stream, "stream");
EnsureActive();
@@ -456,6 +481,8 @@ public void Unsubscribe(string stream)
lock (_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
_subscriptionsChannel.Unsubscribe(stream);
+
+ return Tasks.CreateCompleted();
}
public Task SubscribeToAllStreamsAsync(Action<RecordedEvent, Position> eventAppeared, Action subscriptionDropped)
@@ -469,13 +496,15 @@ public Task SubscribeToAllStreamsAsync(Action<RecordedEvent, Position> eventAppe
return _subscriptionsChannel.SubscribeToAllStreams(eventAppeared, subscriptionDropped);
}
- public void UnsubscribeFromAllStreams()
+ public Task UnsubscribeFromAllStreamsAsync()
{
EnsureActive();
lock (_connectionLock)
_subscriptionsChannel.EnsureConnected(_tcpEndPoint);
_subscriptionsChannel.UnsubscribeFromAllStreams();
+
+ return Tasks.CreateCompleted();
}
void IProjectionsManagement.Enable(string name)
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
@@ -54,8 +54,9 @@ public void TearDown()
public void return_empty_slice_if_asked_to_read_from_start()
{
const string stream = "read_all_events_backward_should_return_empty_slice_if_asked_to_read_from_start";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -69,8 +70,9 @@ public void return_empty_slice_if_asked_to_read_from_start()
[Test]
public void return_empty_slice_if_no_events_present()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var all = new List<RecordedEvent>();
var position = Position.End;
AllEventsSlice slice;
@@ -89,8 +91,9 @@ public void return_empty_slice_if_no_events_present()
public void return_partial_slice_if_not_enough_events()
{
const string stream = "read_all_events_backward_should_return_partial_slice_if_not_enough_events";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -110,8 +113,9 @@ public void return_partial_slice_if_not_enough_events()
public void return_events_in_reversed_order_compared_to_written()
{
const string stream = "read_all_events_backward_should_return_events_in_reversed_order_compared_to_written";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
var create = store.CreateStreamAsync(stream, new byte[0]);
@@ -131,8 +135,9 @@ public void return_events_in_reversed_order_compared_to_written()
public void read_stream_created_events_as_well()
{
const string stream = "read_all_events_backward_should_read_system_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -151,8 +156,9 @@ public void read_stream_created_events_as_well()
public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
{
const string stream = "read_all_events_backward_should_be_able_to_read_all_one_by_one_and_return_empty_slice_at_last";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -179,8 +185,9 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
public void be_able_to_read_events_slice_at_time()
{
const string stream = "read_all_events_backward_should_be_able_to_read_events_slice_at_time";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -207,8 +214,9 @@ public void be_able_to_read_events_slice_at_time()
public void not_return_events_from_deleted_streams()
{
const string stream = "read_all_events_backward_should_not_return_events_from_deleted_streams";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -245,8 +253,9 @@ public void not_return_events_from_deleted_streams()
public void not_return_stream_deleted_records()
{
const string stream = "read_all_events_backward_should_not_return_stream_deleted_records";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -268,8 +277,9 @@ public void not_return_stream_deleted_records()
public void return_no_records_if_stream_created_than_deleted()
{
const string stream = "read_all_events_backward_should_return_no_records_if_stream_created_than_deleted";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
35 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
@@ -57,8 +57,9 @@ public void TearDown()
public void return_empty_slice_if_asked_to_read_from_end()
{
const string stream = "read_all_events_forward_should_return_empty_slice_if_asked_to_read_from_end";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -77,8 +78,9 @@ public void return_empty_slice_if_asked_to_read_from_end()
[Test]
public void return_empty_slice_if_no_events_present()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var all = new List<RecordedEvent>();
var position = Position.Start;
AllEventsSlice slice;
@@ -97,8 +99,9 @@ public void return_empty_slice_if_no_events_present()
public void return_events_in_same_order_as_written()
{
const string stream = "read_all_events_forward_should_return_events_in_same_order_as_written";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
@@ -125,8 +128,9 @@ public void return_events_in_same_order_as_written()
public void read_stream_created_events_as_well()
{
const string stream = "read_all_events_forward_should_read_system_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -145,8 +149,9 @@ public void read_stream_created_events_as_well()
public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
{
const string stream = "read_all_events_forward_should_be_able_to_read_all_one_by_one_and_return_empty_slice_at_last";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -173,8 +178,9 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
public void be_able_to_read_events_slice_at_time()
{
const string stream = "read_all_events_forward_should_be_able_to_read_events_slice_at_time";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -201,8 +207,9 @@ public void be_able_to_read_events_slice_at_time()
public void return_partial_slice_if_not_enough_events()
{
const string stream = "read_all_events_forward_should_return_partial_slice_if_not_enough_events";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -222,8 +229,9 @@ public void return_partial_slice_if_not_enough_events()
public void not_return_events_from_deleted_streams()
{
const string stream = "read_all_events_forward_should_not_return_events_from_deleted_streams";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -260,8 +268,9 @@ public void not_return_events_from_deleted_streams()
public void not_return_stream_deleted_records()
{
const string stream = "read_all_events_forward_should_not_return_stream_deleted_records";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -283,8 +292,9 @@ public void not_return_stream_deleted_records()
public void return_no_records_if_stream_created_than_deleted()
{
const string stream = "read_all_events_forward_should_return_no_records_if_stream_created_than_deleted";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
@@ -309,8 +319,9 @@ public void return_no_records_if_stream_created_than_deleted()
public void recover_from_dropped_subscription_state_using_last_known_position()
{
const string stream = "read_all_events_forward_should_recover_from_dropped_subscription_state_using_last_known_position";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var catched = new List<RecordedEvent>();
Position lastKnonwPosition = null;
var dropped = new AutoResetEvent(false);
@@ -332,7 +343,7 @@ public void recover_from_dropped_subscription_state_using_last_known_position()
var write = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, testEvents);
Assert.That(write.Wait(Timeout));
- store.Unsubscribe(stream);
+ store.UnsubscribeAsync(stream);
Assert.That(dropped.WaitOne(Timeout));
var write2 = store.AppendToStreamAsync(stream, testEvents.Length, testEvents);
View
11 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
@@ -55,8 +55,9 @@ public void TearDown()
public void allow_multiple_subscriptions()
{
const string stream = "subscribe_to_all_should_allow_multiple_subscriptions";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(2);
@@ -76,8 +77,9 @@ public void allow_multiple_subscriptions()
[Test]
public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
{
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(2);
@@ -87,7 +89,7 @@ public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
- store.UnsubscribeFromAllStreams();
+ store.UnsubscribeFromAllStreamsAsync();
Assert.That(dropped.Wait(Timeout));
}
}
@@ -96,8 +98,9 @@ public void drop_all_global_subscribers_when_unsubscribe_from_all_called()
public void catch_created_and_deleted_events_as_well()
{
const string stream = "subscribe_to_all_should_catch_created_and_deleted_events_as_well";
- using (var store = new EventStoreConnection(Node.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(Node.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(1);
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
@@ -41,8 +41,9 @@ internal class append_to_stream
public void should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.NoStream, new[] {new TestEvent()});
Assert.DoesNotThrow(append.Wait);
@@ -57,8 +58,9 @@ public void should_create_stream_with_no_stream_exp_ver_on_first_write_if_does_n
public void should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.Any, new[] { new TestEvent() });
Assert.DoesNotThrow(append.Wait);
@@ -73,8 +75,9 @@ public void should_create_stream_with_any_exp_ver_on_first_write_if_does_not_exi
public void should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_does_not_exist()
{
const string stream = "should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_does_not_exist";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, new[] { new TestEvent() });
Assert.That(() => append.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<WrongExpectedVersionException>());
}
@@ -85,8 +88,9 @@ public void should_fail_to_create_stream_with_wrong_exp_ver_on_first_write_if_do
public void should_fail_writing_with_correct_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_correct_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -103,8 +107,9 @@ public void should_fail_writing_with_correct_exp_ver_to_deleted_stream()
public void should_fail_writing_with_any_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_any_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -121,8 +126,9 @@ public void should_fail_writing_with_any_exp_ver_to_deleted_stream()
public void should_fail_writing_with_invalid_exp_ver_to_deleted_stream()
{
const string stream = "should_fail_writing_with_invalid_exp_ver_to_deleted_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -139,8 +145,9 @@ public void should_fail_writing_with_invalid_exp_ver_to_deleted_stream()
public void should_append_with_correct_exp_ver_to_existing_stream()
{
const string stream = "should_append_with_correct_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -154,8 +161,9 @@ public void should_append_with_correct_exp_ver_to_existing_stream()
public void should_append_with_any_exp_ver_to_existing_stream()
{
const string stream = "should_append_with_any_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -169,8 +177,9 @@ public void should_append_with_any_exp_ver_to_existing_stream()
public void should_fail_appending_with_wrong_exp_ver_to_existing_stream()
{
const string stream = "should_fail_appending_with_wrong_exp_ver_to_existing_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -184,8 +193,9 @@ public void should_fail_appending_with_wrong_exp_ver_to_existing_stream()
public void can_append_multiple_events_at_once()
{
const string stream = "can_append_multiple_events_at_once";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
12 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
@@ -40,8 +40,9 @@ internal class creating_stream
public void which_does_not_exist_should_be_successfull()
{
const string stream = "which_does_not_exist_should_be_successfull";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
@@ -52,8 +53,9 @@ public void which_does_not_exist_should_be_successfull()
public void which_supposed_to_be_system_should_succees__but_on_your_own_risk()
{
const string stream = "$which_supposed_to_be_system_should_succees__but_on_your_own_risk";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
@@ -64,8 +66,9 @@ public void which_supposed_to_be_system_should_succees__but_on_your_own_risk()
public void which_already_exists_should_fail()
{
const string stream = "which_already_exists_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var initialCreate = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(initialCreate.Wait);
@@ -80,8 +83,9 @@ public void which_already_exists_should_fail()
public void which_was_deleted_should_fail()
{
const string stream = "which_was_deleted_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
15 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
@@ -40,8 +40,9 @@ internal class deleting_stream
public void which_already_exists_should_success_when_passed_empty_stream_expected_version()
{
const string stream = "which_already_exists_should_success_when_passed_empty_stream_expected_version";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -55,8 +56,9 @@ public void which_already_exists_should_success_when_passed_empty_stream_expecte
public void which_already_exists_should_success_when_passed_any_for_expected_version()
{
const string stream = "which_already_exists_should_success_when_passed_any_for_expected_version";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -70,8 +72,9 @@ public void which_already_exists_should_success_when_passed_any_for_expected_ver
public void with_invalid_expected_version_should_fail()
{
const string stream = "with_invalid_expected_version_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -85,8 +88,9 @@ public void with_invalid_expected_version_should_fail()
public void which_does_not_exist_should_fail()
{
const string stream = "which_does_not_exist_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.Any);
Assert.Inconclusive();
//Assert.That(() => delete.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<WrongExpectedVersionException>());
@@ -98,8 +102,9 @@ public void which_does_not_exist_should_fail()
public void which_was_allready_deleted_should_fail()
{
const string stream = "which_was_allready_deleted_should_fail";
- using (var connection = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var connection = EventStoreConnection.Create())
{
+ connection.Connect(MiniNode.Instance.TcpEndPoint);
var create = connection.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
30 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
@@ -41,8 +41,9 @@ internal class read_event_stream_backward_should
public void throw_if_count_le_zero()
{
const string stream = "read_event_stream_backward_should_throw_if_count_le_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamBackwardAsync(stream, 0, 0));
}
}
@@ -52,8 +53,9 @@ public void throw_if_count_le_zero()
public void throw_if_no_stream()
{
const string stream = "read_event_stream_backward_should_throw_if_no_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var read = store.ReadEventStreamBackwardAsync(stream, StreamPosition.End, 1);
Assert.That(() => read.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<StreamDoesNotExistException>());
}
@@ -64,8 +66,9 @@ public void throw_if_no_stream()
public void throw_if_stream_deleted()
{
const string stream = "read_event_stream_backward_should_throw_if_stream_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -81,8 +84,9 @@ public void throw_if_stream_deleted()
public void return_single_event_when_called_on_empty_stream()
{
const string stream = "read_event_stream_backward_should_return_single_event_when_called_on_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -98,8 +102,9 @@ public void return_single_event_when_called_on_empty_stream()
public void return_partial_slice_if_no_enough_events_in_stream()
{
const string stream = "read_event_stream_backward_should_return_partial_slice_if_no_enough_events_in_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -119,8 +124,9 @@ public void return_partial_slice_if_no_enough_events_in_stream()
public void return_events_reversed_compared_to_written()
{
const string stream = "read_event_stream_backward_should_return_events_reversed_compared_to_written";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -140,8 +146,9 @@ public void return_events_reversed_compared_to_written()
public void be_able_to_read_single_event_from_arbitrary_position()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_single_event_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -161,8 +168,9 @@ public void be_able_to_read_single_event_from_arbitrary_position()
public void be_able_to_read_first_event()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_first_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -182,8 +190,9 @@ public void be_able_to_read_first_event()
public void be_able_to_read_last_event()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_last_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -203,8 +212,9 @@ public void be_able_to_read_last_event()
public void be_able_to_read_slice_from_arbitrary_position()
{
const string stream = "read_event_stream_backward_should_be_able_to_read_slice_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
36 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
@@ -40,8 +40,9 @@ internal class read_event_stream_forward_should
public void throw_if_count_le_zero()
{
const string stream = "read_event_stream_forward_should_throw_if_count_le_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamForwardAsync(stream, 0, 0));
}
}
@@ -51,8 +52,9 @@ public void throw_if_count_le_zero()
public void throw_if_start_lt_zero()
{
const string stream = "read_event_stream_forward_should_throw_if_start_lt_zero";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
Assert.Throws<ArgumentOutOfRangeException>(() => store.ReadEventStreamForwardAsync(stream, -1, 1));
}
}
@@ -62,8 +64,9 @@ public void throw_if_start_lt_zero()
public void throw_if_no_stream()
{
const string stream = "read_event_stream_forward_should_throw_if_no_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var read = store.ReadEventStreamForwardAsync(stream, 0, 1);
Assert.That(() => read.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<StreamDoesNotExistException>());
}
@@ -74,8 +77,9 @@ public void throw_if_no_stream()
public void throw_if_stream_deleted()
{
const string stream = "read_event_stream_forward_should_throw_if_stream_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -91,8 +95,9 @@ public void throw_if_stream_deleted()
public void return_single_event_when_called_on_empty_stream()
{
const string stream = "read_event_stream_forward_should_return_single_event_when_called_on_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -108,8 +113,9 @@ public void return_single_event_when_called_on_empty_stream()
public void return_empty_slice_when_called_on_empty_stream_starting_at_position_1()
{
const string stream = "read_event_stream_forward_should_return_empty_slice_when_called_on_empty_stream_starting_at_position_1";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -125,8 +131,9 @@ public void return_empty_slice_when_called_on_empty_stream_starting_at_position_
public void return_empty_slice_when_called_on_non_existing_range()
{
const string stream = "read_event_stream_forward_should_return_empty_slice_when_called_on_non_existing_range";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -145,8 +152,9 @@ public void return_empty_slice_when_called_on_non_existing_range()
public void return_partial_slice_if_no_enough_events_in_stream()
{
const string stream = "read_event_stream_forward_should_return_partial_slice_if_no_enough_events_in_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -165,8 +173,9 @@ public void return_partial_slice_if_no_enough_events_in_stream()
public void return_partial_slice_when_got_int_max_value_as_maxcount()
{
const string stream = "read_event_stream_forward_should_return_partial_slice_when_got_int_max_value_as_maxcount";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -185,8 +194,9 @@ public void return_partial_slice_when_got_int_max_value_as_maxcount()
public void return_events_in_same_order_as_written()
{
const string stream = "read_event_stream_forward_should_return_events_in_same_order_as_written";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -206,8 +216,9 @@ public void return_events_in_same_order_as_written()
public void be_able_to_read_single_event_from_arbitrary_position()
{
const string stream = "read_event_stream_forward_should_be_able_to_read_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
@@ -227,8 +238,9 @@ public void be_able_to_read_single_event_from_arbitrary_position()
public void be_able_to_read_slice_from_arbitrary_position()
{
const string stream = "read_event_stream_forward_should_be_able_to_read_slice_from_arbitrary_position";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var create = store.CreateStreamAsync(stream, new byte[0]);
Assert.DoesNotThrow(create.Wait);
View
20 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
@@ -41,8 +41,9 @@ internal class subscribe_should
public void be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_event()
{
const string stream = "subscribe_should_be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_event";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -62,8 +63,9 @@ public void be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_e
public void allow_multiple_subscriptions_to_same_stream()
{
const string stream = "subscribe_should_allow_multiple_subscriptions_to_same_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(2);
@@ -84,8 +86,9 @@ public void allow_multiple_subscriptions_to_same_stream()
public void call_dropped_callback_after_unsubscribe_method_call()
{
const string stream = "subscribe_should_call_dropped_callback_after_unsubscribe_method_call";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -95,7 +98,7 @@ public void call_dropped_callback_after_unsubscribe_method_call()
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
Assert.That(!appeared.Wait(50));
- store.Unsubscribe(stream);
+ store.UnsubscribeAsync(stream);
Assert.That(dropped.Wait(Timeout));
}
}
@@ -104,8 +107,9 @@ public void call_dropped_callback_after_unsubscribe_method_call()
public void subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks()
{
const string stream = "subscribe_should_subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -127,8 +131,9 @@ public void subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks(
public void not_call_dropped_if_stream_was_deleted()
{
const string stream = "subscribe_should_not_call_dropped_if_stream_was_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(1);
var dropped = new CountdownEvent(1);
@@ -154,8 +159,9 @@ public void not_call_dropped_if_stream_was_deleted()
public void catch_created_and_deleted_events_as_well()
{
const string stream = "subscribe_should_catch_created_and_deleted_events_as_well";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var appeared = new CountdownEvent(2);
var dropped = new CountdownEvent(1);
View
53 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
@@ -45,8 +45,9 @@ internal class transaction
public void should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit()
{
const string stream = "should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.NoStream);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] {new TestEvent()});
@@ -61,8 +62,9 @@ public void should_start_on_non_existing_stream_with_correct_exp_ver_and_create_
public void should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit()
{
const string stream = "should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.Any);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] {new TestEvent()});
@@ -77,8 +79,9 @@ public void should_start_on_non_existing_stream_with_exp_ver_any_and_create_stre
public void should_fail_to_commit_non_existing_stream_with_wrong_exp_ver()
{
const string stream = "should_fail_to_commit_non_existing_stream_with_wrong_exp_ver";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] { new TestEvent() });
@@ -93,8 +96,9 @@ public void should_fail_to_commit_non_existing_stream_with_wrong_exp_ver()
public void should_create_stream_if_commits_no_events_to_empty_stream()
{
const string stream = "should_create_stream_if_commits_no_events_to_empty_stream";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.NoStream);
Assert.DoesNotThrow(start.Wait);
var commit = store.CommitTransactionAsync(start.Result.TransactionId, start.Result.Stream);
@@ -112,8 +116,9 @@ public void should_create_stream_if_commits_no_events_to_empty_stream()
public void should_validate_expectations_on_commit()
{
const string stream = "should_validate_expectations_on_commit";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, 100500);
Assert.DoesNotThrow(start.Wait);
var write = store.TransactionalWriteAsync(start.Result.TransactionId, stream, new[] { new TestEvent() });
@@ -136,14 +141,18 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
var totalPlainWrites = 500;
//excplicitly creating stream
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
//500 events during transaction
ThreadPool.QueueUserWorkItem(_ =>
{
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var transaction = store.StartTransaction(stream, ExpectedVersion.Any);
var writes = new List<Task>();
for (int i = 0; i < totalTranWrites; i++)
@@ -166,8 +175,9 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
//500 events to same stream in parallel
ThreadPool.QueueUserWorkItem(_ =>
{
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var writes = new List<Task>();
for (int i = 0; i < totalPlainWrites; i++)
{
@@ -189,8 +199,9 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
writesToSameStreamCompleted.WaitOne();
//check all written
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var slice = store.ReadEventStreamForward(stream, 0, totalTranWrites + totalPlainWrites + 1);
Assert.That(slice.Events.Length, Is.EqualTo(totalTranWrites + totalPlainWrites + 1));
@@ -204,11 +215,15 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
public void should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad()
{
const string stream = "should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
@@ -228,11 +243,15 @@ public void should_fail_to_commit_if_started_with_correct_ver_but_committing_wit
public void should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver()
{
const string stream = "should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, 1);
Assert.DoesNotThrow(start.Wait);
@@ -252,11 +271,15 @@ public void should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_w
public void should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted()
{
const string stream = "should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted";
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
+ {
+ store.Connect(MiniNode.Instance.TcpEndPoint);
store.CreateStream(stream, new byte[0]);
+ }
- using (var store = new EventStoreConnection(MiniNode.Instance.TcpEndPoint))
+ using (var store = EventStoreConnection.Create())
{
+ store.Connect(MiniNode.Instance.TcpEndPoint);
var start = store.StartTransactionAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(start.Wait);
View
3  src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/ScenarioBase.cs
@@ -130,7 +130,8 @@ public void Run()
{
for (int i = 0; i < Connections; ++i)
{
- _connections[i] = new EventStoreConnection(_tcpEndPoint, maxConcurrentRequests:MaxConcurrentRequests, logger: ApiLogger);
+ _connections[i] = EventStoreConnection.Create(maxConcurrentRequests:MaxConcurrentRequests, logger: ApiLogger);
+ _connections[i].Connect(_tcpEndPoint);
}
RunInternal();
}
Please sign in to comment.
Something went wrong with that request. Please try again.