Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: MassTransit/MassTransit
...
head fork: kellabyte/MassTransit
compare: kellabyteprod
Checking mergeability… Don't worry, you can still create the pull request.
  • 1 commit
  • 16 files changed
  • 0 commit comments
  • 1 contributor
View
281 src/MassTransit.TestFramework/ExtensionsForTestingEndpoints.cs
@@ -12,131 +12,158 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.TestFramework
{
- using System;
- using Context;
- using Magnum.TestFramework;
- using MassTransit.Transports;
- using NUnit.Framework;
- using Serialization;
-
- public static class ExtensionsForTestingEndpoints
- {
- public static void ShouldContain<TMessage>(this IInboundTransport transport, IMessageSerializer serializer)
- where TMessage : class
- {
- var future = new Future<TMessage>();
-
- transport.Receive(context =>
- {
- context.ShouldNotBeNull();
- context.ShouldBeAnInstanceOf<IReceiveContext>();
-
- serializer.Deserialize(context);
-
- IConsumeContext<TMessage> messageContext;
- if (context.TryGetContext(out messageContext))
- {
- if (!future.IsCompleted)
- future.Complete(messageContext.Message);
- }
-
- return null;
- }, TimeSpan.FromSeconds(3));
-
- future.IsCompleted.ShouldBeTrue(transport.Address + " should contain a message of type " + typeof (TMessage).Name);
- }
-
- public static void ShouldContain<TMessage>(this IEndpoint endpoint)
- where TMessage : class
- {
- var future = new Future<TMessage>();
-
- endpoint.Receive(context =>
- {
- context.ShouldNotBeNull();
- context.ShouldBeAnInstanceOf<IReceiveContext>();
-
- IConsumeContext<TMessage> messageContext;
- if (context.TryGetContext(out messageContext))
- {
- if (!future.IsCompleted)
- future.Complete(messageContext.Message);
- }
-
- return null;
- }, TimeSpan.FromSeconds(3));
-
- future.IsCompleted.ShouldBeTrue(endpoint.Address + " should contain a message of type " + typeof (TMessage).Name);
- }
-
- public static void ShouldContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage)
- where TMessage : class, CorrelatedBy<Guid>
- {
- endpoint.ShouldContain(expectedMessage, TimeSpan.Zero);
- }
-
- public static void ShouldContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage, TimeSpan timeout)
- where TMessage : class, CorrelatedBy<Guid>
- {
- var future = new Future<TMessage>();
-
- endpoint.Receive(context =>
- {
- context.ShouldNotBeNull();
- context.ShouldBeAnInstanceOf<IReceiveContext>();
-
- IConsumeContext<TMessage> messageContext;
- if (context.TryGetContext(out messageContext))
- {
- if (messageContext.Message.CorrelationId == expectedMessage.CorrelationId && !future.IsCompleted)
- future.Complete(messageContext.Message);
- }
-
- return null;
- }, timeout);
-
- future.IsCompleted.ShouldBeTrue(endpoint.Address + " should contain a message of type " + typeof (TMessage).Name +
- " with correlation id " + expectedMessage.CorrelationId);
- }
-
- public static void ShouldNotContain<TMessage>(this IEndpoint endpoint)
- where TMessage : class
- {
- endpoint.Receive(context =>
- {
- context.ShouldNotBeNull();
- context.ShouldBeAnInstanceOf<IReceiveContext>();
-
- IConsumeContext<TMessage> messageContext;
- if (context.TryGetContext(out messageContext))
- {
- Assert.Fail(endpoint.Address + " should not contain a message of type " + typeof (TMessage).Name);
- }
-
- return null;
- }, TimeSpan.Zero);
- }
-
- public static void ShouldNotContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage)
- where TMessage : class, CorrelatedBy<Guid>
- {
- endpoint.Receive(context =>
- {
- context.ShouldNotBeNull();
- context.ShouldBeAnInstanceOf<IReceiveContext>();
-
- IConsumeContext<TMessage> messageContext;
- if (context.TryGetContext(out messageContext))
- {
- if (messageContext.Message.CorrelationId != expectedMessage.CorrelationId)
- return null;
-
- Assert.Fail(endpoint.Address + " should not contain a message of type " + typeof (TMessage).Name +
- " with correlation id " + expectedMessage.CorrelationId);
- }
-
- return null;
- }, TimeSpan.Zero);
- }
- }
+ using System;
+ using Magnum.Extensions;
+ using Magnum.TestFramework;
+ using MassTransit.Transports;
+ using NUnit.Framework;
+ using Serialization;
+
+ public static class ExtensionsForTestingEndpoints
+ {
+ public static void ShouldContain<TMessage>(this IInboundTransport transport, IMessageSerializer serializer)
+ where TMessage : class
+ {
+ var future = new Future<TMessage>();
+
+ transport.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ serializer.Deserialize(context);
+
+ IConsumeContext<TMessage> messageContext;
+ if (context.TryGetContext(out messageContext))
+ {
+ if (!future.IsCompleted)
+ future.Complete(messageContext.Message);
+ }
+
+ return null;
+ }, TimeSpan.FromSeconds(8));
+
+ future.IsCompleted.ShouldBeTrue(transport.Address + " should contain a message of type " +
+ typeof(TMessage).Name);
+ }
+
+ public static void ShouldContain(this IInboundTransport transport, IMessageSerializer serializer,
+ Type messageType)
+ {
+ var future = new Future<bool>();
+
+ transport.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ serializer.Deserialize(context);
+
+ if (context.IsContextAvailable(messageType))
+ {
+ if (!future.IsCompleted)
+ future.Complete(true);
+ }
+
+ return null;
+ }, TimeSpan.FromSeconds(8));
+
+ future.IsCompleted.ShouldBeTrue(transport.Address + " should contain a message of type " +
+ messageType.ToShortTypeName());
+ }
+
+ public static void ShouldContain<TMessage>(this IEndpoint endpoint)
+ where TMessage : class
+ {
+ var future = new Future<TMessage>();
+
+ endpoint.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ IConsumeContext<TMessage> messageContext;
+ if (context.TryGetContext(out messageContext))
+ {
+ if (!future.IsCompleted)
+ future.Complete(messageContext.Message);
+ }
+
+ return null;
+ }, TimeSpan.FromSeconds(8));
+
+ future.IsCompleted.ShouldBeTrue(endpoint.Address + " should contain a message of type " +
+ typeof(TMessage).Name);
+ }
+
+ public static void ShouldContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage)
+ where TMessage : class, CorrelatedBy<Guid>
+ {
+ endpoint.ShouldContain(expectedMessage, TimeSpan.Zero);
+ }
+
+ public static void ShouldContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage, TimeSpan timeout)
+ where TMessage : class, CorrelatedBy<Guid>
+ {
+ var future = new Future<TMessage>();
+
+ endpoint.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ IConsumeContext<TMessage> messageContext;
+ if (context.TryGetContext(out messageContext))
+ {
+ if (messageContext.Message.CorrelationId == expectedMessage.CorrelationId && !future.IsCompleted)
+ future.Complete(messageContext.Message);
+ }
+
+ return null;
+ }, timeout);
+
+ future.IsCompleted.ShouldBeTrue(endpoint.Address + " should contain a message of type " +
+ typeof(TMessage).Name +
+ " with correlation id " + expectedMessage.CorrelationId);
+ }
+
+ public static void ShouldNotContain<TMessage>(this IEndpoint endpoint)
+ where TMessage : class
+ {
+ endpoint.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ if (context.IsContextAvailable(typeof(TMessage)))
+ {
+ Assert.Fail(endpoint.Address + " should not contain a message of type " + typeof(TMessage).Name);
+ }
+
+ return null;
+ }, TimeSpan.Zero);
+ }
+
+ public static void ShouldNotContain<TMessage>(this IEndpoint endpoint, TMessage expectedMessage)
+ where TMessage : class, CorrelatedBy<Guid>
+ {
+ endpoint.Receive(context =>
+ {
+ context.ShouldNotBeNull();
+ context.ShouldBeAnInstanceOf<IReceiveContext>();
+
+ IConsumeContext<TMessage> messageContext;
+ if (context.TryGetContext(out messageContext))
+ {
+ if (messageContext.Message.CorrelationId != expectedMessage.CorrelationId)
+ return null;
+
+ Assert.Fail(endpoint.Address + " should not contain a message of type " + typeof(TMessage).Name +
+ " with correlation id " + expectedMessage.CorrelationId);
+ }
+
+ return null;
+ }, TimeSpan.Zero);
+ }
+ }
}
View
1  src/MassTransit.Tests/MassTransit.Tests.csproj
@@ -192,6 +192,7 @@
<Compile Include="Serialization\DateTime_Specs.cs" />
<Compile Include="Serialization\GenericMessageType_Specs.cs" />
<Compile Include="Serialization\DefaultConstructor_Specs.cs" />
+ <Compile Include="Serialization\SerializationException_Specs.cs" />
<Compile Include="Services\HealthMonitoring\HealthClient_Integration_Specs.cs" />
<Compile Include="Services\Timeout\TimeoutService_Specs.cs" />
<Compile Include="Subscriptions\DynamicAndStaticSubscriptions_Specs.cs" />
View
66 src/MassTransit.Tests/Serialization/SerializationException_Specs.cs
@@ -0,0 +1,66 @@
+// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+namespace MassTransit.Tests.Serialization
+{
+ using System;
+ using BusConfigurators;
+ using Magnum.TestFramework;
+ using MassTransit.Transports;
+ using TestFramework;
+ using TextFixtures;
+ using Util;
+
+ [Scenario]
+ public class When_a_message_deserialization_exception_occurs
+ : LoopbackTestFixture
+ {
+ protected override void ConfigureLocalBus(ServiceBusConfigurator configurator)
+ {
+ base.ConfigureLocalBus(configurator);
+
+ configurator.Subscribe(s => { s.Handler<BadMessage>(x => { }); });
+ }
+
+ [Then]
+ public void Should_put_message_in_error_queue()
+ {
+ LocalBus.Endpoint.Send(new BadMessage("Good"));
+
+ IEndpoint errorEndpoint =
+ LocalBus.GetEndpoint(LocalBus.Endpoint.InboundTransport.Address.Uri.AppendToPath("_error"));
+ errorEndpoint.InboundTransport.ShouldContain(errorEndpoint.Serializer, typeof(BadMessage));
+
+ LocalBus.Endpoint.ShouldNotContain<BadMessage>();
+
+ var errorTransport = LocalBus.Endpoint.ErrorTransport as LoopbackTransport;
+ errorTransport.ShouldNotBeNull();
+
+ errorTransport.Count.ShouldEqual(1);
+ }
+
+ class BadMessage
+ {
+ public BadMessage()
+ {
+ throw new InvalidOperationException("I want to be bad.");
+ }
+
+ public BadMessage(string value)
+ {
+ Value = value;
+ }
+
+ public string Value { get; set; }
+ }
+ }
+}
View
2  src/MassTransit.Tests/TextFixtures/EndpointTestFixture.cs
@@ -35,7 +35,7 @@ public void Setup()
try
{
EndpointFactory = _endpointFactoryConfigurator.CreateEndpointFactory();
- _endpointFactoryConfigurator = null;
+ // _endpointFactoryConfigurator = null;
_endpointCache = new EndpointCache(EndpointFactory);
View
12 src/MassTransit/Context/ConsumeContext.cs
@@ -125,6 +125,18 @@ public TMessage Message
get { return _message; }
}
+ public bool IsContextAvailable<T>()
+ where T : class
+ {
+ T messageOfT = Message as T;
+ return messageOfT != null;
+ }
+
+ public bool IsContextAvailable(Type messageType)
+ {
+ return messageType.IsAssignableFrom(Message.GetType());
+ }
+
public bool TryGetContext<T>(out IConsumeContext<T> context)
where T : class
{
View
125 src/MassTransit/Context/IConsumeContext.cs
@@ -12,70 +12,77 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit
{
- using System;
+ using System;
- /// <summary>
- /// Typed consumer context that can be used by message consumers to retrieve out-of-band information
- /// related to a message. This consumer context can also be used for explicitly telling the service bus
- /// to place the message at the end of the input-queue (by calling <see cref="RetryLater"/>) or send the message to the poison-letter queue (by
- /// calling <see cref="GenerateFault"/>.)
- /// </summary>
- /// <typeparam name="T">Incoming message type.</typeparam>
- public interface IConsumeContext<T> :
- IConsumeContext,
- IMessageContext<T>
- where T : class
- {
- /// <summary>
- /// Send the message to the end of the input queue so that it can be processed again later
- /// </summary>
- void RetryLater();
+ /// <summary>
+ /// Typed consumer context that can be used by message consumers to retrieve out-of-band information
+ /// related to a message. This consumer context can also be used for explicitly telling the service bus
+ /// to place the message at the end of the input-queue (by calling <see cref="RetryLater"/>) or send the message to the poison-letter queue (by
+ /// calling <see cref="GenerateFault"/>.)
+ /// </summary>
+ /// <typeparam name="T">Incoming message type.</typeparam>
+ public interface IConsumeContext<T> :
+ IConsumeContext,
+ IMessageContext<T>
+ where T : class
+ {
+ /// <summary>
+ /// Send the message to the end of the input queue so that it can be processed again later
+ /// </summary>
+ void RetryLater();
- /// <summary>
- /// Sends the message to either the fault address if specified or publishes the fault
- /// </summary>
- void GenerateFault(Exception ex);
- }
+ /// <summary>
+ /// Sends the message to either the fault address if specified or publishes the fault
+ /// </summary>
+ void GenerateFault(Exception ex);
+ }
- /// <summary>
- /// The consumer context can be used by message consumers to retrieve out-of-band information
- /// related to a message
- /// </summary>
- public interface IConsumeContext :
- IMessageContext
- {
- /// <summary>
- /// Gets the base context of this consume context.
- /// </summary>
- IReceiveContext BaseContext { get; }
+ /// <summary>
+ /// The consumer context can be used by message consumers to retrieve out-of-band information
+ /// related to a message
+ /// </summary>
+ public interface IConsumeContext :
+ IMessageContext
+ {
+ /// <summary>
+ /// Gets the base context of this consume context.
+ /// </summary>
+ IReceiveContext BaseContext { get; }
- /// <summary>
- /// The bus on which the message was received
- /// </summary>
- IServiceBus Bus { get; }
+ /// <summary>
+ /// The bus on which the message was received
+ /// </summary>
+ IServiceBus Bus { get; }
- /// <summary>
- /// The endpoint from which the message was received
- /// </summary>
- IEndpoint Endpoint { get; }
+ /// <summary>
+ /// The endpoint from which the message was received
+ /// </summary>
+ IEndpoint Endpoint { get; }
- /// <summary>
- /// Retrieves a specified message type from the consumer context, if available.
- /// </summary>
- /// <typeparam name="T">The message type requested</typeparam>
- /// <param name="context">The message context for the requested message type</param>
- /// <returns>True if the message type is available, otherwise false.</returns>
- bool TryGetContext<T>(out IConsumeContext<T> context)
- where T : class;
+ /// <summary>
+ /// Determines if the specified message type is available in the consumer context
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <returns></returns>
+ bool IsContextAvailable(Type messageType);
- /// <summary>
- /// Respond to the current message, sending directly to the ResponseAddress if specified
- /// otherwise publishing the message
- /// </summary>
- /// <typeparam name="T">The type of the message to respond with.</typeparam>
- /// <param name="message">The message to send in response</param>
- /// <param name="contextCallback">The context action for specifying additional context information</param>
- void Respond<T>(T message, Action<ISendContext<T>> contextCallback)
- where T : class;
- }
+ /// <summary>
+ /// Retrieves a specified message type from the consumer context, if available.
+ /// </summary>
+ /// <typeparam name="T">The message type requested</typeparam>
+ /// <param name="context">The message context for the requested message type</param>
+ /// <returns>True if the message type is available, otherwise false.</returns>
+ bool TryGetContext<T>(out IConsumeContext<T> context)
+ where T : class;
+
+ /// <summary>
+ /// Respond to the current message, sending directly to the ResponseAddress if specified
+ /// otherwise publishing the message
+ /// </summary>
+ /// <typeparam name="T">The type of the message to respond with.</typeparam>
+ /// <param name="message">The message to send in response</param>
+ /// <param name="contextCallback">The context action for specifying additional context information</param>
+ void Respond<T>(T message, Action<ISendContext<T>> contextCallback)
+ where T : class;
+ }
}
View
5 src/MassTransit/Context/InvalidConsumeContext.cs
@@ -108,6 +108,11 @@ public Uri InputAddress
get { throw CreateException(); }
}
+ public bool IsContextAvailable(Type messageType)
+ {
+ throw CreateException();
+ }
+
public bool TryGetContext<T>(out IConsumeContext<T> context)
where T : class
{
View
56 src/MassTransit/Context/ReceiveContext.cs
@@ -18,7 +18,6 @@ namespace MassTransit.Context
using System.IO;
using System.Runtime.Serialization;
using Logging;
- using Magnum.Extensions;
using Serialization;
using Util;
@@ -26,7 +25,7 @@ public class ReceiveContext :
MessageContext,
IReceiveContext
{
- static readonly ILog _log = Logger.Get(typeof (ReceiveContext));
+ static readonly ILog _log = Logger.Get(typeof(ReceiveContext));
readonly IList<IPublished> _published;
readonly IList<IReceived> _received;
readonly IList<ISent> _sent;
@@ -77,11 +76,12 @@ public void SetEndpoint(IEndpoint endpoint)
public void SetBodyStream(Stream stream)
{
- if (stream == null) throw new ArgumentNullException("stream");
- _bodyStream = stream;
+ if (stream == null)
+ throw new ArgumentNullException("stream");
+ _bodyStream = stream;
}
- public void CopyBodyTo(Stream stream)
+ public void CopyBodyTo(Stream stream)
{
_bodyStream.Seek(0, SeekOrigin.Begin);
_bodyStream.CopyTo(stream);
@@ -122,7 +122,7 @@ public void NotifyConsume<T>(IConsumeContext<T> consumeContext, string consumerT
where T : class
{
if (Endpoint != null)
- Endpoint.Address.LogReceived(consumeContext.MessageId, typeof (T).ToMessageName());
+ Endpoint.Address.LogReceived(consumeContext.MessageId, typeof(T).ToMessageName());
_received.Add(new Received<T>(consumeContext, consumerType, correlationId, _timer.ElapsedMilliseconds));
}
@@ -139,6 +139,11 @@ public IEnumerable<IReceived> Received
public Guid Id { get; private set; }
+ public bool IsContextAvailable(Type messageType)
+ {
+ return _typeConverter.Contains(messageType);
+ }
+
public bool TryGetContext<T>(out IConsumeContext<T> context)
where T : class
{
@@ -150,16 +155,18 @@ public bool TryGetContext<T>(out IConsumeContext<T> context)
context = new ConsumeContext<T>(this, message);
return true;
}
+
+ context = null;
+ return false;
}
catch (Exception ex)
{
var exception = new SerializationException("Failed to deserialize the message", ex);
- _log.Error("Exception converting message to type: " + typeof (T).ToShortTypeName(), exception);
- }
+ throw exception;
- context = null;
- return false;
+// _log.Error("Exception converting message to type: " + typeof (T).ToShortTypeName(), exception);
+ }
}
/// <summary>
@@ -171,7 +178,8 @@ public bool TryGetContext<T>(out IConsumeContext<T> context)
/// <param name="contextCallback">The action to setup the context on the outbound message</param>
public void Respond<T>(T message, [NotNull] Action<ISendContext<T>> contextCallback) where T : class
{
- if (contextCallback == null) throw new ArgumentNullException("contextCallback");
+ if (contextCallback == null)
+ throw new ArgumentNullException("contextCallback");
if (ResponseAddress != null)
{
Bus.GetEndpoint(ResponseAddress).Send(message, context =>
@@ -191,24 +199,24 @@ public bool TryGetContext<T>(out IConsumeContext<T> context)
}
}
- /// <summary>
- /// Create a new <see cref="ReceiveContext"/> from the incoming
- /// stream; the stream should contain the MassTransit <see cref="Envelope"/>
- /// which in turn contains both payload and meta-data/out-of-band data.
- /// </summary>
- /// <param name="bodyStream">Body stream to create receive context from</param>
- /// <returns>The receive context</returns>
- [NotNull]
+ /// <summary>
+ /// Create a new <see cref="ReceiveContext"/> from the incoming
+ /// stream; the stream should contain the MassTransit <see cref="Envelope"/>
+ /// which in turn contains both payload and meta-data/out-of-band data.
+ /// </summary>
+ /// <param name="bodyStream">Body stream to create receive context from</param>
+ /// <returns>The receive context</returns>
+ [NotNull]
public static ReceiveContext FromBodyStream(Stream bodyStream)
{
return new ReceiveContext(bodyStream);
}
- /// <summary>
- /// Create a new empty receive context
- /// </summary>
- /// <returns></returns>
- [NotNull]
+ /// <summary>
+ /// Create a new empty receive context
+ /// </summary>
+ /// <returns></returns>
+ [NotNull]
public static ReceiveContext Empty()
{
return new ReceiveContext(null);
View
1  src/MassTransit/MassTransit.csproj
@@ -381,6 +381,7 @@
<Compile Include="Serialization\JsonMessageSerializer.cs" />
<Compile Include="Serialization\JsonMessageTypeConverter.cs" />
<Compile Include="Serialization\ListJsonConverter.cs" />
+ <Compile Include="Serialization\MessageTypeConverterExtensions.cs" />
<Compile Include="Serialization\StaticMessageTypeConverter.cs" />
<Compile Include="Serialization\StringDecimalConverter.cs" />
<Compile Include="Serialization\VersionOneXmlMessageSerializer.cs" />
View
14 src/MassTransit/Serialization/IMessageTypeConverter.cs
@@ -12,9 +12,13 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Serialization
{
- public interface IMessageTypeConverter
- {
- bool TryConvert<T>(out T message)
- where T : class;
- }
+ using System;
+
+ public interface IMessageTypeConverter
+ {
+ bool Contains(Type messageType);
+
+ bool TryConvert<T>(out T message)
+ where T : class;
+ }
}
View
170 src/MassTransit/Serialization/JsonMessageTypeConverter.cs
@@ -12,80 +12,98 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Serialization
{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using Magnum.Reflection;
- using Newtonsoft.Json;
- using Newtonsoft.Json.Linq;
-
- public class JsonMessageTypeConverter :
- IMessageTypeConverter
- {
- readonly JsonSerializer _serializer;
- readonly IEnumerable<string> _supportedTypes;
- readonly JToken _token;
- readonly IDictionary<Type, object> _mapped;
-
- public JsonMessageTypeConverter(JsonSerializer serializer, JToken token, IEnumerable<string> supportedTypes)
- {
- _token = token;
- _supportedTypes = supportedTypes;
- _serializer = serializer;
- _mapped = new Dictionary<Type, object>();
- }
-
- public bool TryConvert<T>(out T message)
- where T : class
- {
- object existing;
- if (_mapped.TryGetValue(typeof(T), out existing))
- {
- message = (T) existing;
- return message != null;
- }
-
- string typeUrn = new MessageUrn(typeof (T)).ToString();
-
- if (_supportedTypes.Any(typeUrn.Equals))
- {
- object obj;
- if (typeof (T).IsInterface && typeof (T).IsAllowedMessageType())
- {
- Type proxyType = InterfaceImplementationBuilder.GetProxyFor(typeof (T));
-
- obj = FastActivator.Create(proxyType);
-
- UsingReader(jsonReader => _serializer.Populate(jsonReader, obj));
- }
- else
- {
- obj = FastActivator<T>.Create();
-
- UsingReader(jsonReader => _serializer.Populate(jsonReader, obj));
- }
-
- _mapped[typeof (T)] = obj;
-
- message = (T) obj;
- return true;
- }
-
- _mapped[typeof (T)] = null;
-
- message = null;
- return false;
- }
-
- void UsingReader(Action<JsonReader> callback)
- {
- if (_token == null)
- return;
-
- using (var jsonReader = new JTokenReader(_token))
- {
- callback(jsonReader);
- }
- }
- }
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Magnum.Reflection;
+ using Newtonsoft.Json;
+ using Newtonsoft.Json.Linq;
+
+ public class JsonMessageTypeConverter :
+ IMessageTypeConverter
+ {
+ readonly IDictionary<Type, object> _mapped;
+ readonly JsonSerializer _serializer;
+ readonly IEnumerable<string> _supportedTypes;
+ readonly JToken _token;
+
+ public JsonMessageTypeConverter(JsonSerializer serializer, JToken token, IEnumerable<string> supportedTypes)
+ {
+ _token = token;
+ _supportedTypes = supportedTypes;
+ _serializer = serializer;
+ _mapped = new Dictionary<Type, object>();
+ }
+
+ public bool Contains(Type messageType)
+ {
+ object existing;
+ if (_mapped.TryGetValue(messageType, out existing))
+ {
+ return existing != null;
+ }
+
+ string typeUrn = new MessageUrn(messageType).ToString();
+
+ if (_supportedTypes.Any(typeUrn.Equals))
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ public bool TryConvert<T>(out T message)
+ where T : class
+ {
+ object existing;
+ if (_mapped.TryGetValue(typeof(T), out existing))
+ {
+ message = (T)existing;
+ return message != null;
+ }
+
+ string typeUrn = new MessageUrn(typeof(T)).ToString();
+
+ if (_supportedTypes.Any(typeUrn.Equals))
+ {
+ object obj;
+ if (typeof(T).IsInterface && typeof(T).IsAllowedMessageType())
+ {
+ Type proxyType = InterfaceImplementationBuilder.GetProxyFor(typeof(T));
+
+ obj = FastActivator.Create(proxyType);
+
+ UsingReader(jsonReader => _serializer.Populate(jsonReader, obj));
+ }
+ else
+ {
+ obj = FastActivator<T>.Create();
+
+ UsingReader(jsonReader => _serializer.Populate(jsonReader, obj));
+ }
+
+ _mapped[typeof(T)] = obj;
+
+ message = (T)obj;
+ return true;
+ }
+
+ _mapped[typeof(T)] = null;
+
+ message = null;
+ return false;
+ }
+
+ void UsingReader(Action<JsonReader> callback)
+ {
+ if (_token == null)
+ return;
+
+ using (var jsonReader = new JTokenReader(_token))
+ {
+ callback(jsonReader);
+ }
+ }
+ }
}
View
23 src/MassTransit/Serialization/MessageTypeConverterExtensions.cs
@@ -0,0 +1,23 @@
+// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+namespace MassTransit.Serialization
+{
+ public static class MessageTypeConverterExtensions
+ {
+ public static bool Contains<T>(this IMessageTypeConverter converter)
+ where T : class
+ {
+ return converter.Contains(typeof(T));
+ }
+ }
+}
View
51 src/MassTransit/Serialization/StaticMessageTypeConverter.cs
@@ -12,31 +12,36 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Serialization
{
- using System;
- using Util;
+ using System;
+ using Util;
- public class StaticMessageTypeConverter :
- IMessageTypeConverter
- {
- readonly object _message;
- readonly Type _messageType;
+ public class StaticMessageTypeConverter :
+ IMessageTypeConverter
+ {
+ readonly object _message;
+ readonly Type _messageType;
- public StaticMessageTypeConverter([NotNull] object message)
- {
- _message = message;
- _messageType = message.GetType();
- }
+ public StaticMessageTypeConverter([NotNull] object message)
+ {
+ _message = message;
+ _messageType = message.GetType();
+ }
- public bool TryConvert<T>(out T message) where T : class
- {
- if (typeof (T).IsAssignableFrom(_messageType))
- {
- message = (T) _message;
- return true;
- }
+ public bool Contains(Type messageType)
+ {
+ return messageType.IsAssignableFrom(_messageType);
+ }
- message = null;
- return false;
- }
- }
+ public bool TryConvert<T>(out T message) where T : class
+ {
+ if (typeof(T).IsAssignableFrom(_messageType))
+ {
+ message = (T)_message;
+ return true;
+ }
+
+ message = null;
+ return false;
+ }
+ }
}
View
11 src/MassTransit/Testing/TestDecorators/ReceiveContextTestDecorator.cs
@@ -141,12 +141,19 @@ public IEndpoint Endpoint
get { return _context.Endpoint; }
}
- public bool TryGetContext<T>(out IConsumeContext<T> context) where T : class
+ public bool IsContextAvailable(Type messageType)
+ {
+ return _context.IsContextAvailable(messageType);
+ }
+
+ public bool TryGetContext<T>(out IConsumeContext<T> context)
+ where T : class
{
return _context.TryGetContext(out context);
}
- public void Respond<T>(T message, Action<ISendContext<T>> contextCallback) where T : class
+ public void Respond<T>(T message, Action<ISendContext<T>> contextCallback)
+ where T : class
{
_context.Respond(message, contextCallback);
}
View
27 src/MassTransit/Transports/Loopback/LoopbackTransport.cs
@@ -39,6 +39,22 @@ public LoopbackTransport(IEndpointAddress address)
Address = address;
}
+ public int Count
+ {
+ get
+ {
+ int messageCount;
+ lock (_messageLock)
+ {
+ GuardAgainstDisposed();
+
+ messageCount = _messages.Count;
+ }
+
+ return messageCount;
+ }
+ }
+
public IEndpointAddress Address { get; private set; }
public IOutboundTransport OutboundTransport
@@ -90,13 +106,7 @@ public void Send(ISendContext context)
public void Receive(Func<IReceiveContext, Action<IReceiveContext>> callback, TimeSpan timeout)
{
- int messageCount;
- lock (_messageLock)
- {
- GuardAgainstDisposed();
-
- messageCount = _messages.Count;
- }
+ int messageCount = Count;
bool waited = false;
@@ -177,7 +187,8 @@ void GuardAgainstDisposed()
void Dispose(bool disposing)
{
- if (_disposed) return;
+ if (_disposed)
+ return;
if (disposing)
{
lock (_messageLock)
View
11 src/MassTransit/Transports/Loopback/LoopbackTransportFactory.cs
@@ -12,13 +12,18 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Transports.Loopback
{
+ using System;
+ using Magnum.Caching;
+
public class LoopbackTransportFactory :
ITransportFactory
{
- IMessageNameFormatter _messageNameFormatter;
+ readonly Cache<Uri, LoopbackTransport> _transports;
+ readonly IMessageNameFormatter _messageNameFormatter;
public LoopbackTransportFactory()
{
+ _transports = new ConcurrentCache<Uri, LoopbackTransport>();
_messageNameFormatter = new DefaultMessageNameFormatter("::", "--", ":", "-");
}
@@ -29,7 +34,7 @@ public string Scheme
public IDuplexTransport BuildLoopback(ITransportSettings settings)
{
- return new LoopbackTransport(settings.Address);
+ return _transports.Get(settings.Address.Uri, _ => new LoopbackTransport(settings.Address));
}
public IInboundTransport BuildInbound(ITransportSettings settings)
@@ -44,7 +49,7 @@ public IOutboundTransport BuildOutbound(ITransportSettings settings)
public IOutboundTransport BuildError(ITransportSettings settings)
{
- return new LoopbackTransport(settings.Address);
+ return _transports.Get(settings.Address.Uri, _ => new LoopbackTransport(settings.Address));
}
public IMessageNameFormatter MessageNameFormatter

No commit comments for this range

Something went wrong with that request. Please try again.