Skip to content

Commit

Permalink
rework how updates are published by making it more resilient to failure
Browse files Browse the repository at this point in the history
my approach is quite decent here to the point where i feel like it would be quite useful to the world and may later on extract it into a new project in a new repo and publish it to nuget

anyways, here was the problem i solved. the update of course would fail to publish if the message bus wasn't available

but before i explain how i solved it, it's worth mentioning how Neil Morrissey in his course "Implementing a Data Management Strategy for an ASP.NET Core Microservices Architecture" solved this problem

what he did to solve this was create a second database to write the update to and have a second project that will poll that database and publish it to the message bus, i don't like this tactic because it's not maintainable and prefer to have the microservice itself ensure that the update is sent to the message bus. but what i did take from this is that you can write the update to the database, it doesn't really need to be a second one

my solution is a bit complex and involves:
- System.Threading.Channels for the first time
- a second dbcontext
- two background services

so System.Threading.Channels. what's its purpose and how to use them

a channel can allow you to offload processing from a request to a hosted service and provide an immediate response to the user

visually, it's pretty simple to understand

write -> channel -> read

in my solution, i offload the publishing of the update to the first background service "PublishUpdateChannelReaderBackgroundService" which is the reader for the channel

the channel is defined as Channel<(BaseMessage, string)> which reads and writes a tuple with the base message and destination to publish to

publishing of the update is first attempted and if it fails, the update is encoded to a base64url and stored in the new dbcontext, which will be migrated to the same database as the one being updated

a second background service is created to read from that dbcontext - the unpublished updates every 10 minutes - and try again. each retry is counted and logged and when an update is finally published, it's removed from the database

all the code is in the shared library and an extension method on the iservicecollection is created to:
- register that dbcontext
- add the repository
- add the channel
- add the two hosted services

in total there's 5 services, and it's reuseable and easy to integrate

on the controllers, after the basemessage is created, instead of publishing it immediately, it's written to the channel

distributed tracing isn't maintained when sending to a channel so i modified the "StartANewActivity" extension method to pass in the operation name and ensure distributed tracing is maintained

skills from Steve Gordan's course "Building ASP.NET Core 3 Hosted Services and .NET Core 3 Worker Services" were used here

this approach to this problem too is my very own one 😌

however, logging has to improved a bit and the publish-destination field has to be synchronized in all the logs
  • Loading branch information
ShaylenReddy42 committed Nov 8, 2022
1 parent b9082b4 commit 74779db
Show file tree
Hide file tree
Showing 19 changed files with 511 additions and 22 deletions.
2 changes: 1 addition & 1 deletion scripts/prebuild.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ SET RabbitMQ__ConnectionProperties__ConnectionString=amqp://localhost:5673
ECHO.
ECHO Create EF Core Bundle for SeelansTyres.Services.TyresService
ECHO.
dotnet tool run dotnet-ef migrations bundle --force --project "%PROJECT%" --startup-project "%PROJECT%" -o efbundle.exe
dotnet tool run dotnet-ef migrations bundle --force --project "%PROJECT%" --startup-project "%PROJECT%" --context TyresDbContext -o efbundle.exe

ECHO.
ECHO Execute EF Core Bundle against configured database connection
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;
using SeelansTyres.Libraries.Shared.Channels;
using SeelansTyres.Libraries.Shared.DbContexts.UnpublishedUpdateDbContext_Entities;
using SeelansTyres.Libraries.Shared.Services;
using System.Text.Json;

namespace SeelansTyres.Libraries.Shared.BackgroundServices;

