-
Notifications
You must be signed in to change notification settings - Fork 653
/
Sagas.cs
114 lines (90 loc) · 4.14 KB
/
Sagas.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
namespace NServiceBus.Features;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Sagas;
/// <summary>
/// Feature that configures saga support: saga metadata, custom finders,
/// saga-not-found handlers and the saga-related pipeline behaviors.
/// </summary>
public class Sagas : Feature
{
    /// <summary>
    /// Enables the feature by default, registers its default settings and
    /// declares the prerequisites and dependencies it needs in order to be set up.
    /// </summary>
    internal Sagas()
    {
        EnableByDefault();

        Defaults(settings =>
        {
            // Kept on the instance because Setup needs the same conventions later on.
            conventions = settings.Get<Conventions>();

            var sagaTypes = settings.GetAvailableTypes().Where(IsSagaType).ToList();
            if (sagaTypes.Count != 0)
            {
                // Types that a saga handles only as a timeout (and not as a regular message)
                // are added through the system-message conventions.
                conventions.AddSystemMessagesConventions(candidate => IsTypeATimeoutHandledByAnySaga(candidate, sagaTypes));
            }

            settings.EnableFeatureByDefault<SynchronizedStorage>();
        });

        Defaults(settings => settings.Set(new SagaMetadataCollection()));

        Prerequisite(
            featureContext => !featureContext.Settings.GetOrDefault<bool>("Endpoint.SendOnly"),
            "Sagas are only relevant for endpoints receiving messages.");
        Prerequisite(
            featureContext => featureContext.Settings.GetAvailableTypes().Any(IsSagaType),
            "No sagas were found in the scanned types");

        DependsOn<SynchronizedStorage>();
    }

    /// <summary>
    /// See <see cref="Feature.Setup" />.
    /// Initializes the saga metadata from the available types, registers custom finders and
    /// saga-not-found handlers in the service collection, and registers the saga behaviors
    /// in the pipeline.
    /// </summary>
    /// <param name="context">The feature configuration context providing settings, services and the pipeline.</param>
    /// <exception cref="Exception">Thrown when the selected persistence has no support for saga storage.</exception>
    protected internal override void Setup(FeatureConfigurationContext context)
    {
        var settings = context.Settings;
        var services = context.Services;

        var persistenceSupportsSagas = PersistenceStartup.HasSupportFor<StorageType.Sagas>(settings);
        if (!persistenceSupportsSagas)
        {
            throw new Exception("The selected persistence doesn't have support for saga storage. Select another persistence or disable the sagas feature using endpointConfiguration.DisableFeature<Sagas>()");
        }

        // Fall back to the default generator when none has been put into the settings.
        var sagaIdGenerator = settings.GetOrDefault<ISagaIdGenerator>() ?? new DefaultSagaIdGenerator();

        var sagaMetaModel = settings.Get<SagaMetadataCollection>();
        sagaMetaModel.Initialize(settings.GetAvailableTypes(), conventions);

        // The shared-entity verification runs unless it has been explicitly disabled in the settings.
        if (!settings.GetOrDefault<bool>(SagaSettings.DisableVerifyingIfEntitiesAreShared))
        {
            sagaMetaModel.VerifyIfEntitiesAreShared();
        }

        RegisterCustomFindersInContainer(services, sagaMetaModel);

        foreach (var notFoundHandlerType in settings.GetAvailableTypes().Where(IsSagaNotFoundHandler))
        {
            services.AddTransient(typeof(IHandleSagaNotFound), notFoundHandlerType);
        }

        // Register the Saga related behaviors for incoming messages
        context.Pipeline.Register(
            "InvokeSaga",
            serviceProvider => new SagaPersistenceBehavior(serviceProvider.GetRequiredService<ISagaPersister>(), sagaIdGenerator, sagaMetaModel),
            "Invokes the saga logic");
        context.Pipeline.Register(
            "InvokeSagaNotFound",
            new InvokeSagaNotFoundBehavior(),
            "Invokes saga not found logic");
        context.Pipeline.Register(
            "AttachSagaDetailsToOutGoingMessage",
            new AttachSagaDetailsToOutGoingMessageBehavior(),
            "Makes sure that outgoing messages have saga info attached to them");
    }

    // Registers every finder type of every saga as transient, plus the custom finder CLR type
    // when the finder carries one in its properties.
    static void RegisterCustomFindersInContainer(IServiceCollection container, IEnumerable<SagaMetadata> sagaMetaModel)
    {
        var allFinders =
            from metadata in sagaMetaModel
            from sagaFinder in metadata.Finders
            select sagaFinder;

        foreach (var sagaFinder in allFinders)
        {
            container.AddTransient(sagaFinder.Type);

            if (sagaFinder.Properties.TryGetValue("custom-finder-clr-type", out var customFinderClrType))
            {
                container.AddTransient((Type)customFinderClrType);
            }
        }
    }

    // True when the type is a concrete, non-generic type deriving from Saga.
    static bool IsSagaType(Type t) => IsCompatible(t, typeof(Saga));

    // True when the type is a concrete, non-generic implementation of IHandleSagaNotFound.
    static bool IsSagaNotFoundHandler(Type t) => IsCompatible(t, typeof(IHandleSagaNotFound));

    // True when t is assignable to source, is not source itself, and is neither abstract,
    // an interface, nor a generic type.
    static bool IsCompatible(Type t, Type source)
    {
        if (!source.IsAssignableFrom(t))
        {
            return false;
        }

        if (t == source)
        {
            return false;
        }

        return !(t.IsAbstract || t.IsInterface || t.IsGenericType);
    }

    // True when at least one saga implements IHandleTimeouts<type> without also
    // implementing IHandleMessages<type>.
    static bool IsTypeATimeoutHandledByAnySaga(Type type, IEnumerable<Type> sagas)
    {
        var timeoutHandler = typeof(IHandleTimeouts<>).MakeGenericType(type);
        var messageHandler = typeof(IHandleMessages<>).MakeGenericType(type);

        return sagas.Any(HandlesTypeOnlyAsTimeout);

        bool HandlesTypeOnlyAsTimeout(Type sagaType) =>
            timeoutHandler.IsAssignableFrom(sagaType) && !messageHandler.IsAssignableFrom(sagaType);
    }

    // Assigned by the first Defaults callback in the constructor; read again in Setup.
    Conventions conventions;
}