forked from hibernating-rhinos/rhino-esb
/
MessageHandlingCompletion.cs
131 lines (116 loc) · 3.45 KB
/
MessageHandlingCompletion.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
using System;
using System.Transactions;
using log4net;
using Rhino.ServiceBus.Impl;
namespace Rhino.ServiceBus.Transport
{
public class MessageHandlingCompletion
{
private readonly TransactionScope tx;
private readonly Action sendMessageBackToQueue;
private readonly Action<CurrentMessageInformation, Exception> messageCompleted;
private readonly Action<CurrentMessageInformation> beforeTransactionCommit;
private readonly ILog logger;
private readonly Action<CurrentMessageInformation, Exception> messageProcessingFailure;
private readonly CurrentMessageInformation currentMessageInformation;
private Exception exception;
public MessageHandlingCompletion(TransactionScope tx, Action sendMessageBackToQueue, Exception exception, Action<CurrentMessageInformation, Exception> messageCompleted, Action<CurrentMessageInformation> beforeTransactionCommit, ILog logger, Action<CurrentMessageInformation, Exception> messageProcessingFailure, CurrentMessageInformation currentMessageInformation)
{
this.tx = tx;
this.sendMessageBackToQueue = sendMessageBackToQueue;
this.exception = exception;
this.messageCompleted = messageCompleted;
this.beforeTransactionCommit = beforeTransactionCommit;
this.logger = logger;
this.messageProcessingFailure = messageProcessingFailure;
this.currentMessageInformation = currentMessageInformation;
}
public void HandleMessageCompletion()
{
var txDisposed = false;
try
{
if (SuccessfulCompletion(out txDisposed))
return;
}
finally
{
DisposeTransactionIfNotAlreadyDisposed(txDisposed);
}
//error
NotifyMessageCompleted();
NotifyAboutMessageProcessingFailure();
SendMessageBackToQueue();
}
private void SendMessageBackToQueue()
{
if (sendMessageBackToQueue != null)
sendMessageBackToQueue();
}
private void NotifyMessageCompleted()
{
try
{
if (messageCompleted != null)
messageCompleted(currentMessageInformation, exception);
}
catch (Exception e)
{
logger.Error("An error occured when raising the MessageCompleted event, the error will NOT affect the message processing", e);
}
}
private void NotifyAboutMessageProcessingFailure()
{
try
{
if (messageProcessingFailure != null)
messageProcessingFailure(currentMessageInformation, exception);
}
catch (Exception moduleException)
{
logger.Error("Module failed to process message failure: " + exception.Message,
moduleException);
}
}
private void DisposeTransactionIfNotAlreadyDisposed(bool txDisposed)
{
try
{
if (txDisposed == false && tx != null)
{
logger.Warn("Disposing transaction in error mode");
tx.Dispose();
}
}
catch (Exception e)
{
logger.Warn("Failed to dispose of transaction in error mode.", e);
}
}
private bool SuccessfulCompletion(out bool txDisposed)
{
txDisposed = false;
if (exception != null)
return false;
try
{
if (tx != null)
{
if (beforeTransactionCommit != null)
beforeTransactionCommit(currentMessageInformation);
tx.Complete();
tx.Dispose();
txDisposed = true;
}
NotifyMessageCompleted();
return true;
}
catch (Exception e)
{
logger.Warn("Failed to complete transaction, moving to error mode", e);
exception = e;
}
return false;
}
}
}