/
NServiceBusConsumeContext.cs
99 lines (83 loc) · 3.85 KB
/
NServiceBusConsumeContext.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
namespace MassTransit.Interop.NServiceBus.Contexts
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Context;
using MassTransit.Metadata;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Serialization;
public class NServiceBusConsumeContext :
DeserializerConsumeContext
{
readonly JsonSerializer _deserializer;
readonly JToken _messageToken;
readonly IDictionary<Type, ConsumeContext> _messageTypes;
readonly NServiceBusHeaderAdapter _headerAdapter;
public NServiceBusConsumeContext(JsonSerializer deserializer, ReceiveContext receiveContext, JToken messageToken)
: base(receiveContext)
{
_messageToken = messageToken ?? new JObject();
_deserializer = deserializer;
_messageTypes = new Dictionary<Type, ConsumeContext>();
_headerAdapter = new NServiceBusHeaderAdapter(receiveContext.TransportHeaders);
}
public override Guid? MessageId => _headerAdapter.MessageId;
public override Guid? RequestId => _headerAdapter.RequestId;
public override Guid? CorrelationId => _headerAdapter.CorrelationId;
public override Guid? ConversationId => _headerAdapter.ConversationId;
public override Guid? InitiatorId => _headerAdapter.InitiatorId;
public override DateTime? ExpirationTime => _headerAdapter.ExpirationTime;
public override Uri SourceAddress => _headerAdapter.SourceAddress;
public override Uri DestinationAddress => _headerAdapter.DestinationAddress;
public override Uri ResponseAddress => _headerAdapter.ResponseAddress;
public override Uri FaultAddress => _headerAdapter.FaultAddress;
public override DateTime? SentTime => _headerAdapter.SentTime;
public override Headers Headers => _headerAdapter.Headers;
public override HostInfo Host => _headerAdapter.Host;
public override IEnumerable<string> SupportedMessageTypes => _headerAdapter.SupportedMessageTypes;
public override bool HasMessageType(Type messageType)
{
lock (_messageTypes)
{
if (_messageTypes.TryGetValue(messageType, out var existing))
return existing != null;
}
return false;
}
public override bool TryGetMessage<T>(out ConsumeContext<T> message)
{
lock (_messageTypes)
{
if (_messageTypes.TryGetValue(typeof(T), out var existing))
{
message = existing as ConsumeContext<T>;
return message != null;
}
if (typeof(T) == typeof(JToken))
{
_messageTypes[typeof(T)] = message = new MessageConsumeContext<T>(this, _messageToken as T);
return true;
}
string typeUrn = MessageUrn.ForTypeString<T>();
if (SupportedMessageTypes.Any(x => typeUrn.Equals(x, StringComparison.OrdinalIgnoreCase)))
{
object obj;
Type deserializeType = typeof(T);
if (deserializeType.GetTypeInfo().IsInterface && TypeMetadataCache<T>.IsValidMessageType)
deserializeType = TypeMetadataCache<T>.ImplementationType;
using (JsonReader jsonReader = _messageToken.CreateReader())
{
obj = _deserializer.Deserialize(jsonReader, deserializeType);
}
_messageTypes[typeof(T)] = message = new MessageConsumeContext<T>(this, (T) obj);
return true;
}
_messageTypes[typeof(T)] = message = null;
return false;
}
}
}
}