Skip to content

Commit

Permalink
Merge branch 'hotfix/Issue2994-CircuitBreaker-Timeout' into hotfix-5.2.9
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonsmits committed Oct 19, 2015
2 parents bd4187e + 0f839f6 commit afb8207
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 2 deletions.
Expand Up @@ -118,13 +118,15 @@
<Compile Include="ScenarioDescriptors\AllTransactionSettings.cs" />
<Compile Include="ScenarioDescriptors\TransactionSettings.cs" />
<Compile Include="NonDTC\When_dispatching_deferred_message_fails_without_dtc.cs" />
<Compile Include="Timeouts\TemporarilyUnavailableTimeoutPersister.cs" />
<Compile Include="Timeouts\OutdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\TransportWithFakeQueues.cs" />
<Compile Include="Timeouts\UpdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\When_dispatched_timeout_already_removed_from_timeout_storage.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_without_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_disabled_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_dtc.cs" />
<Compile Include="Timeouts\When_timeout_storage_is_unavailable_temporarily.cs" />
<Compile Include="Timeouts\When_endpoint_uses_updated_timeout_persistence.cs" />
<Compile Include="Tx\Issue_2481.cs" />
<Compile Include="Volatile\When_sending_to_non_durable_endpoint.cs" />
Expand Down
@@ -0,0 +1,57 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Linq;
using Timeout.Core;

class TemporarilyUnavailableTimeoutPersister : IPersistTimeouts
{
public int SecondsToWait { get; set; }
static bool isAvailable = false;
DateTime NextChangeTime;

public TemporarilyUnavailableTimeoutPersister()
{
NextChangeTime = DateTime.Now.AddSeconds(SecondsToWait);
}

private void ThrowExceptionUntilWait()
{
if (NextChangeTime <= DateTime.Now)
{
NextChangeTime = DateTime.Now.AddSeconds(SecondsToWait);
isAvailable = !isAvailable;
}

if (!isAvailable)
{
throw new Exception("Persister is temporarily unavailable");
}
}

public IEnumerable<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
ThrowExceptionUntilWait();
nextTimeToRunQuery = DateTime.Now.AddSeconds(2);
return Enumerable.Empty<Tuple<string, DateTime>>().ToList();
}

public void Add(TimeoutData timeout)
{
ThrowExceptionUntilWait();
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
ThrowExceptionUntilWait();
timeoutData = null;
return true;
}

public void RemoveTimeoutBy(Guid sagaId)
{
ThrowExceptionUntilWait();
}
}
}
@@ -0,0 +1,88 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using AcceptanceTesting;
using EndpointTemplates;
using Features;
using Timeout.Core;
using NUnit.Framework;

class When_timeout_storage_is_unavailable_temporarily : NServiceBusAcceptanceTest
{
[Test]
public void Endpoint_should_start()
{
var context = new TestContext();

Scenario.Define(context)
.WithEndpoint<EndpointWithFlakyTimeoutPersister>()
.Done(c => c.EndpointsStarted)
.Run();

Assert.IsTrue(context.EndpointsStarted);
}


[Test]
public void Endpoint_should_not_shutdown()
{
var context = new TestContext{SecondsToWait = 10};
var stopTime = DateTime.Now.AddSeconds(45);

Scenario.Define(context)
.AllowExceptions(ex => ex.Message.Contains("Persister is temporarily unavailable"))
.WithEndpoint<EndpointWithFlakyTimeoutPersister>(b =>
{
b.CustomConfig(busConfig =>
{
busConfig.DefineCriticalErrorAction((s, ex) =>
{
context.FatalErrorOccurred = true;
});
});
})
.Done(c => context.FatalErrorOccurred || stopTime <= DateTime.Now)
.Run();

Assert.IsFalse(context.FatalErrorOccurred, "Circuit breaker was trigged too soon.");
}

public class TestContext : ScenarioContext
{
public int SecondsToWait { get; set; }
public bool FatalErrorOccurred { get; set; }
}

[Serializable]
public class MyMessage : IMessage { }

public class EndpointWithFlakyTimeoutPersister : EndpointConfigurationBuilder
{
public TestContext TestContext { get; set; }
public EndpointWithFlakyTimeoutPersister()
{
EndpointSetup<DefaultServer>(config =>
{
config.EnableFeature<TimeoutManager>();
config.Transactions().DisableDistributedTransactions();
config.SuppressOutdatedTimeoutPersistenceWarning();
});
}

class Initalizer : Feature
{
public Initalizer()
{
EnableByDefault();
}

protected override void Setup(FeatureConfigurationContext context)
{
context.Container
.ConfigureComponent<TemporarilyUnavailableTimeoutPersister>(DependencyLifecycle.SingleInstance)
.ConfigureProperty(tp => tp.SecondsToWait, 10);
}
}
}
}
}
Expand Up @@ -29,7 +29,9 @@ public void Start()
{
circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker("TimeoutStorageConnectivity", TimeToWaitBeforeTriggeringCriticalError,
ex =>
CriticalError.Raise("Repeated failures when fetching timeouts from storage, endpoint will be terminated.", ex));
{
CriticalError.Raise("Repeated failures when fetching timeouts from storage, endpoint will be terminated.", ex);
});

TimeoutManager.TimeoutPushed = TimeoutsManagerOnTimeoutPushed;

Expand Down
4 changes: 3 additions & 1 deletion src/NServiceBus.Core/Timeout/TimeoutManager.cs
Expand Up @@ -11,9 +11,11 @@
/// </summary>
public class TimeoutManager : Feature
{
const int MinutesToWaitBeforeErrorForTimeoutPersisterReceiver = 2;

internal TimeoutManager()
{
Defaults(s => s.SetDefault("TimeToWaitBeforeTriggeringCriticalErrorForTimeoutPersisterReceiver", TimeSpan.FromSeconds(2)));
Defaults(s => s.SetDefault("TimeToWaitBeforeTriggeringCriticalErrorForTimeoutPersisterReceiver", TimeSpan.FromMinutes(MinutesToWaitBeforeErrorForTimeoutPersisterReceiver)));

DependsOn<TimeoutManagerBasedDeferral>();

Expand Down

0 comments on commit afb8207

Please sign in to comment.