Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent reads of events that have not been replicated #1448

Merged
merged 1 commit into from
Dec 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using EventStore.ClientAPI;
using EventStore.Core.Tests.ClientAPI.Helpers;
using EventStore.Core.Tests.Helpers;
using EventStore.Core.Tests.TransactionLog;
using NUnit.Framework;
using EventStore.Common.Utils;
using EventStore.Core.Bus;
Expand Down Expand Up @@ -70,16 +71,7 @@ public override void TestFixtureSetUp()
ChaserCheckpoint = new MemoryMappedFileCheckpoint(chaserCheckFilename, Checkpoint.Chaser, cached: true);
}

Db = new TFChunkDb(new TFChunkDbConfig(dbPath,
new VersionedPatternFileNamingStrategy(dbPath, "chunk-"),
TFConsts.ChunkSize,
0,
WriterCheckpoint,
ChaserCheckpoint,
new InMemoryCheckpoint(-1),
new InMemoryCheckpoint(-1),
inMemDb: false));

Db = new TFChunkDb(TFChunkDbConfigHelper.Create(dbPath, WriterCheckpoint, ChaserCheckpoint, TFConsts.ChunkSize));
Db.Open();

// create DB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ internal class FakeEventStoreConnection : IEventStoreConnection
private Func<Position, int, bool, UserCredentials, Task<AllEventsSlice>> _readAllEventsForwardAsync;
private Func<string, long, int, Task<StreamEventsSlice>> _readStreamEventsForwardAsync;
private Func<string, Func<EventStoreSubscription, ResolvedEvent, Task>, Action<EventStoreSubscription, SubscriptionDropReason, Exception>, Task<EventStoreSubscription>> _subscribeToStreamAsync;
private Func<bool, Func<EventStoreSubscription, ResolvedEvent, Task>, Action<EventStoreSubscription, SubscriptionDropReason, Exception>, Task<EventStoreSubscription>> _subscribeToAllAsync;

