Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamEventReader Fix (no InvalidOperationException) #1774

Merged
merged 4 commits into from Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/EventStore.ClientAPI.Embedded/EmbeddedVNodeBuilder.cs
Expand Up @@ -39,7 +39,7 @@ public static EmbeddedVNodeBuilder AsClusterMember(int clusterSize)
protected override void SetUpProjectionsIfNeeded()
{
_subsystems.Add(new ProjectionsSubsystem(_projectionsThreads, _projectionType,
_startStandardProjections, _projectionsQueryExpiry));
_startStandardProjections, _projectionsQueryExpiry, _faultOutOfOrderProjections));
}
}
}
3 changes: 3 additions & 0 deletions src/EventStore.ClusterNode/ClusterNodeOptions.cs
Expand Up @@ -152,6 +152,8 @@ public class ClusterNodeOptions : IOptions
public int WorkerThreads { get; set; }
[ArgDescription(Opts.ProjectionsQueryExpiryDescr, Opts.ProjectionsGroup)]
public int ProjectionsQueryExpiry { get; set; }
[ArgDescription(Opts.FaultOutOfOrderProjectionsDescr, Opts.ProjectionsGroup)]
public bool FaultOutOfOrderProjections { get; set; }

[ArgDescription(Opts.IntHttpPrefixesDescr, Opts.InterfacesGroup)]
public string[] IntHttpPrefixes { get; set; }
Expand Down Expand Up @@ -283,6 +285,7 @@ public ClusterNodeOptions()
SkipDbVerify = Opts.SkipDbVerifyDefault;
RunProjections = Opts.RunProjectionsDefault;
ProjectionThreads = Opts.ProjectionThreadsDefault;
FaultOutOfOrderProjections = Opts.FaultOutOfOrderProjectionsDefault;
WorkerThreads = Opts.WorkerThreadsDefault;
BetterOrdering = Opts.BetterOrderingDefault;

Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.ClusterNode/ClusterVNodeBuilder.cs
Expand Up @@ -36,7 +36,7 @@ public static ClusterVNodeBuilder AsClusterMember(int clusterSize)
protected override void SetUpProjectionsIfNeeded()
{
_subsystems.Add(new ProjectionsSubsystem(_projectionsThreads, _projectionType,
_startStandardProjections, _projectionsQueryExpiry));
_startStandardProjections, _projectionsQueryExpiry, _faultOutOfOrderProjections));
}
}
}
2 changes: 1 addition & 1 deletion src/EventStore.ClusterNode/Program.cs
Expand Up @@ -202,7 +202,7 @@ private static ClusterVNode BuildNode(ClusterNodeOptions options)
.WithIndexCacheDepth(options.IndexCacheDepth)
.WithIndexMergeOptimization(options.OptimizeIndexMerge)
.WithSslTargetHost(options.SslTargetHost)
.RunProjections(options.RunProjections, options.ProjectionThreads)
.RunProjections(options.RunProjections, options.ProjectionThreads, options.FaultOutOfOrderProjections)
.WithProjectionQueryExpirationOf(TimeSpan.FromMinutes(options.ProjectionsQueryExpiry))
.WithTfCachedChunks(options.CachedChunks)
.WithTfChunksCacheSize(options.ChunksCacheSize)
Expand Down
7 changes: 6 additions & 1 deletion src/EventStore.Core/Cluster/Settings/ClusterVNodeSettings.cs
Expand Up @@ -82,6 +82,8 @@ public class ClusterVNodeSettings

public readonly bool GossipOnSingleNode;

public readonly bool FaultOutOfOrderProjections;

public ClusterVNodeSettings(Guid instanceId, int debugIndex,
IPEndPoint internalTcpEndPoint,
IPEndPoint internalSecureTcpEndPoint,
Expand Down Expand Up @@ -145,7 +147,8 @@ public class ClusterVNodeSettings
bool gossipOnSingleNode = false,
bool skipIndexScanOnReads = false,
bool reduceFileCachePressure = false,
int initializationThreads = 1)
int initializationThreads = 1,
bool faultOutOfOrderProjections = false)
{
Ensure.NotEmptyGuid(instanceId, "instanceId");
Ensure.NotNull(internalTcpEndPoint, "internalTcpEndPoint");
Expand Down Expand Up @@ -241,6 +244,8 @@ public class ClusterVNodeSettings
SkipIndexScanOnReads = skipIndexScanOnReads;
ReduceFileCachePressure = reduceFileCachePressure;
InitializationThreads = initializationThreads;

FaultOutOfOrderProjections = faultOutOfOrderProjections;
}


Expand Down
3 changes: 3 additions & 0 deletions src/EventStore.Core/Util/Opts.cs
Expand Up @@ -130,6 +130,9 @@ public static class Opts
public const string ProjectionThreadsDescr = "The number of threads to use for projections.";
public const int ProjectionThreadsDefault = 3;

public const string FaultOutOfOrderProjectionsDescr = "Fault the projection if the Event number that was expected in the stream differs from what is received. This may happen if events have been deleted or expired";
public const bool FaultOutOfOrderProjectionsDefault = false;

