diff --git a/NServiceBus.Unicast.Transport.ServiceBroker/ServiceBrokerTransport.cs b/NServiceBus.Unicast.Transport.ServiceBroker/ServiceBrokerTransport.cs index cdadeca..9d9b893 100644 --- a/NServiceBus.Unicast.Transport.ServiceBroker/ServiceBrokerTransport.cs +++ b/NServiceBus.Unicast.Transport.ServiceBroker/ServiceBrokerTransport.cs @@ -2,26 +2,38 @@ using System.Collections.Generic; using System.Data.SqlClient; using System.IO; +using System.Linq; using System.Runtime.Serialization.Formatters.Binary; using System.Threading; +using System.Xml; +using System.Xml.Serialization; using log4net; +using NServiceBus.MessageInterfaces; +using NServiceBus.Serialization; using NServiceBus.Unicast.Transport.Msmq; using NServiceBus.Utils; using ServiceBroker.Net; -namespace NServiceBus.Unicast.Transport.ServiceBroker { - public class ServiceBrokerTransport : ITransport { +namespace NServiceBus.Unicast.Transport.ServiceBroker +{ + public class ServiceBrokerTransport : ITransport + { public const string NServiceBusTransportMessageContract = "NServiceBusTransportMessageContract"; public const string NServiceBusTransportMessage = "NServiceBusTransportMessage"; - public ServiceBrokerTransport() { + public ServiceBrokerTransport() + { NumberOfWorkerThreads = 1; MaxRetries = 5; SecondsToWaitForMessage = 10; } #region members + /// + /// Sets the object which will be used to serialize and deserialize messages. + /// + public IMessageSerializer MessageSerializer { get; set; } private readonly IList workerThreads = new List(); @@ -114,12 +126,15 @@ public class ServiceBrokerTransport : ITransport { /// /// To change the number of worker threads at runtime, call . /// - public virtual int NumberOfWorkerThreads { - get { + public virtual int NumberOfWorkerThreads + { + get + { lock (workerThreads) return workerThreads.Count; } - set { + set + { numberOfWorkerThreads = value; } } @@ -134,8 +149,10 @@ public class ServiceBrokerTransport : ITransport { /// /// Gets the address the service /// - public string Address { - get { + public string Address + { + get + { return ReturnService; } } @@ -145,21 +162,25 @@ public class ServiceBrokerTransport : ITransport { /// stopping or starting worker threads as needed. /// /// - public void ChangeNumberOfWorkerThreads(int targetNumberOfWorkerThreads) { - lock (workerThreads) { + public void ChangeNumberOfWorkerThreads(int targetNumberOfWorkerThreads) + { + lock (workerThreads) + { var current = workerThreads.Count; if (targetNumberOfWorkerThreads == current) return; - if (targetNumberOfWorkerThreads < current) { + if (targetNumberOfWorkerThreads < current) + { for (var i = targetNumberOfWorkerThreads; i < current; i++) workerThreads[i].Stop(); return; } - if (targetNumberOfWorkerThreads > current) { + if (targetNumberOfWorkerThreads > current) + { for (var i = current; i < targetNumberOfWorkerThreads; i++) AddWorkerThread().Start(); @@ -171,8 +192,10 @@ public class ServiceBrokerTransport : ITransport { /// /// Starts the transport. /// - public void Start() { - if (!string.IsNullOrEmpty(InputQueue)) { + public void Start() + { + if (!string.IsNullOrEmpty(InputQueue)) + { for (int i = 0; i < numberOfWorkerThreads; i++) AddWorkerThread().Start(); } @@ -186,7 +209,8 @@ public class ServiceBrokerTransport : ITransport { /// This method will place the message onto the back of the queue /// which may break message ordering. /// - public void ReceiveMessageLater(TransportMessage m) { + public void ReceiveMessageLater(TransportMessage m) + { if (!string.IsNullOrEmpty(ReturnService)) Send(m, ReturnService); } @@ -196,28 +220,120 @@ public class ServiceBrokerTransport : ITransport { /// /// The message to send. /// The address of the destination to send the message to. - public void Send(TransportMessage m, string destination) { - GetSqlTransactionManager().RunInTransaction(transaction => { + public void Send(TransportMessage m, string destination) + { + GetSqlTransactionManager().RunInTransaction(transaction => + { // Always begin and end a conversation to simulate a monologe var conversationHandle = ServiceBrokerWrapper.BeginConversation(transaction, ReturnService, destination, NServiceBusTransportMessageContract); // Use the conversation handle as the message Id m.Id = conversationHandle.ToString(); - var stream = new MemoryStream(1); - // Serialize the transport message - new BinaryFormatter().Serialize(stream, m); + using (var stream = new MemoryStream()) + { + // Serialize the transport message + SerializeTransportMessage(m, stream); - ServiceBrokerWrapper.Send(transaction, conversationHandle, NServiceBusTransportMessage, stream.GetBuffer()); + ServiceBrokerWrapper.Send(transaction, conversationHandle, NServiceBusTransportMessage, stream.GetBuffer()); + } ServiceBrokerWrapper.EndConversation(transaction, conversationHandle); }); } + void SerializeTransportMessage(TransportMessage m, MemoryStream stream) + { + if (UseXmlTransportSeralization) + SerializeToXml(m, stream); + else + new BinaryFormatter().Serialize(stream, m); + } + + + TransportMessage ExtractXmlTransportMessage(Stream bodyStream) + { + var xs = new XmlSerializer(typeof(TransportMessage)); + var transportMessage = (TransportMessage)xs.Deserialize(bodyStream); + + bodyStream.Position = 0; + + + var bodyDoc = new XmlDocument(); + bodyDoc.Load(bodyStream); + + var payLoad = bodyDoc.DocumentElement.SelectSingleNode("Body").FirstChild as XmlCDataSection; + + transportMessage.Body= ExtractMessages(payLoad); + + return transportMessage; + } + + IMessage[] ExtractMessages(XmlCDataSection data) + { + var messages = new XmlDocument(); + messages.LoadXml(data.Data); + using(var stream = new MemoryStream()) + { + messages.Save(stream); + stream.Position = 0; + return MessageSerializer.Deserialize(stream); + } + } + + void SerializeToXml(TransportMessage transportMessage, MemoryStream stream) + { + var overrides = new XmlAttributeOverrides(); + var attrs = new XmlAttributes {XmlIgnore = true}; + + overrides.Add(typeof(TransportMessage), "Messages", attrs); + var xs = new XmlSerializer(typeof(TransportMessage),overrides); + + var doc = new XmlDocument(); + + using (var tempstream = new MemoryStream()) + { + xs.Serialize(tempstream, transportMessage); + tempstream.Position = 0; + + doc.Load(tempstream); + } + + + if (transportMessage.Body != null && transportMessage.BodyStream == null) + { + transportMessage.BodyStream = new MemoryStream(); + MessageSerializer.Serialize(transportMessage.Body, transportMessage.BodyStream); + transportMessage.BodyStream.Position = 0; + } + + + var data = new StreamReader(transportMessage.BodyStream).ReadToEnd(); + var bodyElement = doc.CreateElement("Body"); + bodyElement.AppendChild(doc.CreateCDataSection(data)); + doc.DocumentElement.AppendChild(bodyElement); + + doc.Save(stream); + stream.Position = 0; + + } + + private bool UseXmlTransportSeralization + { + get + { + //if the user has requested xml-seralization we default to serialize the transport message using xml as well + //this produces readable xml in the database and allows for interop scenarios writing to the queues directly + //from within sqlserver + return MessageSerializer is NServiceBus.Serializers.XML.MessageSerializer; + } + } + /// /// Causes the processing of the current message to be aborted. /// - public void AbortHandlingCurrentMessage() { + public void AbortHandlingCurrentMessage() + { _needToAbort = true; } @@ -226,9 +342,11 @@ public class ServiceBrokerTransport : ITransport { /// Returns the number of messages in the queue. /// /// - public int GetNumberOfPendingMessages() { + public int GetNumberOfPendingMessages() + { int count = -1; - GetSqlTransactionManager().RunInTransaction(transaction => { + GetSqlTransactionManager().RunInTransaction(transaction => + { count = ServiceBrokerWrapper.QueryMessageCount(transaction, InputQueue, NServiceBusTransportMessage); }); return count; @@ -237,13 +355,16 @@ public class ServiceBrokerTransport : ITransport { #endregion - private WorkerThread AddWorkerThread() { - lock (workerThreads) { + private WorkerThread AddWorkerThread() + { + lock (workerThreads) + { var result = new WorkerThread(Process); workerThreads.Add(result); - result.Stopped += delegate(object sender, EventArgs e) { + result.Stopped += delegate(object sender, EventArgs e) + { var wt = sender as WorkerThread; lock (workerThreads) workerThreads.Remove(wt); @@ -253,29 +374,40 @@ public class ServiceBrokerTransport : ITransport { } } - private void Process() { + private void Process() + { _needToAbort = false; conversationHandle = string.Empty; - try { - GetSqlTransactionManager().RunInTransaction(transaction => { + try + { + GetSqlTransactionManager().RunInTransaction(transaction => + { ReceiveFromQueue(transaction); }); ClearFailuresForConversation(conversationHandle); - } catch (AbortHandlingCurrentMessageException) { + } + catch (AbortHandlingCurrentMessageException) + { // in case AbortHandlingCurrentMessage was called // don't increment failures, we want this message kept around. - } catch { + } + catch + { IncrementFailuresForConversation(conversationHandle); OnFailedMessageProcessing(); } } - private void ReceiveFromQueue(SqlTransaction transaction) { + private void ReceiveFromQueue(SqlTransaction transaction) + { Message message = null; - try { + try + { message = ServiceBrokerWrapper.WaitAndReceive(transaction, InputQueue, SecondsToWaitForMessage * 1000); - } catch (Exception e) { + } + catch (Exception e) + { Logger.Error("Error in receiving message from queue.", e); throw; // Throw to rollback } @@ -286,11 +418,14 @@ public class ServiceBrokerTransport : ITransport { Guid conversationHandle = message.ConversationHandle; ServiceBrokerTransport.conversationHandle = message.ConversationHandle.ToString(); - try { + try + { // Only handle transport messages - if (message.MessageTypeName == NServiceBusTransportMessage) { + if (message.MessageTypeName == NServiceBusTransportMessage) + { - if (HandledMaxRetries(conversationHandle.ToString())) { + if (HandledMaxRetries(conversationHandle.ToString())) + { Logger.Error(string.Format("Message has failed the maximum number of times allowed, ID={0}.", conversationHandle)); MoveToErrorService(message); return; @@ -301,10 +436,16 @@ public class ServiceBrokerTransport : ITransport { StartedMessageProcessing(this, null); TransportMessage transportMessage = null; - try { + try + { // deserialize - transportMessage = new BinaryFormatter().Deserialize(message.BodyStream) as TransportMessage; - } catch (Exception e) { + if (UseXmlTransportSeralization) + transportMessage = ExtractXmlTransportMessage(message.BodyStream); + else + transportMessage = new BinaryFormatter().Deserialize(message.BodyStream) as TransportMessage; + } + catch (Exception e) + { Logger.Error("Could not extract message data.", e); MoveToErrorService(message); OnFinishedMessageProcessing(); // don't care about failures here @@ -328,17 +469,22 @@ public class ServiceBrokerTransport : ITransport { if (!(exceptionNotThrown && otherExNotThrown)) //cause rollback throw new ApplicationException("Exception occured while processing message."); } - } finally { + } + finally + { // End the conversation ServiceBrokerWrapper.EndConversation(transaction, conversationHandle); } } - private bool HandledMaxRetries(string messageId) { + + private bool HandledMaxRetries(string messageId) + { failuresPerConversationLocker.EnterReadLock(); if (failuresPerConversation.ContainsKey(messageId) && - (failuresPerConversation[messageId] >= MaxRetries)) { + (failuresPerConversation[messageId] >= MaxRetries)) + { failuresPerConversationLocker.ExitReadLock(); failuresPerConversationLocker.EnterWriteLock(); failuresPerConversation.Remove(messageId); @@ -351,34 +497,45 @@ public class ServiceBrokerTransport : ITransport { return false; } - private void ClearFailuresForConversation(string conversationHandle) { + private void ClearFailuresForConversation(string conversationHandle) + { failuresPerConversationLocker.EnterReadLock(); - if (failuresPerConversation.ContainsKey(conversationHandle)) { + if (failuresPerConversation.ContainsKey(conversationHandle)) + { failuresPerConversationLocker.ExitReadLock(); failuresPerConversationLocker.EnterWriteLock(); failuresPerConversation.Remove(conversationHandle); failuresPerConversationLocker.ExitWriteLock(); - } else + } + else failuresPerConversationLocker.ExitReadLock(); } - private void IncrementFailuresForConversation(string conversationHandle) { + private void IncrementFailuresForConversation(string conversationHandle) + { failuresPerConversationLocker.EnterWriteLock(); - try { + try + { if (!failuresPerConversation.ContainsKey(conversationHandle)) failuresPerConversation[conversationHandle] = 1; else failuresPerConversation[conversationHandle] = failuresPerConversation[conversationHandle] + 1; - } finally { + } + finally + { failuresPerConversationLocker.ExitWriteLock(); } } - private bool OnFinishedMessageProcessing() { - try { + private bool OnFinishedMessageProcessing() + { + try + { if (FinishedMessageProcessing != null) FinishedMessageProcessing(this, null); - } catch (Exception e) { + } + catch (Exception e) + { Logger.Error("Failed raising 'finished message processing' event.", e); return false; } @@ -386,11 +543,15 @@ public class ServiceBrokerTransport : ITransport { return true; } - private bool OnTransportMessageReceived(TransportMessage msg) { - try { + private bool OnTransportMessageReceived(TransportMessage msg) + { + try + { if (TransportMessageReceived != null) TransportMessageReceived(this, new TransportMessageReceivedEventArgs(msg)); - } catch (Exception e) { + } + catch (Exception e) + { Logger.Warn("Failed raising 'transport message received' event for message with ID=" + msg.Id, e); return false; } @@ -398,11 +559,15 @@ public class ServiceBrokerTransport : ITransport { return true; } - private bool OnFailedMessageProcessing() { - try { + private bool OnFailedMessageProcessing() + { + try + { if (FailedMessageProcessing != null) FailedMessageProcessing(this, null); - } catch (Exception e) { + } + catch (Exception e) + { Logger.Warn("Failed raising 'failed message processing' event.", e); return false; } @@ -410,15 +575,18 @@ public class ServiceBrokerTransport : ITransport { return true; } - private void MoveToErrorService(Message message) { - GetSqlTransactionManager().RunInTransaction(transaction => { + private void MoveToErrorService(Message message) + { + GetSqlTransactionManager().RunInTransaction(transaction => + { var conversationHandle = ServiceBrokerWrapper.BeginConversation(transaction, ReturnService, ErrorService, NServiceBusTransportMessageContract); ServiceBrokerWrapper.Send(transaction, conversationHandle, NServiceBusTransportMessage, message.Body); ServiceBrokerWrapper.EndConversation(transaction, conversationHandle); }); } - private SqlServiceBrokerTransactionManager GetSqlTransactionManager() { + private SqlServiceBrokerTransactionManager GetSqlTransactionManager() + { if (transactionManager == null) transactionManager = new SqlServiceBrokerTransactionManager(ConnectionString); return transactionManager; @@ -426,7 +594,8 @@ public class ServiceBrokerTransport : ITransport { #region IDisposable Members - public void Dispose() { + public void Dispose() + { lock (workerThreads) for (var i = 0; i < workerThreads.Count; i++) workerThreads[i].Stop(); @@ -436,4 +605,4 @@ public class ServiceBrokerTransport : ITransport { } -} +} \ No newline at end of file