public void Dispose()
{
Expand Down Expand Up @@ -568,10 +569,15 @@ public EventStoreStreamCatchUpSubscription SubscribeToStreamFrom(
throw new NotImplementedException();
}

public void HandleSubscribeToAllAsync(Func<bool, Func<EventStoreSubscription, ResolvedEvent, Task>, Action<EventStoreSubscription, SubscriptionDropReason, Exception>, Task<EventStoreSubscription>> callback)
{
_subscribeToAllAsync = callback;
}

public Task<EventStoreSubscription> SubscribeToAllAsync(bool resolveLinkTos, Func<EventStoreSubscription, ResolvedEvent, Task> eventAppeared, Action<EventStoreSubscription, SubscriptionDropReason, Exception> subscriptionDropped = null,
UserCredentials userCredentials = null)
{
throw new NotImplementedException();
return _subscribeToAllAsync(resolveLinkTos, eventAppeared, subscriptionDropped);
}

public EventStorePersistentSubscriptionBase ConnectToPersistentSubscription(string stream, string groupName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,8 +786,6 @@ public class connect_to_persistent_subscription_with_retries : SpecificationWith

protected override void Given()
{


_conn.CreatePersistentSubscriptionAsync(_stream, _group, _settings,
DefaultData.AdminCredentials).Wait();
_conn.ConnectToPersistentSubscription(
Expand All @@ -809,9 +807,9 @@ private Task HandleEvent(EventStorePersistentSubscriptionBase sub, ResolvedEvent
{
if (retryCount > 4)
{
_resetEvent.Set();
_retryCount = retryCount;
sub.Acknowledge(resolvedEvent);
_resetEvent.Set();
}
else
{
Expand Down
30 changes: 24 additions & 6 deletions src/EventStore.Core.Tests/EventStore.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,15 @@
<Compile Include="Services\PersistentSubscription\PersistentSubscriptionTests.cs" />
<Compile Include="Services\PersistentSubscription\PinnedConsumerStrategyTests.cs" />
<Compile Include="Services\PersistentSubscription\StreamBufferTests.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_after_commit.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_after_commit_replicated.cs" />
<Compile Include="Authentication\when_handling_multiple_requests_with_reset_password_cache_in_between.cs" />
<Compile Include="Authentication\when_handling_multiple_requests_with_the_same_correct_user_name_and_password.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_prepare_timeout_after_prepares.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_completes_successfully.cs" />
<Compile Include="Services\Replication\DeleteStream\when_creating_delete_stream_request_manager.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_already_committed.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_before_commit_stage.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_before_final_commit.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_before_commit_replicated.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_prepare_timeout_before_prepares.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_stream_deleted.cs" />
<Compile Include="Services\Replication\FakeEnvelope.cs" />
Expand All @@ -384,19 +384,16 @@
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_completes_successfully.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_already_committed.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_commit_timeout_before_commit_stage.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_commit_timeout_before_final_commit.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_prepare_timeout_after_prepares.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_prepare_timeout_before_prepares.cs" />
<Compile Include="Services\Replication\TransactionCommit\when_transaction_commit_gets_stream_deleted.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_commit_timeout_after_commit.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_commit_timeout_after_commit_replicated.cs" />
<Compile Include="Services\Replication\WriteStream\when_creating_write_stream_request_manager.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_stream_deleted.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_already_committed.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_prepare_timeout_after_prepares.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_completes_successfully.cs" />
<Compile Include="Services\Replication\WriteStream\when_master_has_not_acked.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_commit_timeout_before_commit_stage.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_commit_timeout_before_final_commit.cs" />
<Compile Include="Services\Replication\WriteStream\when_write_stream_gets_prepare_timeout_before_prepares.cs" />
<Compile Include="Services\Storage\AllReader\when_a_single_write_before_the_transaction_is_present.cs" />
<Compile Include="Services\Storage\AllReader\when_multiple_single_writes_are_after_transaction_end_but_before_commit_is_present.cs" />
Expand Down Expand Up @@ -629,13 +626,34 @@
<Compile Include="Services\Transport\Tcp\TcpClientDispatcherTests.cs" />
<Compile Include="Services\Storage\Scavenge\when_stream_is_softdeleted_with_mixed_log_record_version_0_and_version_1.cs" />
<Compile Include="ClientAPI\ExpectedVersion64Bit\read_stream_with_link_to_event_with_event_number_greater_than_int_maxvalue.cs" />
<Compile Include="TransactionLog\TFChunkDbConfigHelper.cs" />
<Compile Include="Index\IndexV1\when_a_ptable_is_loaded_from_disk.cs" />
<Compile Include="Index\IndexV4\ptable_midpoint_calculations_should.cs" />
<Compile Include="Index\IndexV4\when_merging_ptables_with_entries_to_nonexisting_record.cs" />
<Compile Include="Index\IndexV4\when_merging_ptables_vx_to_v4.cs" />
<Compile Include="Index\IndexV4\opening_a_ptable_with_more_than_32bits_of_records.cs" />
<Compile Include="Index\IndexV1\corrupt_index_should.cs" />
<Compile Include="Index\IndexV1\table_index_with_corrupt_index_entries_should.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_1_commit_ack.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_2_commit_acks.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_2_commit_acks_for_positions_lower_than_checkpoint.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_multiple_acks_for_different_positions.cs" />
<Compile Include="Services\Replication\ReplicationTestHelper.cs" />
<Compile Include="Services\Replication\ReadStream\when_reading_an_event_from_a_single_node.cs" />
<Compile Include="Services\Replication\ReadStream\when_reading_an_event_committed_on_master_and_on_slaves.cs" />
<Compile Include="Services\Replication\ReadStream\when_reading_events_from_cluster_with_replication_checkpoint_not_set.cs" />
<Compile Include="Services\Replication\Subscriptions\TestSubscription.cs" />
<Compile Include="Services\Replication\Subscriptions\when_subscribed_to_stream_on_master_and_event_is_replicated_to_slaves.cs" />
<Compile Include="TransactionLog\when_reading_a_single_record.cs" />
<Compile Include="Services\Storage\AllReader\when_reading_all_with_replication_checkpoint_set.cs" />
<Compile Include="Services\Replication\CommitReplication\when_single_node_receives_commit_ack.cs" />
<Compile Include="Services\Replication\CommitReplication\when_slave_node_in_3_node_cluster_receives_commit_ack.cs" />
<Compile Include="Services\Replication\CommitReplication\when_master_node_in_3_node_cluster_with_outstanding_commit_acks_becomes_unknown.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_multiple_acks_for_different_positions_out_of_order.cs" />
<Compile Include="Services\Replication\CommitReplication\when_3_node_cluster_receives_multiple_acks_for_same_positions.cs" />
<Compile Include="Services\Replication\CommitReplication\with_index_committer_service.cs" />
<Compile Include="Services\Replication\CommitReplication\when_single_node_cluster_receives_commit_ack_for_multiple_prepares.cs" />
<Compile Include="Services\Replication\CommitReplication\when_constructing_index_committer_service.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventStore.ClientAPI.Embedded\EventStore.ClientAPI.Embedded.csproj">
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private TFChunkDbConfig CreateDbConfig(int chunkSize, string dbPath, long chunks
ICheckpoint chaserChk;
ICheckpoint epochChk;
ICheckpoint truncateChk;
ICheckpoint replicationCheckpoint = new InMemoryCheckpoint(-1);
if (inMemDb)
{
writerChk = new InMemoryCheckpoint(Checkpoint.Writer);
Expand Down Expand Up @@ -221,7 +222,7 @@ private TFChunkDbConfig CreateDbConfig(int chunkSize, string dbPath, long chunks
}
var nodeConfig = new TFChunkDbConfig(
dbPath, new VersionedPatternFileNamingStrategy(dbPath, "chunk-"), chunkSize, chunksCacheSize, writerChk,
chaserChk, epochChk, truncateChk, inMemDb);
chaserChk, epochChk, truncateChk, replicationCheckpoint, inMemDb);
return nodeConfig;
}
}
Expand Down
74 changes: 56 additions & 18 deletions src/EventStore.Core.Tests/Http/PersistentSubscription/parked.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using EventStore.ClientAPI.Common;
using EventStore.Core.Data;
using System.Threading.Tasks;
using EventStore.Core.Bus;
using EventStore.Core.Messages;

namespace EventStore.Core.Tests.Http.PersistentSubscription
{
Expand Down Expand Up @@ -65,14 +67,26 @@ public void should_have_parked_the_event()
class when_replaying_parked_message : with_subscription_having_events
{
private string _nackLink;
private TaskCompletionSource<EventStore.ClientAPI.ResolvedEvent> _eventParked =
new TaskCompletionSource<EventStore.ClientAPI.ResolvedEvent>();
private Guid _eventIdToPark;
private Guid _receivedEventId;

private string _subscriptionParkedStream;
private Guid _writeCorrelationId;
private ManualResetEvent _eventParked = new ManualResetEvent(false);

protected override void Given()
{
_connection.Close();
_connection.Dispose();
NumberOfEventsToCreate = 1;
base.Given();

_subscriptionParkedStream = "$persistentsubscription-" + TestStream.Substring(9) + "::" + GroupName + "-parked";

// Subscribe to the writes to ensure the parked message has been written
_node.Node.MainBus.Subscribe(new AdHocHandler<StorageMessage.WritePrepares>(Handle));
_node.Node.MainBus.Subscribe(new AdHocHandler<StorageMessage.CommitReplicated>(Handle));

var json = GetJson2<JObject>(
SubscriptionPath + "/1", "embed=rich",
ContentType.CompetingJson,
Expand All @@ -89,32 +103,56 @@ protected override void Given()
Assert.AreEqual(HttpStatusCode.Accepted, response.StatusCode);
}

private void Handle(StorageMessage.WritePrepares msg)
{
if (msg.EventStreamId == _subscriptionParkedStream)
{
_writeCorrelationId = msg.CorrelationId;
}
}

private void Handle(StorageMessage.CommitReplicated msg)
{
if (msg.CorrelationId == _writeCorrelationId)
{
_eventParked.Set();
}
}

protected override void When()
{
_connection.ConnectToPersistentSubscriptionAsync(TestStreamName, GroupName, (x, y) =>
if(!_eventParked.WaitOne(TimeSpan.FromSeconds(10)))
{
_eventParked.SetResult(y);
return Task.CompletedTask;
},
(x, y, z) => { },
DefaultData.AdminCredentials).Wait();
Assert.Fail("Timed out waiting for event to be written to the parked stream");
}

//Replayed parked messages
var response = MakePost(SubscriptionPath + "/replayParked", _admin);

Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);

for (var i = 0; i < 10; i++)
{
var json = GetJson2<JObject>(
SubscriptionPath + "/1", "embed=rich",
ContentType.CompetingJson,
_admin);

Assert.AreEqual(HttpStatusCode.OK, _lastResponse.StatusCode);

var entries = json != null ? json["entries"].ToList() : new List<JToken>();
if (entries.Count != 0)
{
_receivedEventId = Guid.Parse(entries[0]["eventId"].ToString());
break;
}
Console.WriteLine("Received no entries. Attempt {0} of 10", i + 1);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}

[Test]
public async Task should_have_replayed_the_parked_event()
public void should_receive_the_replayed_event()
{
var completedTask = await Task.WhenAny(_eventParked.Task, Task.Delay(TimeSpan.FromSeconds(5)));
if(completedTask != _eventParked.Task)
{
Assert.Fail("Timed out waiting for parked event");
}
var res = _eventParked.Task.Result;
Assert.AreEqual(_eventIdToPark, res.Event.EventId);
Assert.AreEqual(_eventIdToPark, _receivedEventId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using EventStore.Core.Tests.Helpers;
using NUnit.Framework;
using System.Collections.Generic;
using System.Linq;

namespace EventStore.Core.Tests.Integration
{
Expand Down Expand Up @@ -132,5 +133,15 @@ protected static void WaitIdle()
{
QueueStatsCollector.WaitIdle();
}

protected MiniClusterNode GetMaster()
{
return _nodes.First(x => x.NodeState == Data.VNodeState.Master);
}

protected MiniClusterNode[] GetSlaves()
{
return _nodes.Where(x => x.NodeState != Data.VNodeState.Master).ToArray();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.Replication.CommitReplication
{
[TestFixture]
public class when_3_node_cluster_receives_1_commit_ack : with_index_committer_service
{
private long _logPosition = 4000;

public override void When()
{
BecomeMaster();
AddPendingPrepare(_logPosition);
_service.Handle(new StorageMessage.CommitAck(Guid.NewGuid(), _logPosition, _logPosition, 0, 0, true));
}

[Test]
public void replication_checkpoint_should_not_be_updated()
{
Assert.AreEqual(0, _replicationCheckpoint.ReadNonFlushed());
}

[Test]
public void commit_replicated_message_should_not_be_sent()
{
Assert.AreEqual(0, _handledMessages.Count);
}

[Test]
public void index_should_not_have_been_updated()
{
Assert.AreEqual(0, _indexCommitter.CommittedPrepares.Count);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Threading;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.Replication.CommitReplication
{
[TestFixture]
public class when_3_node_cluster_receives_2_commit_acks : with_index_committer_service
{
private CountdownEvent _eventsReplicated = new CountdownEvent(1);

private Guid _correlationId = Guid.NewGuid();
private long _logPosition = 4000;

public override void When()
{
_publisher.Subscribe(new AdHocHandler<StorageMessage.CommitReplicated>(m => _eventsReplicated.Signal()));
BecomeMaster();
AddPendingPrepare(_logPosition);
_service.Handle(new StorageMessage.CommitAck(_correlationId, _logPosition, _logPosition, 0, 0, true));
_service.Handle(new StorageMessage.CommitAck(_correlationId, _logPosition, _logPosition, 0, 0));

if(!_eventsReplicated.Wait(TimeSpan.FromSeconds(_timeoutSeconds)))
{
Assert.Fail("Timed out waiting for commit replicated messages to be published");
}
}

[Test]
public void replication_checkpoint_should_have_been_updated()
{
Assert.AreEqual(_logPosition, _replicationCheckpoint.ReadNonFlushed());
}

[Test]
public void commit_replicated_message_should_have_been_published()
{
Assert.AreEqual(1, _handledMessages.Count);
Assert.AreEqual(_logPosition, _handledMessages[0].TransactionPosition);
}

[Test]
public void index_should_have_been_updated()
{
Assert.AreEqual(1, _indexCommitter.CommittedPrepares.Count);
Assert.AreEqual(_logPosition, _indexCommitter.CommittedPrepares[0].LogPosition);
}
}
}
Loading