public const string ProjectionsQueryExpiryDescr = "The number of minutes a query can be idle before it expires";
public const int ProjectionsQueryExpiryDefault = 5;

Expand Down
8 changes: 6 additions & 2 deletions src/EventStore.Core/VNodeBuilder.cs
Expand Up @@ -114,6 +114,7 @@ public abstract class VNodeBuilder
protected ProjectionType _projectionType;
protected int _projectionsThreads;
protected TimeSpan _projectionsQueryExpiry;
protected bool _faultOutOfOrderProjections;

protected TFChunkDb _db;
protected ClusterVNodeSettings _vNodeSettings;
Expand Down Expand Up @@ -221,6 +222,7 @@ protected VNodeBuilder()
_skipIndexScanOnReads = Opts.SkipIndexScanOnReadsDefault;
_chunkInitialReaderCount = Opts.ChunkInitialReaderCountDefault;
_projectionsQueryExpiry = TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault);
_faultOutOfOrderProjections = Opts.FaultOutOfOrderProjectionsDefault;
_reduceFileCachePressure = Opts.ReduceFileCachePressureDefault;
_initializationThreads = Opts.InitializationThreadsDefault;
}
Expand Down Expand Up @@ -268,10 +270,11 @@ public VNodeBuilder DisableHTTPCaching()
/// <param name="projectionType">The mode in which to run the projections system</param>
/// <param name="numberOfThreads">The number of threads to use for projections. Defaults to 3.</param>
/// <returns>A <see cref="VNodeBuilder"/> with the options set</returns>
public VNodeBuilder RunProjections(ProjectionType projectionType, int numberOfThreads = Opts.ProjectionThreadsDefault)
public VNodeBuilder RunProjections(ProjectionType projectionType, int numberOfThreads = Opts.ProjectionThreadsDefault, bool faultOutOfOrderProjections = Opts.FaultOutOfOrderProjectionsDefault)
{
_projectionType = projectionType;
_projectionsThreads = numberOfThreads;
_faultOutOfOrderProjections = faultOutOfOrderProjections;
return this;
}

Expand Down Expand Up @@ -1438,7 +1441,8 @@ public ClusterVNode Build(IOptions options = null, IPersistentSubscriptionConsum
_gossipOnSingleNode,
_skipIndexScanOnReads,
_reduceFileCachePressure,
_initializationThreads);
_initializationThreads,
_faultOutOfOrderProjections);

var infoController = new InfoController(options, _projectionType);

Expand Down
Expand Up @@ -112,7 +112,8 @@ public override void TestFixtureSetUp()
private MiniClusterNode CreateNode(int index, Endpoints endpoints, IPEndPoint[] gossipSeeds)
{
_projections = new ProjectionsSubsystem(1, runProjections: ProjectionType.All,
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault));
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault),
faultOutOfOrderProjections: Opts.FaultOutOfOrderProjectionsDefault);
var node = new MiniClusterNode(
PathName, index, endpoints.InternalTcp, endpoints.InternalTcpSec, endpoints.InternalHttp, endpoints.ExternalTcp,
endpoints.ExternalTcpSec, endpoints.ExternalHttp, skipInitializeStandardUsersCheck: false,
Expand Down
Expand Up @@ -45,7 +45,8 @@ public void TearDown()
protected void CreateNode()
{
var projections = new ProjectionsSubsystem(1, runProjections: ProjectionType.All,
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault));
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault),
faultOutOfOrderProjections: Opts.FaultOutOfOrderProjectionsDefault);
Node = new MiniNode(
PathName, inMemDb: true, skipInitializeStandardUsersCheck: false, subsystems: new ISubsystem[] { projections });
Node.Start();
Expand Down
Expand Up @@ -96,7 +96,8 @@ public override void TestFixtureTearDown()
protected MiniNode CreateNode()
{
var projections = new ProjectionsSubsystem(1, runProjections: ProjectionType.All,
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault));
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault),
faultOutOfOrderProjections: Opts.FaultOutOfOrderProjectionsDefault);
return new MiniNode(
PathName, inMemDb: true, skipInitializeStandardUsersCheck: false, subsystems: new ISubsystem[] { projections });
}
Expand Down
Expand Up @@ -90,7 +90,8 @@ private void CreateNode()
{
var projectionWorkerThreadCount = GivenWorkerThreadCount();
_projections = new ProjectionsSubsystem(projectionWorkerThreadCount, runProjections: ProjectionType.All,
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault));
startStandardProjections: false, projectionQueryExpiry: TimeSpan.FromMinutes(Opts.ProjectionsQueryExpiryDefault),
faultOutOfOrderProjections: Opts.FaultOutOfOrderProjectionsDefault);
_node = new MiniNode(
PathName, inMemDb: true, skipInitializeStandardUsersCheck: false, subsystems: new ISubsystem[] { _projections });
_node.Start();
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void Setup()
_bus.Subscribe(_consumer);
ICheckpoint writerCheckpoint = new InMemoryCheckpoint(1000);
var ioDispatcher = new IODispatcher(_bus, new PublishEnvelope(_bus));
_readerService = new EventReaderCoreService(_bus, ioDispatcher, 10, writerCheckpoint, runHeadingReader: true);
_readerService = new EventReaderCoreService(_bus, ioDispatcher, 10, writerCheckpoint, runHeadingReader: true, faultOutOfOrderProjections: true);
_subscriptionDispatcher =
new ReaderSubscriptionDispatcher(_bus);
_spoolProcessingResponseDispatcher = new SpooledStreamReadingDispatcher(_bus);
Expand Down
Expand Up @@ -33,7 +33,7 @@ public void Setup()

