A development guide for building loosely coupled, event-driven microservices using Event Driven .NET abstractions and reference architecture.
The following steps illustrate how to create microservices based on the principles of Domain Driven Design (DDD) and Command Query Responsibility Segregation (CQRS) with services that communicate asynchronously over an Event Bus abstraction that uses Dapr for publish-subscribe with an underlying message broker.
- Create a CustomerService Web API project.
  - Add the following packages.
    - AutoMapper.Extensions.Microsoft.DependencyInjection
    - EventDriven.CQRS.Abstractions
    - EventDriven.CQRS.Extensions
    - EventDriven.DependencyInjection.URF.Mongo
    - EventDriven.EventBus.Dapr
    - EventDriven.EventBus.EventCache.Mongo
    - MongoDB.Driver
    - URF.Core.Mongo
- Add Domain/CustomerAggregate folders to the project, then add a `Customer` class that extends `Entity`.
  - Add properties representing entity state.

        public class Customer : Entity
        {
            public string FirstName { get; set; } = null!;
            public string LastName { get; set; } = null!;
            public Address ShippingAddress { get; set; } = null!;
        }

- Create commands that are C# records and extend a `Command` base class.

      public record CreateCustomer(Customer? Entity) : Command<Customer>(Entity);
      public record UpdateCustomer(Customer? Entity) : Command<Customer>(Entity);
      public record RemoveCustomer(Guid EntityId) : Command(EntityId);

- Create an Events folder in Domain/CustomerAggregate and add events that extend `DomainEvent`.

      public record CustomerCreated(Customer? Entity) : DomainEvent<Customer>(Entity);
      public record CustomerUpdated(Customer? Entity) : DomainEvent<Customer>(Entity);
      public record CustomerRemoved(Guid EntityId) : DomainEvent(EntityId);

- Update the `Customer` entity to implement `ICommandProcessor` and `IEventApplier` interfaces to process commands by emitting domain events and to apply those events to mutate entity state.
  - Implementing entity behavior by means of `Process` and `Apply` methods allows for easier migration to event sourcing in the future.
  - Implement `ICommandProcessor<CreateCustomer, Customer, CustomerCreated>` to add a `Process` method that accepts a `CreateCustomer` command and returns a `CustomerCreated` event.
  - Implement `IEventApplier<CustomerCreated>` to mutate entity state based on a `CustomerCreated` event.

        public class Customer : Entity,
            ICommandProcessor<CreateCustomer, Customer, CustomerCreated>,
            IEventApplier<CustomerCreated>
        {
            public string FirstName { get; set; }
            public string LastName { get; set; }
            public Address ShippingAddress { get; set; }

            public CustomerCreated Process(CreateCustomer command)
                // To process command, return one or more domain events
                => new(command.Entity);

            public void Apply(CustomerCreated domainEvent) =>
                // Set Id
                Id = domainEvent.EntityId != default ? domainEvent.EntityId : Guid.NewGuid();
        }

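To illustrate why separating `Process` from `Apply` eases a later move to event sourcing, here is a minimal, self-contained sketch. It does not use the EventDriven.CQRS types: the `RenameCustomer` command, the `CustomerRenamed` event, and the simplified `Customer` and `CustomerCreated` shapes are hypothetical stand-ins. Because all state changes go through `Apply`, the same method can rebuild an entity by replaying stored events.

```csharp
using System;
using System.Collections.Generic;

// A recorded event stream, as an event store might return it (hypothetical data).
var id = Guid.NewGuid();
var history = new List<object>
{
    new CustomerCreated(id, "Ada", "Lovelace"),
    new CustomerRenamed(id, "Ada", "King"),
};

// Rebuild state purely by applying past events.
var customer = new Customer();
foreach (var pastEvent in history)
    customer.Apply(pastEvent);

// Processing a new command only produces an event; applying it mutates state.
var renamed = customer.Process(new RenameCustomer("Augusta", "King"));
history.Add(renamed);
customer.Apply(renamed);

Console.WriteLine($"{customer.FirstName} {customer.LastName}, {history.Count} events");
// prints "Augusta King, 3 events"

// Hypothetical, simplified command and events (not the library's base types).
public record RenameCustomer(string FirstName, string LastName);
public record CustomerCreated(Guid EntityId, string FirstName, string LastName);
public record CustomerRenamed(Guid EntityId, string FirstName, string LastName);

public class Customer
{
    public Guid Id { get; private set; }
    public string FirstName { get; private set; } = "";
    public string LastName { get; private set; } = "";

    // Process: describe what happened without touching state.
    public CustomerRenamed Process(RenameCustomer command)
        => new(Id, command.FirstName, command.LastName);

    // Apply: the only place where state changes.
    public void Apply(object domainEvent)
    {
        switch (domainEvent)
        {
            case CustomerCreated e:
                (Id, FirstName, LastName) = (e.EntityId, e.FirstName, e.LastName);
                break;
            case CustomerRenamed e:
                (FirstName, LastName) = (e.FirstName, e.LastName);
                break;
            default:
                throw new InvalidOperationException($"Unknown event {domainEvent.GetType().Name}");
        }
    }
}
```

In the guide itself the entity is persisted as a document after `Apply`; the replay loop here only shows what an event-sourced variant could look like.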
- Add a `CreateCustomerHandler` class to a CommandHandlers folder in Domain/CustomerAggregate, and implement `ICommandHandler<Customer, CreateCustomer>`.
  - Inject `ICustomerRepository` into the constructor.
  - In the `Handle` method write code to process the command, apply events, and persist the entity.

        public async Task<CommandResult<Customer>> Handle(CreateCustomer command, CancellationToken cancellationToken)
        {
            // Process command
            if (command.Entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
            var domainEvent = command.Entity.Process(command);

            // Apply events
            command.Entity.Apply(domainEvent);

            // Persist entity
            var entity = await _repository.AddAsync(command.Entity);
            if (entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
            return new CommandResult<Customer>(CommandOutcome.Accepted, entity);
        }

- Add a Common class library project to the solution.
  - Add the following packages:
    - EventDriven.CQRS.Abstractions
    - EventDriven.EventBus.Abstractions
    - Microsoft.Extensions.Logging.Abstractions
  - Reference the Common project from the CustomerService project.
  - Create a `LoggingBehavior<TRequest, TResponse>` class that implements `IBehavior<TRequest, TResponse>`.
  - In the `Handle` method perform pre and post handler logging.

        public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken,
            RequestHandlerDelegate<TResponse> next)
        {
            string requestType = string.Empty;
            if (typeof(TRequest).IsCommandType())
                requestType = "command";
            else if (typeof(TRequest).IsQueryType())
                requestType = "query";

            _logger.LogInformation("----- Handling {RequestType} '{CommandName}'. Request: {@Request}",
                requestType, request.GetGenericTypeName(), request);
            var response = await next();
            _logger.LogInformation("----- Handled {RequestType} '{CommandName}'. Response: {@Response}",
                requestType, request.GetGenericTypeName(), response);
            return response;
        }

  - Create a `CustomerAddressUpdated` record that extends `IntegrationEvent`.

        public record CustomerAddressUpdated(Guid CustomerId, Address ShippingAddress) : IntegrationEvent;

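The pre and post logging performed by `LoggingBehavior` relies on the behavior wrapping the next step of the pipeline. The following is a minimal, self-contained sketch of that wrapping idea only; the `LoggingStep` class, the plain `Func<Task<TResponse>>` delegate, and the list used as a log are hypothetical stand-ins for the real behavior interface, `RequestHandlerDelegate<TResponse>`, and `ILogger`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// The innermost step: stands in for a command or query handler.
Func<Task<string>> handler = () =>
{
    log.Add("handler ran");
    return Task.FromResult("Accepted");
};

// Wrap the handler in the logging step, then run the pipeline.
var response = await LoggingStep.Handle("CreateCustomer", handler, log);

Console.WriteLine(string.Join(" | ", log));
// prints "Handling CreateCustomer | handler ran | Handled CreateCustomer: Accepted"

public static class LoggingStep
{
    // Logs before and after calling `next`, and returns its response unchanged.
    public static async Task<TResponse> Handle<TResponse>(
        string requestName, Func<Task<TResponse>> next, List<string> log)
    {
        log.Add($"Handling {requestName}");
        var response = await next();
        log.Add($"Handled {requestName}: {response}");
        return response;
    }
}
```

Since each step receives only `next`, further behaviors (validation, timing) could be layered around the handler the same way without changing it.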
- Create a Repositories folder in the CustomerService project to contain repositories.
  - Add an `ICustomerRepository` interface.

        public interface ICustomerRepository
        {
            Task<IEnumerable<Customer>> GetAsync();
            Task<Customer?> GetAsync(Guid id);
            Task<Customer?> AddAsync(Customer entity);
            Task<Customer?> UpdateAsync(Customer entity);
            Task<int> RemoveAsync(Guid id);
        }

  - Add a `CustomerRepository` class that implements `ICustomerRepository` and extends `DocumentRepository<Customer>`.

        public class CustomerRepository : DocumentRepository<Customer>, ICustomerRepository
        {
            public CustomerRepository(IMongoCollection<Customer> collection) : base(collection)
            {
            }

            public async Task<IEnumerable<Customer>> GetAsync() =>
                await FindManyAsync();

            public async Task<Customer?> GetAsync(Guid id) =>
                await FindOneAsync(e => e.Id == id);

            public async Task<Customer?> AddAsync(Customer entity)
            {
                var existing = await FindOneAsync(e => e.Id == entity.Id);
                if (existing != null) return null;
                if (string.IsNullOrWhiteSpace(entity.ETag))
                    entity.ETag = Guid.NewGuid().ToString();
                return await InsertOneAsync(entity);
            }

            public async Task<Customer?> UpdateAsync(Customer entity)
            {
                var existing = await GetAsync(entity.Id);
                if (existing == null) return null;
                if (string.Compare(entity.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase) != 0)
                    throw new ConcurrencyException();
                entity.ETag = Guid.NewGuid().ToString();
                return await FindOneAndReplaceAsync(e => e.Id == entity.Id, entity);
            }

            public async Task<int> RemoveAsync(Guid id) =>
                await DeleteOneAsync(e => e.Id == id);
        }

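The `ETag` check in `UpdateAsync` is a form of optimistic concurrency: an update is accepted only if the caller still holds the version that is currently stored. The sketch below reproduces that rule with an in-memory dictionary so it can be run without MongoDB; `InMemoryStore`, `Doc`, and the local `ConcurrencyException` are hypothetical stand-ins, not types from URF.Core.Mongo.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var store = new InMemoryStore();
var created = store.Add(new Doc(Guid.NewGuid(), "Ada", ETag: ""));

// Two clients read the same version and edit it independently.
var firstEdit = created with { Name = "Ada L." };
var secondEdit = created with { Name = "A. Lovelace" };

var updated = store.Update(firstEdit);   // succeeds; the stored ETag is rotated
var conflict = false;
try
{
    store.Update(secondEdit);            // still carries the old ETag
}
catch (ConcurrencyException)
{
    conflict = true;
}

Console.WriteLine($"{updated?.Name}, conflict: {conflict}");
// prints "Ada L., conflict: True"

public class ConcurrencyException : Exception { }

public record Doc(Guid Id, string Name, string ETag);

public class InMemoryStore
{
    private readonly Dictionary<Guid, Doc> _docs = new();

    public Doc Add(Doc doc)
    {
        // Assign the first version tag on insert.
        var stored = doc with { ETag = Guid.NewGuid().ToString() };
        _docs.Add(stored.Id, stored);
        return stored;
    }

    public Doc? Update(Doc doc)
    {
        if (!_docs.TryGetValue(doc.Id, out var existing)) return null;

        // Reject the write if the caller's version is stale.
        if (!string.Equals(doc.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase))
            throw new ConcurrencyException();

        // Rotate the tag so that older copies can no longer be written.
        var stored = doc with { ETag = Guid.NewGuid().ToString() };
        _docs[doc.Id] = stored;
        return stored;
    }
}
```

Unlike this single-threaded sketch, the repository above performs the read and the replace as two separate database calls, so a write arriving between them would not be detected by the `ETag` comparison alone.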
- Add an `UpdateCustomerHandler` class to the CommandHandlers folder.
  - Inject `ICustomerRepository`, `IEventBus` and `IMapper` into the constructor.
  - In the `Handle` method, see if the shipping address has changed, and if so, publish a `CustomerAddressUpdated` integration event, so that the order service can update the shipping address in the customer's orders.

        public async Task<CommandResult<Customer>> Handle(UpdateCustomer command, CancellationToken cancellationToken)
        {
            // Process command
            if (command.Entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
            var domainEvent = command.Entity.Process(command);

            // Apply events
            command.Entity.Apply(domainEvent);

            // Compare shipping addresses
            var existing = await _repository.GetAsync(command.EntityId);
            if (existing == null) return new CommandResult<Customer>(CommandOutcome.NotHandled);
            var addressChanged = command.Entity.ShippingAddress != existing.ShippingAddress;

            try
            {
                // Persist entity
                var entity = await _repository.UpdateAsync(command.Entity);
                if (entity == null) return new CommandResult<Customer>(CommandOutcome.NotFound);

                // Publish events
                if (addressChanged)
                {
                    var shippingAddress = _mapper.Map<Integration.Models.Address>(entity.ShippingAddress);
                    _logger.LogInformation("----- Publishing event: {EventName}", $"v1.{nameof(CustomerAddressUpdated)}");
                    await _eventBus.PublishAsync(
                        new CustomerAddressUpdated(entity.Id, shippingAddress),
                        null, "v1");
                }
                return new CommandResult<Customer>(CommandOutcome.Accepted, entity);
            }
            catch (ConcurrencyException)
            {
                return new CommandResult<Customer>(CommandOutcome.Conflict);
            }
        }

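The `addressChanged` check above compares two `Address` values with `!=`. That detects a changed address only if `Address` has value-based equality, which C# records provide automatically. The sketch below assumes `Address` is a record; its properties and the contrasting `AddressClass` are hypothetical and only illustrate the difference.

```csharp
using System;

var stored = new Address("1 Main St", "Dallas", "TX", "US", "75001");
var sameValues = new Address("1 Main St", "Dallas", "TX", "US", "75001");
var moved = stored with { City = "Austin", PostalCode = "73301" };

// Records compare by value, so separate instances with equal data are equal.
Console.WriteLine(sameValues != stored); // prints "False": no event would be published
Console.WriteLine(moved != stored);      // prints "True": the address changed

// A plain class without equality overrides compares by reference instead.
var a = new AddressClass { Street = "1 Main St" };
var b = new AddressClass { Street = "1 Main St" };
Console.WriteLine(a != b);               // prints "True" even though the data is identical

public record Address(string Street, string City, string State, string Country, string PostalCode);

public class AddressClass
{
    public string Street { get; set; } = "";
}
```

If `Address` were a class like `AddressClass`, every update would look like an address change, so an integration event would be published on each call.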
- Create queries and query handlers to retrieve entities from the customer repository.
  - Add `GetCustomer` and `GetCustomers` records to a Queries folder.

        public record GetCustomer(Guid Id) : Query<Customer?>;
        public record GetCustomers : Query<IEnumerable<Customer>>;

  - Add `GetCustomerHandler` to a QueryHandlers folder.

        public class GetCustomerHandler : IQueryHandler<GetCustomer, Customer?>
        {
            private readonly ICustomerRepository _repository;

            public GetCustomerHandler(ICustomerRepository repository)
            {
                _repository = repository;
            }

            public async Task<Customer?> Handle(GetCustomer query, CancellationToken cancellationToken)
            {
                var result = await _repository.GetAsync(query.Id);
                return result;
            }
        }

  - Add `GetCustomersHandler` to the QueryHandlers folder.

        public class GetCustomersHandler : IQueryHandler<GetCustomers, IEnumerable<Customer>>
        {
            private readonly ICustomerRepository _repository;

            public GetCustomersHandler(ICustomerRepository repository)
            {
                _repository = repository;
            }

            public async Task<IEnumerable<Customer>> Handle(GetCustomers query, CancellationToken cancellationToken)
            {
                var result = await _repository.GetAsync();
                return result;
            }
        }

- Add read and write models to a DTO folder. Note that read and write models may differ from one another.
  - Add an `AutoMapperProfile` class that extends `Profile` and maps DTOs to entities.
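As a rough sketch of the two steps above, the following shows separate read and write models and an AutoMapper profile that maps between them and an entity. It assumes the AutoMapper package is referenced. The `Sketch` namespaces, the property sets, and the `FullName` read-model property are hypothetical; only `Profile`, `CreateMap`, `ReverseMap`, `ForMember` and `MapFrom` are AutoMapper APIs.

```csharp
using System;
using AutoMapper;

namespace Sketch.Domain
{
    // Simplified stand-in for the Customer entity.
    public class Customer
    {
        public Guid Id { get; set; }
        public string FirstName { get; set; } = "";
        public string LastName { get; set; } = "";
    }
}

namespace Sketch.DTO.Write
{
    // Write model: mirrors what a client may create or update.
    public class Customer
    {
        public Guid Id { get; set; }
        public string FirstName { get; set; } = "";
        public string LastName { get; set; } = "";
    }
}

namespace Sketch.DTO.Read
{
    // Read model: shaped for display, so it can differ from the write model.
    public class CustomerView
    {
        public Guid Id { get; set; }
        public string FullName { get; set; } = "";
    }
}

namespace Sketch.Mapping
{
    public class AutoMapperProfile : Profile
    {
        public AutoMapperProfile()
        {
            // Entity <-> write model, mapped in both directions by property name.
            CreateMap<Domain.Customer, DTO.Write.Customer>().ReverseMap();

            // Entity -> read model, with one computed member.
            CreateMap<Domain.Customer, DTO.Read.CustomerView>()
                .ForMember(dest => dest.FullName,
                    opt => opt.MapFrom(src => src.FirstName + " " + src.LastName));
        }
    }
}
```

With `AddAutoMapper(typeof(Program))`, a profile like this is discovered by scanning the assembly, so no explicit registration of the profile class should be needed.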
- Add a `CustomerCommandController` to the project that injects `ICommandBroker` and `IMapper` into the constructor.
  - Add Post, Put and Delete actions which accept a `Customer` DTO, map it to a `Customer` entity and call `SendAsync` on the command broker, passing the appropriate command.
  - Map input and output entities to corresponding DTOs.

        // POST api/customer
        [HttpPost]
        public async Task<IActionResult> Create([FromBody] DTO.Write.Customer customerDto)
        {
            var customerIn = _mapper.Map<Customer>(customerDto);
            var result = await _commandBroker.SendAsync(new CreateCustomer(customerIn));

            if (result.Outcome != CommandOutcome.Accepted)
                return result.ToActionResult();
            var customerOut = _mapper.Map<DTO.Write.Customer>(result.Entity);
            return new CreatedResult($"api/customer/{customerOut.Id}", customerOut);
        }

        // PUT api/customer
        [HttpPut]
        public async Task<IActionResult> Update([FromBody] DTO.Write.Customer customerDto)
        {
            var customerIn = _mapper.Map<Customer>(customerDto);
            var result = await _commandBroker.SendAsync(new UpdateCustomer(customerIn));

            if (result.Outcome != CommandOutcome.Accepted)
                return result.ToActionResult();
            var customerOut = _mapper.Map<DTO.Write.Customer>(result.Entity);
            return result.ToActionResult(customerOut);
        }

        // DELETE api/customer/id
        [HttpDelete]
        [Route("{id}")]
        public async Task<IActionResult> Remove([FromRoute] Guid id)
        {
            var result = await _commandBroker.SendAsync(new RemoveCustomer(id));
            return result.Outcome != CommandOutcome.Accepted
                ? result.ToActionResult()
                : new NoContentResult();
        }

- Add a `CustomerQueryController` to the project that injects `IQueryBroker` and `IMapper` into the constructor.
- Send queries through the query broker to retrieve entities from the repository, then map those to `Customer` DTO objects (the `CustomerView` read model in this code).

      // GET api/customer
      [HttpGet]
      public async Task<IActionResult> GetCustomers()
      {
          var customers = await _queryBroker.SendAsync(new GetCustomers());
          var result = _mapper.Map<IEnumerable<CustomerView>>(customers);
          return Ok(result);
      }

      // GET api/customer/id
      [HttpGet]
      [Route("{id:guid}")]
      public async Task<IActionResult> GetCustomer([FromRoute] Guid id)
      {
          var customer = await _queryBroker.SendAsync(new GetCustomer(id));
          if (customer == null) return NotFound();
          var result = _mapper.Map<CustomerView>(customer);
          return Ok(result);
      }

- Register dependencies for CustomerService in `Program`.

      // Add automapper
      builder.Services.AddAutoMapper(typeof(Program));

      // Add command and query handlers
      builder.Services.AddHandlers(typeof(Program));

      // Add behaviors
      builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));

      // Add database settings
      builder.Services.AddSingleton<ICustomerRepository, CustomerRepository>();
      builder.Services.AddMongoDbSettings<CustomerDatabaseSettings, Customer>(builder.Configuration);

      // Add Dapr event bus
      builder.Services.AddDaprEventBus(builder.Configuration, true);
      builder.Services.AddMongoEventCache(builder.Configuration);

- Add configuration entries to appsettings.json.

      "CustomerDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "CustomersDb",
        "CollectionName": "Customers"
      },
      "DaprEventBusOptions": {
        "PubSubName": "pubsub"
      },
      "MongoEventCacheOptions": {
        "AppName": "order-service"
      },
      "MongoStoreDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "daprStore",
        "CollectionName": "daprCollection"
      },
      "DaprEventBusSchemaOptions": {
        "UseSchemaRegistry": true,
        "SchemaValidatorType": "Json",
        "SchemaRegistryType": "Mongo",
        "AddSchemaOnPublish": true,
        "MongoStateStoreOptions": {
          "ConnectionString": "mongodb://localhost:27017",
          "DatabaseName": "schema-registry",
          "SchemasCollectionName": "schemas"
        }
      }

- Repeat these steps for the Order service.
- Reference the Common project.
- Add Integration/EventHandlers folders with a `CustomerAddressUpdatedEventHandler` class that extends `IntegrationEventHandler<CustomerAddressUpdated>`.
- Override `HandleAsync` to update the order addresses for the customer.

      public override async Task HandleAsync(CustomerAddressUpdated @event)
      {
          var orders = await _orderRepository.GetCustomerOrders(@event.CustomerId);
          foreach (var order in orders)
          {
              var shippingAddress = _mapper.Map<Address>(@event.ShippingAddress);
              await _orderRepository.UpdateOrderAddress(order.Id, shippingAddress);
          }
      }

- Register dependencies for CustomerService in `Program`.
- In `Program` register `CustomerAddressUpdatedEventHandler` and add the Dapr Event Bus.

      builder.Services.AddSingleton<CustomerAddressUpdatedEventHandler>();
      builder.Services.AddDaprEventBus(builder.Configuration, true);
      builder.Services.AddMongoEventCache(builder.Configuration);

- Also in `Program` use Cloud Events, map subscribe handlers, and map Dapr Event Bus endpoints.

      app.UseCloudEvents();
      app.UseEndpoints(endpoints =>
      {
          endpoints.MapControllers();
          endpoints.MapSubscribeHandler();
          endpoints.MapDaprEventBus(eventBus =>
          {
              var customerAddressUpdatedEventHandler =
                  app.Services.GetRequiredService<CustomerAddressUpdatedEventHandler>();
              eventBus.Subscribe(customerAddressUpdatedEventHandler, null, "v1");
          });
      });

- Add configuration entries for `MongoEventCacheOptions` and `MongoStoreDatabaseSettings` to appsettings.json.

      "MongoEventCacheOptions": {
        "AppName": "order-service"
      },
      "MongoStoreDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "daprStore",
        "CollectionName": "daprCollection"
      }

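To make the publish and subscribe steps easier to picture, here is a minimal in-memory sketch of an event bus with a version prefix. It is not the EventDriven.EventBus.Dapr implementation: `InMemoryEventBus`, its method signatures, and the simplified `CustomerAddressUpdated` record are hypothetical, and the `prefix.EventName` topic format is an assumption inferred from the `v1.CustomerAddressUpdated` name logged by the update handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var bus = new InMemoryEventBus();
var updatedCustomers = new List<Guid>();

// Order service side: subscribe to version "v1" of the event.
bus.Subscribe<CustomerAddressUpdated>(e =>
{
    updatedCustomers.Add(e.CustomerId);
    return Task.CompletedTask;
}, prefix: "v1");

// Customer service side: publish after a successful update.
var customerId = Guid.NewGuid();
await bus.PublishAsync(new CustomerAddressUpdated(customerId, "1 Main St"), prefix: "v1");

// A different version maps to a different topic, so nothing is delivered.
await bus.PublishAsync(new CustomerAddressUpdated(Guid.NewGuid(), "2 Oak Ave"), prefix: "v2");

Console.WriteLine(updatedCustomers.Count); // prints "1"

// Simplified stand-in for the integration event.
public record CustomerAddressUpdated(Guid CustomerId, string ShippingAddress);

public class InMemoryEventBus
{
    private readonly Dictionary<string, List<Func<object, Task>>> _handlers = new();

    // Assumed topic naming: "<prefix>.<event type name>".
    private static string Topic<T>(string prefix) => $"{prefix}.{typeof(T).Name}";

    public void Subscribe<T>(Func<T, Task> handler, string prefix)
    {
        var topic = Topic<T>(prefix);
        if (!_handlers.TryGetValue(topic, out var list))
            _handlers[topic] = list = new List<Func<object, Task>>();
        list.Add(e => handler((T)e));
    }

    public async Task PublishAsync<T>(T @event, string prefix) where T : notnull
    {
        if (!_handlers.TryGetValue(Topic<T>(prefix), out var list)) return;
        foreach (var handler in list)
            await handler(@event);
    }
}
```

In the real setup the broker sits between the two services, so the publisher and subscriber run in separate processes and delivery is asynchronous rather than an in-process call.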
- Lastly, add a dapr/components directory to the reference-architecture folder.
  - Add the following Dapr component YAML files:
    - pubsub.yaml
    - statestore.yaml
    - statestore-mongodb.yaml
  - Files not in use should be placed in a separate folder.
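For orientation, a pubsub.yaml component might look like the following sketch. It assumes Redis as the underlying message broker, running locally on the default port without a password; the guide does not state which broker is used, so the `type` and `metadata` values are assumptions. The component `name` matches the `PubSubName` value of `pubsub` configured in appsettings.json.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  # Must match DaprEventBusOptions:PubSubName in appsettings.json
  name: pubsub
spec:
  # Assumption: Redis as the message broker
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
```

Switching to a different broker would mean changing `type` and its `metadata` entries, while the services keep referring to the component by name.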