
Learn MassTransit

DUONG Phu-Hiep edited this page Aug 15, 2022 · 7 revisions

Mediator Pattern

Multiple Consumers + Multiple Responses

In this example:

  • Consumer2 sends an Input1 request. Both Consumer12 and Consumer13 are triggered, and the first response to come back is returned to Consumer2.

  • Consumer2 could depend on the IMediator interface to make the request, but it is better to depend on the more specific IRequestClient<Input1> instead, so that Consumer2 cannot use IMediator to trigger itself (recursively). To make IRequestClient<Input1> available to the DI container, register it with cfg.AddRequestClient<Input1>().

//we want to validate the DI graph on startup
builder.Host.UseDefaultServiceProvider(o =>
{
    o.ValidateOnBuild = true;
    o.ValidateScopes = true;
});
builder.Services.AddMediator(cfg =>
{
    cfg.AddConsumers(typeof(Consumer12).Assembly); //add all the consumers in the assembly to the DI graph
    cfg.AddRequestClient<Input1>(); //add `IRequestClient<Input1>` to the DI graph
    cfg.AddRequestClient<Input2>(); //add `IRequestClient<Input2>` to the DI graph
});


public class Consumer13 : IConsumer<Input1>
{
    public async Task Consume(ConsumeContext<Input1> context){...}
}

public class Consumer12 : IConsumer<Input1>
{
    public async Task Consume(ConsumeContext<Input1> context){...}
}

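public class Consumer2 : IConsumer<Input2>
{
    private readonly IRequestClient<Input1> _requestClient1;
    public Consumer2(IRequestClient<Input1> requestClient1) => _requestClient1 = requestClient1;
    public async Task Consume(ConsumeContext<Input2> context)
    {
        // Triggers both Consumer12 and Consumer13 and takes the first response.
        // Output1 is a hypothetical response type; the original does not name one.
        var response = await _requestClient1.GetResponse<Output1>(new Input1());
    }
}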
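The request/response flow above can be sketched end to end as follows. This is only an illustration under assumptions: the namespace `MediatorSample`, the shape of `Input1` and `Input2`, and the `Output1` response record are hypothetical, since the page does not show the message contracts. It relies on `ConsumeContext.RespondAsync` and `IRequestClient<T>.GetResponse<TResponse>` from the MassTransit package.

```csharp
using System.Threading.Tasks;
using MassTransit;

namespace MediatorSample
{
    // Hypothetical message contracts; the original page does not define them.
    public class Input1 { }
    public class Input2 { }
    public record Output1(string Source);

    public class Consumer12 : IConsumer<Input1>
    {
        // Answers the request and identifies itself in the response.
        public Task Consume(ConsumeContext<Input1> context) =>
            context.RespondAsync(new Output1(nameof(Consumer12)));
    }

    public class Consumer13 : IConsumer<Input1>
    {
        public Task Consume(ConsumeContext<Input1> context) =>
            context.RespondAsync(new Output1(nameof(Consumer13)));
    }

    public class Consumer2 : IConsumer<Input2>
    {
        private readonly IRequestClient<Input1> _requestClient1;

        public Consumer2(IRequestClient<Input1> requestClient1) =>
            _requestClient1 = requestClient1;

        public async Task Consume(ConsumeContext<Input2> context)
        {
            // Both Input1 consumers are registered; per the notes above,
            // the first response to arrive completes the request.
            Response<Output1> response = await _requestClient1
                .GetResponse<Output1>(new Input1(), context.CancellationToken);

            // Forward the winning response to whoever sent Input2.
            await context.RespondAsync(response.Message);
        }
    }
}
```

Which consumer wins is not something the caller should rely on. If the two consumers would return different results, giving each its own request type is probably the safer design.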

Each MassTransit request has its own scope, which is different from the HTTP request scope

In this example:

  • The HTTP request arrives at the FooController
  • FooController sends a request to Consumer2 (by sending Input2)
  • Consumer2 sends a request to Consumer1 (by sending Input1)
  • FooController, Consumer1 and Consumer2 all depend on Foo
  • Foo is registered as scoped in the DI container.

Program.cs

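builder.Services.AddMediator(cfg =>
{
    cfg.AddConsumer<Consumer1>();
    cfg.AddConsumer<Consumer2>();
    cfg.AddRequestClient<Input1>();
    cfg.AddRequestClient<Input2>();
});
builder.Services.AddScoped<IFoo, Foo>();
..
app.MapControllers();
----
// Output1 and Output2 are hypothetical response types; the original does not name them.
public class FooController : ControllerBase {
    public FooController(IFoo foo, IRequestClient<Input2> requestClient) {...}
    [HttpGet]
    public async Task Get()
    {
        await _requestClient.GetResponse<Output2>(new Input2()); //call Consumer2
    }
}
public class Consumer1 : IConsumer<Input1> {
    public Consumer1(IFoo foo) {...}
    public async Task Consume(ConsumeContext<Input1> context) {...}
}
public class Consumer2 : IConsumer<Input2> {
    public Consumer2(IFoo foo, IRequestClient<Input1> requestClient) {...}
    public async Task Consume(ConsumeContext<Input2> context)
    {
        await _requestClient.GetResponse<Output1>(new Input1()); //call Consumer1
    }
}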

If you expect to get the same instance of Foo everywhere within a single HTTP request, that is WRONG:

  • When the HTTP request arrives at the FooController, the FooController gets a new instance of Foo scoped to the HTTP request.
  • When the FooController sends a request to Consumer2, Consumer2 gets another instance of Foo, scoped to that MassTransit request (the request with Input2 as payload).
  • When Consumer2 sends a request to Consumer1, Consumer1 gets yet another instance of Foo, scoped to that MassTransit request (the request with Input1 as payload).

In this example, each HTTP request produces 3 different instances of Foo, each with its own scope.
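The underlying rule is plain Microsoft.Extensions.DependencyInjection behaviour: a scoped service is created once per scope, so two scopes never share an instance. The sketch below shows only that rule, with no MassTransit involved. The two manually created scopes stand in for the HTTP request scope and a mediator request scope, and `ScopeDemo` and the `Id` property are hypothetical names introduced for the illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

public interface IFoo { Guid Id { get; } }

public sealed class Foo : IFoo
{
    public Guid Id { get; } = Guid.NewGuid();
}

public static class ScopeDemo
{
    // Returns (same instance within one scope?, same instance across two scopes?).
    public static (bool SameWithinScope, bool SameAcrossScopes) Run()
    {
        var services = new ServiceCollection();
        services.AddScoped<IFoo, Foo>();

        using var provider = services.BuildServiceProvider(validateScopes: true);

        // Stands in for the HTTP request scope.
        using var httpScope = provider.CreateScope();
        var fooA = httpScope.ServiceProvider.GetRequiredService<IFoo>();
        var fooB = httpScope.ServiceProvider.GetRequiredService<IFoo>();

        // Stands in for the separate scope of a mediator request.
        using var consumerScope = provider.CreateScope();
        var fooC = consumerScope.ServiceProvider.GetRequiredService<IFoo>();

        return (ReferenceEquals(fooA, fooB), ReferenceEquals(fooA, fooC));
    }
}
```

Calling `ScopeDemo.Run()` returns `(true, false)`: resolving twice from the same scope yields the same Foo, while the second scope gets its own.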

Now, what if you want to share a common Foo instance between FooController, Consumer1 and Consumer2?

=> You can store the Foo instance in HttpContext.Items and read it back in the consumers through IHttpContextAccessor

Program.cs

builder.Services.AddHttpContextAccessor();

----

public class FooController : ControllerBase 
{
    public FooController(IFoo foo) {...}
    [HttpGet]
    public void Get() 
    {
        HttpContext.Items["Foo"] = _foo; //store the Foo instance scoped to the HTTP request in the HttpContext
    }
}

public class Consumer1 : IConsumer<Input1> {
    public Consumer1(IHttpContextAccessor httpContextAccessor) {...}
    public async Task Consume(ConsumeContext<Input1> context)
    {
        //get the Foo instance from the HttpContext; HttpContext is null when there is no current HTTP request
        IFoo? foo = _httpContextAccessor.HttpContext?.Items["Foo"] as IFoo;
        ...
    }
}
public class Consumer2 : IConsumer<Input2> {
    public Consumer2(IHttpContextAccessor httpContextAccessor) {...}
    public async Task Consume(ConsumeContext<Input2> context)
    {
        //get the Foo instance from the HttpContext; HttpContext is null when there is no current HTTP request
        IFoo? foo = _httpContextAccessor.HttpContext?.Items["Foo"] as IFoo;
        ...
    }
}

What does the outbox solve?

https://docs.particular.net/nservicebus/outbox/


Sample MassTransit outbox

https://www.youtube.com/watch?v=3TjGnmLno_A

https://github.com/MassTransit/Sample-Outbox