ICheckpoint writerCheckpoint = new InMemoryCheckpoint(1000);
_readerService = new EventReaderCoreService(
GetInputQueue(), _ioDispatcher, 10, writerCheckpoint, runHeadingReader: GivenHeadingReaderRunning());
GetInputQueue(), _ioDispatcher, 10, writerCheckpoint, runHeadingReader: GivenHeadingReaderRunning(), faultOutOfOrderProjections: true);
_subscriptionDispatcher =
new ReaderSubscriptionDispatcher(GetInputQueue());

Expand Down
Expand Up @@ -100,11 +100,10 @@ public void should_not_allow_first_event_to_be_greater_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber+5;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}

Expand All @@ -113,24 +112,22 @@ public void should_not_allow_first_event_to_be_less_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber-1;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
HandleEvents(_streamNames[0],eventSequenceNumber,eventSequenceNumber);
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
public void events_after_first_event_should_not_be_in_sequence()
{
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);

Assert.AreEqual(2, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}
}
}
Expand Up @@ -108,15 +108,25 @@ public void allows_first_event_to_be_greater_than_sequence_number()
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);
});
Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
public void events_after_first_event_should_not_be_in_sequence()
{
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0],new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
//to trigger event delivery:
HandleEvents(_streamNames[1],100,101);

Assert.AreEqual(2, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}

[Test]
public void events_fault_message_for_out_of_sequence_events_should_be()
{
//_fromSequenceNumber+2 has been omitted
HandleEvents(_streamNames[0], new long[] { _fromSequenceNumber, _fromSequenceNumber + 1, _fromSequenceNumber + 3, _fromSequenceNumber + 4 });
//to trigger event delivery:
HandleEvents(_streamNames[1], 100, 101);

Assert.IsTrue(HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().First().Reason.Contains(" was expected in the stream "));
}
}
}
Expand Up @@ -116,7 +116,6 @@ public void publishes_read_events_from_beginning_with_correct_next_event_number(
public void cannot_handle_repeated_read_events_completed()
{
var correlationId = _consumer.HandledMessages.OfType<ClientMessage.ReadStreamEventsForward>().Last().CorrelationId;
Assert.Throws<InvalidOperationException>(() => {
_edp.Handle(
new ClientMessage.ReadStreamEventsForwardCompleted(
correlationId, "stream", 100, 100, ReadStreamResult.Success,
Expand All @@ -129,7 +128,7 @@ public void cannot_handle_repeated_read_events_completed()
PrepareFlags.SingleWrite | PrepareFlags.TransactionBegin | PrepareFlags.TransactionEnd,
"event_type", new byte[0], new byte[0]))
}, null, false, "", 11, 10, true, 100));
});
Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}

[Test]
Expand Down
Expand Up @@ -89,10 +89,8 @@ public void allows_first_event_to_be_equal_to_sequence_number()
public void should_not_allow_first_event_to_be_greater_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber+5;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(eventSequenceNumber,eventSequenceNumber);
});

HandleEvents(eventSequenceNumber,eventSequenceNumber);

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}
Expand All @@ -101,23 +99,19 @@ public void should_not_allow_first_event_to_be_greater_than_sequence_number()
public void should_not_allow_first_event_to_be_less_than_sequence_number()
{
long eventSequenceNumber = _fromSequenceNumber-1;

Assert.Throws<InvalidOperationException>(() => {
HandleEvents(eventSequenceNumber,eventSequenceNumber);
});

HandleEvents(eventSequenceNumber,eventSequenceNumber);

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
});

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
public void events_after_second_event_should_not_be_in_sequence()
{
//_fromSequenceNumber+2 has been omitted
HandleEvents(new long[] { _fromSequenceNumber, _fromSequenceNumber + 1, _fromSequenceNumber + 3, _fromSequenceNumber + 4 });

Assert.AreEqual(2, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}
}
}
Expand Up @@ -96,14 +96,12 @@ public void allows_first_event_to_be_greater_than_sequence_number()
}

[Test]
public void events_after_first_event_should_be_in_sequence()
{
Assert.Throws<InvalidOperationException>(() => {
//_fromSequenceNumber+2 has been omitted
HandleEvents(new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});
});
public void events_after_second_event_should_not_be_in_sequence()
{
//_fromSequenceNumber+2 has been omitted
HandleEvents(new long[]{_fromSequenceNumber,_fromSequenceNumber+1,_fromSequenceNumber+3,_fromSequenceNumber+4});

Assert.AreEqual(1, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
Assert.AreEqual(2, HandledMessages.OfType<ReaderSubscriptionMessage.Faulted>().Count());
}
}
}