public class PublishUpdateChannelReaderBackgroundService : BackgroundService
{
private readonly ILogger<PublishUpdateChannelReaderBackgroundService> logger;
private readonly PublishUpdateChannel publishUpdateChannel;
private readonly IMessagingServicePublisher messagingServicePublisher;
private readonly IServiceScopeFactory serviceScopeFactory;

public PublishUpdateChannelReaderBackgroundService(
ILogger<PublishUpdateChannelReaderBackgroundService> logger,
PublishUpdateChannel publishUpdateChannel,
IMessagingServicePublisher messagingServicePublisher,
IServiceScopeFactory serviceScopeFactory)
{
this.logger = logger;
this.publishUpdateChannel = publishUpdateChannel;
this.messagingServicePublisher = messagingServicePublisher;
this.serviceScopeFactory = serviceScopeFactory;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var (message, destination) in publishUpdateChannel.ReadAllFromChannelAsync())
{
message.StartANewActivity("Attempting to publish update");

try
{
logger.LogInformation(
"Background Service => Attempting to publish update to {publishDestination} destination",
destination);

await messagingServicePublisher.PublishMessageAsync(message, destination);

logger.LogInformation(
"{announcement}: Attempt to publish update to {publishDestination} destination completed successfully",
"SUCCEEDED", destination);
}
catch (Exception ex)
{
logger.LogError(
ex,
"{announcement}: Attempt to publish update to {publishDestination} destination was unsuccessful, writing to the database to try again later",
"FAILED", destination);

using var scope = serviceScopeFactory.CreateScope();

var unpublishedUpdateRepository = scope.ServiceProvider.GetService<IUnpublishedUpdateRepository>();

var unpublishedUpdate = new UnpublishedUpdate
{
EncodedUpdate = Base64UrlEncoder.Encode(JsonSerializer.SerializeToUtf8Bytes(message)),
Destination = destination
};

await unpublishedUpdateRepository!.CreateAsync(unpublishedUpdate);

await unpublishedUpdateRepository.SaveChangesAsync();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;
using SeelansTyres.Libraries.Shared.Messages;
using SeelansTyres.Libraries.Shared.Services;
using System.Text.Json;

namespace SeelansTyres.Libraries.Shared.BackgroundServices;

public class RetryUnpublishedUpdatesWorker : BackgroundService
{
private readonly ILogger<RetryUnpublishedUpdatesWorker> logger;
private readonly IServiceScopeFactory serviceScopeFactory;
private readonly IMessagingServicePublisher messagingServicePublisher;

public RetryUnpublishedUpdatesWorker(
ILogger<RetryUnpublishedUpdatesWorker> logger,
IServiceScopeFactory serviceScopeFactory,
IMessagingServicePublisher messagingServicePublisher)
{
this.logger = logger;
this.serviceScopeFactory = serviceScopeFactory;
this.messagingServicePublisher = messagingServicePublisher;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (stoppingToken.IsCancellationRequested is false)
{
using var scope = serviceScopeFactory.CreateScope();

var unpublishedUpdateRepository = scope.ServiceProvider.GetService<IUnpublishedUpdateRepository>();

var unpublishedUpdates = await unpublishedUpdateRepository!.RetrieveAllAsync();

unpublishedUpdates.ForEach(unpublishedUpdate =>
{
unpublishedUpdate.Retries++;
var message = JsonSerializer.Deserialize<BaseMessage>(Base64UrlEncoder.DecodeBytes(unpublishedUpdate.EncodedUpdate));
message!.StartANewActivity("Retrying to publish update");
try
{
messagingServicePublisher.PublishMessageAsync(message!, unpublishedUpdate.Destination);
logger.LogInformation(
"Worker => Unpublished update was published successfully to {destination} after {retries} retries",
unpublishedUpdate.Destination, unpublishedUpdate.Retries);
unpublishedUpdateRepository.DeleteAsync(unpublishedUpdate);
}
catch (Exception) { }
});

await unpublishedUpdateRepository.SaveChangesAsync();

Thread.Sleep(10 * 60_000); // 10 minutes
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Microsoft.Extensions.Logging;
using SeelansTyres.Libraries.Shared.Messages;
using System.Threading.Channels;

namespace SeelansTyres.Libraries.Shared.Channels;

public class PublishUpdateChannel
{
private readonly ILogger<PublishUpdateChannel> logger;
private readonly Channel<(BaseMessage, string)> channel;

public PublishUpdateChannel(ILogger<PublishUpdateChannel> logger)
{
this.logger = logger;

channel =
Channel.CreateBounded<(BaseMessage, string)>(
new BoundedChannelOptions(capacity: 250)
{
SingleWriter = false,
SingleReader = true
});
}

public async Task<bool> WriteToChannelAsync(BaseMessage message, string destination)
{
while (await channel.Writer.WaitToWriteAsync())
{
if (channel.Writer.TryWrite((message, destination)) is true)
{
logger.LogInformation("Channel => Update has been written to the channel for publishing");

return true;
}
}

return false;
}

public IAsyncEnumerable<(BaseMessage, string)> ReadAllFromChannelAsync() =>
channel.Reader.ReadAllAsync();

public bool TryCompleteWriter(Exception? ex) =>
channel.Writer.TryComplete(ex);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Microsoft.EntityFrameworkCore;
using SeelansTyres.Libraries.Shared.DbContexts.UnpublishedUpdateDbContext_Entities;

namespace SeelansTyres.Libraries.Shared.DbContexts;

public class UnpublishedUpdateDbContext : DbContext
{
public UnpublishedUpdateDbContext(DbContextOptions<UnpublishedUpdateDbContext> options) : base(options) { }

public DbSet<UnpublishedUpdate> UnpublishedUpdates => Set<UnpublishedUpdate>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.ComponentModel.DataAnnotations.Schema;
using System.ComponentModel.DataAnnotations;

namespace SeelansTyres.Libraries.Shared.DbContexts.UnpublishedUpdateDbContext_Entities;

public class UnpublishedUpdate
{
[Key]
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
public long Id { get; set; }
[Required]
public string EncodedUpdate { get; set; } = string.Empty;
[Required]
public string Destination { get; set; } = string.Empty;
public int Retries { get; set; } = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace SeelansTyres.Libraries.Shared;

public static class DistributedTracing
{
public static BaseMessage StartANewActivity(this BaseMessage message)
public static BaseMessage StartANewActivity(this BaseMessage message, string operationName = "Processing Message")
{
var activity = new Activity("Processing Message");
var activity = new Activity(operationName);
activity.SetParentId(
traceId: ActivityTraceId.CreateFromString(message.TraceId),
spanId: ActivitySpanId.CreateFromString(message.SpanId));
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace SeelansTyres.Libraries.Shared.Migrations
{
public partial class Initial : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "UnpublishedUpdates",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("SqlServer:Identity", "1, 1"),
EncodedUpdate = table.Column<string>(type: "nvarchar(max)", nullable: false),
Destination = table.Column<string>(type: "nvarchar(max)", nullable: false),
Retries = table.Column<int>(type: "int", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_UnpublishedUpdates", x => x.Id);
});
}

protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "UnpublishedUpdates");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// <auto-generated />
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using SeelansTyres.Libraries.Shared.DbContexts;

#nullable disable

namespace SeelansTyres.Libraries.Shared.Migrations
{
[DbContext(typeof(UnpublishedUpdateDbContext))]
partial class UnpublishedUpdateDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.10")
.HasAnnotation("Relational:MaxIdentifierLength", 128);

SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);

modelBuilder.Entity("SeelansTyres.Libraries.Shared.DbContexts.UnpublishedUpdateDbContext_Entities.UnpublishedUpdate", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
SqlServerPropertyBuilderExtensions.UseIdentityColumn(b.Property<long>("Id"), 1L, 1);
b.Property<string>("Destination")
.IsRequired()
.HasColumnType("nvarchar(max)");
b.Property<string>("EncodedUpdate")
.IsRequired()
.HasColumnType("nvarchar(max)");
b.Property<int>("Retries")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("UnpublishedUpdates");
});
#pragma warning restore 612, 618
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using SeelansTyres.Libraries.Shared.DbContexts.UnpublishedUpdateDbContext_Entities;

namespace SeelansTyres.Libraries.Shared.Services;

public interface IUnpublishedUpdateRepository
{
Task CreateAsync(UnpublishedUpdate unpublishedUpdate);
Task<List<UnpublishedUpdate>> RetrieveAllAsync();
Task DeleteAsync(UnpublishedUpdate unpublishedUpdate);

Task<bool> SaveChangesAsync();
}
Loading

0 comments on commit 74779db

Please sign in to comment.