Skip to content

Commit

Permalink
State Machine Activities for Kafka Topic Producer and Event Hub Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Dec 24, 2020
1 parent 37e4db3 commit f9ad17a
Show file tree
Hide file tree
Showing 24 changed files with 1,134 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,12 @@ public class AutofacStateMachineActivityFactory :
{
public static readonly IStateMachineActivityFactory Instance = new AutofacStateMachineActivityFactory();

public Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>
public T GetService<T>(PipeContext context)
where T : class
{
var lifetimeScope = context.GetPayload<ILifetimeScope>();

return ActivatorUtils.GetOrCreateInstance<TActivity>(lifetimeScope);
}

public Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>
{
var lifetimeScope = context.GetPayload<ILifetimeScope>();

return ActivatorUtils.GetOrCreateInstance<TActivity>(lifetimeScope);
return ActivatorUtils.GetOrCreateInstance<T>(lifetimeScope);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System;
using Automatonymous;
using GreenPipes;
using Metadata;
using GreenPipes.Internals.Extensions;
using Microsoft.Extensions.DependencyInjection;


Expand All @@ -12,25 +12,16 @@ public class DependencyInjectionStateMachineActivityFactory :
{
public static readonly IStateMachineActivityFactory Instance = new DependencyInjectionStateMachineActivityFactory();

Activity<TInstance, TData> IStateMachineActivityFactory.GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
{
return GetActivity<TActivity>(context);
}

Activity<TInstance> IStateMachineActivityFactory.GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
{
return GetActivity<TActivity>(context);
}

static TActivity GetActivity<TActivity>(PipeContext context)
public T GetService<T>(PipeContext context)
where T : class
{
if (context.TryGetPayload(out IServiceScope serviceScope))
return serviceScope.ServiceProvider.GetService<TActivity>() ?? ActivatorUtilities.CreateInstance<TActivity>(serviceScope.ServiceProvider);
return serviceScope.ServiceProvider.GetService<T>() ?? ActivatorUtilities.CreateInstance<T>(serviceScope.ServiceProvider);

if (context.TryGetPayload(out IServiceProvider serviceProvider))
return serviceProvider.GetService<TActivity>() ?? ActivatorUtilities.CreateInstance<TActivity>(serviceProvider);
return serviceProvider.GetService<T>() ?? ActivatorUtilities.CreateInstance<T>(serviceProvider);

throw new PayloadNotFoundException($"IServiceProvider or IServiceScope was not found to create activity: {TypeMetadataCache<TActivity>.ShortName}");
throw new PayloadNotFoundException($"IServiceProvider or IServiceScope was not found to get service: {TypeCache<T>.ShortName}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@ public class SimpleInjectorStateMachineActivityFactory :
{
public static readonly IStateMachineActivityFactory Instance = new SimpleInjectorStateMachineActivityFactory();

public Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>
public T GetService<T>(PipeContext context)
where T : class
{
var container = context.GetPayload<Scope>();

return (TActivity)container.GetInstance(typeof(TActivity));
}

public Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>
{
var container = context.GetPayload<Scope>();

return (TActivity)container.GetInstance(typeof(TActivity));
return container.GetInstance<T>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@ public class StructureMapStateMachineActivityFactory :
{
public static readonly IStateMachineActivityFactory Instance = new StructureMapStateMachineActivityFactory();

public Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>
public T GetService<T>(PipeContext context)
where T : class
{
var lifetimeScope = context.GetPayload<IContainer>();
var container = context.GetPayload<IContainer>();

return lifetimeScope.GetInstance<TActivity>();
}

public Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>
{
var lifetimeScope = context.GetPayload<IContainer>();

return lifetimeScope.GetInstance<TActivity>();
return container.GetInstance<T>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@ public class WindsorStateMachineActivityFactory :
{
public static readonly IStateMachineActivityFactory Instance = new WindsorStateMachineActivityFactory();

public Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>
public T GetService<T>(PipeContext context)
where T : class
{
var container = context.GetPayload<IKernel>();
var kernel = context.GetPayload<IKernel>();

return container.Resolve<TActivity>();
}

public Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>
{
var container = context.GetPayload<IKernel>();

return container.Resolve<TActivity>();
return kernel.Resolve<T>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Task Activity<TInstance>.Execute(BehaviorContext<TInstance> context, Behavior<TI
{
var factory = context.GetStateMachineActivityFactory();

Activity<TInstance> activity = factory.GetActivity<TActivity, TInstance>(context);
Activity<TInstance> activity = factory.GetService<TActivity>(context);

return activity.Execute(context, next);
}
Expand All @@ -27,7 +27,7 @@ Task Activity<TInstance>.Execute<T>(BehaviorContext<TInstance, T> context, Behav
{
var factory = context.GetStateMachineActivityFactory();

Activity<TInstance> activity = factory.GetActivity<TActivity, TInstance>(context);
Activity<TInstance> activity = factory.GetService<TActivity>(context);

var widenBehavior = new WidenBehavior<TInstance, T>(next, context);

Expand Down Expand Up @@ -69,7 +69,7 @@ void IProbeSite.Probe(ProbeContext context)
{
var factory = context.GetStateMachineActivityFactory();

Activity<TInstance, TData> activity = factory.GetActivity<TActivity, TInstance, TData>(context);
Activity<TInstance, TData> activity = factory.GetService<TActivity>(context);

return activity.Execute(context, next);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
namespace Automatonymous
{
using System;
using GreenPipes;


public class DefaultConstructorStateMachineActivityFactory :
IStateMachineActivityFactory
{
public Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>
public T GetService<T>(PipeContext context)
where T : class
{
return (Activity<TInstance, TData>)Activator.CreateInstance(typeof(TActivity));
}

public Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>
{
return (Activity<TInstance>)Activator.CreateInstance(typeof(TActivity));
return Activator.CreateInstance<T>();
}
}
}
24 changes: 7 additions & 17 deletions src/MassTransit/Automatonymous/IStateMachineActivityFactory.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
namespace Automatonymous
{
using GreenPipes;


public interface IStateMachineActivityFactory
{
/// <summary>
/// Creates a state machine activity for the specified context
/// </summary>
/// <typeparam name="TActivity"></typeparam>
/// <typeparam name="TInstance"></typeparam>
/// <typeparam name="TData"></typeparam>
/// <param name="context"></param>
/// <returns></returns>
Activity<TInstance, TData> GetActivity<TActivity, TInstance, TData>(BehaviorContext<TInstance, TData> context)
where TActivity : class, Activity<TInstance, TData>;

/// <summary>
/// Creates a state machine activity for the specified context
/// Returns the service, if available, otherwise returns null
/// </summary>
/// <typeparam name="TActivity"></typeparam>
/// <typeparam name="TInstance"></typeparam>
/// <param name="context"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Activity<TInstance> GetActivity<TActivity, TInstance>(BehaviorContext<TInstance> context)
where TActivity : class, Activity<TInstance>;
T GetService<T>(PipeContext context)
where T : class;
}
}
8 changes: 4 additions & 4 deletions src/MassTransit/Automatonymous/PublishExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static class PublishExtensions
where TMessage : class
where TException : Exception
{
return source.Add(new PublishActivity<TInstance, TMessage>(x => message, contextCallback));
return source.Add(new FaultedPublishActivity<TInstance, TException, TMessage>(x => message, contextCallback));
}

public static ExceptionActivityBinder<TInstance, TException> PublishAsync<TInstance, TException, TMessage>(
Expand All @@ -94,7 +94,7 @@ public static class PublishExtensions
where TMessage : class
where TException : Exception
{
return source.Add(new PublishActivity<TInstance, TMessage>(x => message, contextCallback));
return source.Add(new FaultedPublishActivity<TInstance, TException, TMessage>(x => message, contextCallback));
}

public static ExceptionActivityBinder<TInstance, TException> Publish<TInstance, TException, TMessage>(
Expand Down Expand Up @@ -127,7 +127,7 @@ public static class PublishExtensions
where TMessage : class
where TException : Exception
{
return source.Add(new PublishActivity<TInstance, TData, TMessage>(x => message, contextCallback));
return source.Add(new FaultedPublishActivity<TInstance, TData, TException, TMessage>(x => message, contextCallback));
}

public static ExceptionActivityBinder<TInstance, TData, TException> PublishAsync<TInstance, TData, TException, TMessage>(
Expand All @@ -138,7 +138,7 @@ public static class PublishExtensions
where TMessage : class
where TException : Exception
{
return source.Add(new PublishActivity<TInstance, TData, TMessage>(x => message, contextCallback));
return source.Add(new FaultedPublishActivity<TInstance, TData, TException, TMessage>(x => message, contextCallback));
}

public static ExceptionActivityBinder<TInstance, TData, TException> Publish<TInstance, TData, TException, TMessage>(
Expand Down
30 changes: 30 additions & 0 deletions src/MassTransit/Exceptions/ProduceException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace MassTransit
{
using System;
using System.Runtime.Serialization;


[Serializable]
public class ProduceException :
MassTransitException
{
public ProduceException()
{
}

public ProduceException(string message)
: base(message)
{
}

public ProduceException(string message, Exception innerException)
: base(message, innerException)
{
}

protected ProduceException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
}

0 comments on commit f9ad17a

Please sign in to comment.