/
MessageLoggingModule.cs
142 lines (127 loc) · 4.9 KB
/
MessageLoggingModule.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
using System;
using System.Messaging;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Messages;
using Rhino.ServiceBus.Msmq;
namespace Rhino.ServiceBus.MessageModules
{
public class MessageLoggingModule : IMessageModule
{
private readonly IMessageSerializer messageSerializer;
private readonly IEndpointRouter endpointRouter;
private readonly Uri logQueue;
private OpenedQueue queue;
[ThreadStatic] private static DateTime messageArrival;
public Uri LogQueue
{
get { return logQueue; }
}
public MessageLoggingModule(IMessageSerializer messageSerializer, IEndpointRouter endpointRouter, Uri logQueue)
{
this.messageSerializer = messageSerializer;
this.endpointRouter = endpointRouter;
this.logQueue = logQueue;
}
public void Init(ITransport transport, IServiceBus bus)
{
var endpoint = endpointRouter.GetRoutedEndpoint(logQueue);
var queueInfo = MsmqUtil.GetQueuePath(endpoint);
queueInfo.Create();
queue = queueInfo.Open(QueueAccessMode.Send);
transport.MessageArrived += Transport_OnMessageArrived;
transport.MessageProcessingFailure += Transport_OnMessageProcessingFailure;
transport.MessageProcessingCompleted += Transport_OnMessageProcessingCompleted;
transport.MessageSerializationException += Transport_OnMessageSerializationException;
transport.MessageSent+=Transport_OnMessageSent;
}
public void Stop(ITransport transport, IServiceBus bus)
{
transport.MessageArrived -= Transport_OnMessageArrived;
transport.MessageProcessingFailure -= Transport_OnMessageProcessingFailure;
transport.MessageProcessingCompleted -= Transport_OnMessageProcessingCompleted;
transport.MessageSerializationException -= Transport_OnMessageSerializationException;
transport.MessageSent -= Transport_OnMessageSent;
queue.Dispose();
}
private void Transport_OnMessageSent(CurrentMessageInformation info)
{
Send(new MessageSentMessage
{
MessageId = info.MessageId,
Source = info.Source,
Message = info.AllMessages,
MessageType = info.AllMessages[0].ToString(),
Timestamp = DateTime.Now,
Destination = info.Destination
});
}
private void Send(object obj)
{
var msg = new Message
{
Label = obj.ToString(),
Extension = Guid.NewGuid().ToByteArray()
};
messageSerializer.Serialize(new[] { obj }, msg.BodyStream);
queue.Send(msg);
}
private void Transport_OnMessageSerializationException(CurrentMessageInformation info, Exception t)
{
Send(new SerializationErrorMessage
{
MessageId = info.MessageId,
Error = t.ToString(),
Source = info.Source,
});
}
private void Transport_OnMessageProcessingCompleted(CurrentMessageInformation info, Exception ex)
{
var timestamp = DateTime.Now;
Send(new MessageProcessingCompletedMessage
{
Timestamp = timestamp,
Duration = timestamp - messageArrival,
MessageType = info.Message.ToString(),
MessageId = info.MessageId,
Source = info.Source,
});
}
internal void Transport_OnMessageProcessingFailure(CurrentMessageInformation info, Exception e)
{
string messageType = (info.Message ?? "no message").ToString();
SendInSingleTransaction(new MessageProcessingFailedMessage
{
ErrorText = e.ToString(),
Timestamp = DateTime.Now,
MessageType = messageType,
MessageId = info.MessageId,
Source = info.Source,
Message = info.Message
});
}
private void SendInSingleTransaction(object msg)
{
var message = new Message
{
Label = msg.ToString(),
Extension = Guid.NewGuid().ToByteArray()
};
messageSerializer.Serialize(new[]{msg},message.BodyStream);
queue.SendInSingleTransaction(message);
}
private bool Transport_OnMessageArrived(CurrentMessageInformation info)
{
messageArrival = DateTime.Now;
Send(new MessageArrivedMessage
{
Timestamp = messageArrival,
MessageType = info.Message.ToString(),
MessageId = info.MessageId,
Source = info.Source,
Message = info.Message
});
return false;
}
}
}