Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SqlTransport Source and Add Azure Service Bus Target #502

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/TimeoutMigrationTool/ASB/AsbConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Particular.TimeoutMigrationTool.ASB
{
public class AsbConstants
{
public const string MigrationQueue = "timeouts-staging";
public const string NServicebusMigrationDestination = "NServiceBus.Migration.Destination";
public const string NServicebusMigrationScheduledTime = "NServiceBus.Migration.ScheduledTime";
}
}
89 changes: 89 additions & 0 deletions src/TimeoutMigrationTool/ASB/AsbEndpointMigrator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
namespace Particular.TimeoutMigrationTool.ASB
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

public class AsbEndpointMigrator : ITimeoutsTarget.IEndpointTargetBatchMigrator
{
readonly IAzureServiceBusEndpoint _azureServiceBusEndpoint;
string _queueName;
readonly ILogger logger;

public AsbEndpointMigrator(IAzureServiceBusEndpoint azureServiceBusEndpoint, string queueName, ILogger logger)
{
_azureServiceBusEndpoint = azureServiceBusEndpoint;
_queueName = queueName;
this.logger = logger;
this.logger.LogInformation($"Creating Migration for {queueName}");
}

public ValueTask DisposeAsync() => new ValueTask(Task.CompletedTask);

public async ValueTask<int> StageBatch(IReadOnlyList<TimeoutData> timeouts, int batchNumber)
{
logger.LogInformation($"Staging Migration for {_queueName}");
var messageChunks = timeouts.Select(s => MapServiceBusMessage(s)).Chunk(100);
foreach (var messageChunk in messageChunks)
{
await _azureServiceBusEndpoint.SendMessages(AsbConstants.MigrationQueue, messageChunk);
}
return messageChunks.Sum(s => s.Length);
}

public async ValueTask<int> CompleteBatch(int batchNumber)
{
logger.LogInformation($"Completing Migration for {_queueName}");

var counter = 0;
await _azureServiceBusEndpoint.ProcessMessages(AsbConstants.MigrationQueue, async (receivedMessage) =>
{
var scheduledTime = (DateTime)receivedMessage.ApplicationProperties[AsbConstants.NServicebusMigrationScheduledTime];
var messageToSend = new ServiceBusMessage()
{
MessageId = Guid.NewGuid().ToString(),
CorrelationId = receivedMessage.CorrelationId,
ContentType = receivedMessage.ContentType,
Body = receivedMessage.Body
};
foreach (var appProp in receivedMessage.ApplicationProperties)
{
messageToSend.ApplicationProperties.Add(appProp.Key, appProp.Value);
}

if (scheduledTime < DateTime.Now)
{
scheduledTime = DateTime.Now.AddDays(1);
}

counter++;
await _azureServiceBusEndpoint.ScheduleMessage(_queueName, scheduledTime, messageToSend);
});
return counter;
}

ServiceBusMessage MapServiceBusMessage(TimeoutData timeoutData)
{
var serviceBusMessage = new ServiceBusMessage
{
MessageId = Guid.NewGuid().ToString(),
CorrelationId = timeoutData.Id,
ContentType = "application/json",
Body = new BinaryData(timeoutData.State)
};

foreach (var header in timeoutData.Headers)
{
serviceBusMessage.ApplicationProperties.Add(header.Key, header.Value);
}

serviceBusMessage.ApplicationProperties.Add(AsbConstants.NServicebusMigrationDestination, _queueName);
serviceBusMessage.ApplicationProperties.Add(AsbConstants.NServicebusMigrationScheduledTime, timeoutData.Time);

return serviceBusMessage;
}
}
}
57 changes: 57 additions & 0 deletions src/TimeoutMigrationTool/ASB/AsbTarget.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace Particular.TimeoutMigrationTool.ASB
{
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;

public class AsbTarget : ITimeoutsTarget
{
readonly IAzureServiceBusEndpoint _azureServiceBusEndpoint;
readonly ILogger logger;

public AsbTarget(IAzureServiceBusEndpoint azureServiceBusEndpoint, ILogger logger)
{
_azureServiceBusEndpoint = azureServiceBusEndpoint;
this.logger = logger;
}

public async ValueTask<MigrationCheckResult> AbleToMigrate(EndpointInfo endpoint)
{
var migrationsResult = new MigrationCheckResult();

try
{
await EnsureQueueExists(AsbConstants.MigrationQueue);
var result = await _azureServiceBusEndpoint.GetQueueAsync(endpoint.EndpointName);
}
catch (Exception)
{
migrationsResult.Problems.Add($"Can not connect to queueName '{endpoint.EndpointName}' on connection ");
}

return migrationsResult;
}

async Task EnsureQueueExists(string queueName)
{
if (!await _azureServiceBusEndpoint.QueueExistsAsync(queueName))
{
await _azureServiceBusEndpoint.CreateQueueAsync(queueName);
}
}

public ValueTask<ITimeoutsTarget.IEndpointTargetBatchMigrator> PrepareTargetEndpointBatchMigrator(string endpointName)
{
return new ValueTask<ITimeoutsTarget.IEndpointTargetBatchMigrator>(new AsbEndpointMigrator(_azureServiceBusEndpoint, endpointName, logger));
}

public ValueTask Abort(string endpointName)
{
return new ValueTask();
}

public ValueTask Complete(string endpointName) => new ValueTask(Task.CompletedTask);
}
}
123 changes: 123 additions & 0 deletions src/TimeoutMigrationTool/ASB/AzureServiceBusEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
namespace Particular.TimeoutMigrationTool.ASB
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;

public interface IAzureServiceBusEndpoint
{
Task<QueueProperties> GetQueueAsync(string queueName);

Task<bool> QueueExistsAsync(string queueName);

Task<QueueProperties> CreateQueueAsync(string queueName);

Task SendMessage(string queue, ServiceBusMessage message);

Task SendMessages(string queue, IEnumerable<ServiceBusMessage> messages);

Task<long> ScheduleMessage(string queue, DateTime datetime, ServiceBusMessage message);
Task ProcessMessages(string queue, Func<ServiceBusReceivedMessage, Task> processMessage, int batchCount = 50);

}

public class AzureServiceBusEndpoint : IAzureServiceBusEndpoint
{
readonly string _connectionString;
readonly ILogger _logger;

public AzureServiceBusEndpoint(string connectionString, ILogger logger)
{
_connectionString = connectionString;
_logger = logger;
}

public async Task<QueueProperties> GetQueueAsync(string queueName)
{
var client = new ServiceBusAdministrationClient(_connectionString);
var result = await client.GetQueueAsync(queueName);
return result.Value;
}

public async Task<bool> QueueExistsAsync(string queueName)
{
var client = new ServiceBusAdministrationClient(_connectionString);
var result = await client.QueueExistsAsync(queueName);
return result.Value;
}

public async Task<QueueProperties> CreateQueueAsync(string queueName)
{
_logger.LogInformation($"Creating queue {queueName}");
var client = new ServiceBusAdministrationClient(_connectionString);
var options = new CreateQueueOptions(queueName)
{
LockDuration = TimeSpan.FromSeconds(60)
};
var result = await client.CreateQueueAsync(options);
return result.Value;
}

public async Task SendMessage(string queue, ServiceBusMessage message)
{
var client = GetOrCreateServiceBusClient();
await client.CreateSender(queue).SendMessageAsync(message);
}

public async Task SendMessages(string queue, IEnumerable<ServiceBusMessage> messages)
{
_logger.LogInformation($"{DateTime.UtcNow}: Sending {messages.Count()} to queue {queue}");
var client = GetOrCreateServiceBusClient();
await client.CreateSender(queue).SendMessagesAsync(messages);
}

public async Task<long> ScheduleMessage(string queue, DateTime datetime, ServiceBusMessage message)
{
return await GetOrCreateSender(queue)
.ScheduleMessageAsync(message, datetime);
}

public async Task ProcessMessages(string queue, Func<ServiceBusReceivedMessage, Task> processMessage, int batchCount = 50)
{
var client = GetOrCreateServiceBusClient();
var options = new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock };
var receiver = client.CreateReceiver(queue, options);
bool hasMessages = true;
while (hasMessages)
{
var messages = await receiver.ReceiveMessagesAsync(batchCount, TimeSpan.FromSeconds(30));
hasMessages = messages.Count > 0;
_logger.LogInformation($"{DateTime.UtcNow}: Received {messages.Count} messages from batch, now processing them . . . ");
foreach (var receivedMessage in messages)
{
await processMessage(receivedMessage);
// _logger.LogInformation($"{DateTime.UtcNow}: Processed a message from the batch");
await receiver.CompleteMessageAsync(receivedMessage);
// _logger.LogInformation($"{DateTime.UtcNow}: Completed a message from the batch");
}
}
}

ServiceBusClient _servicebusClient;
ServiceBusClient GetOrCreateServiceBusClient()
{
_servicebusClient ??= new ServiceBusClient(_connectionString);

return _servicebusClient;
}

Dictionary<string, ServiceBusSender> _senders = new Dictionary<string, ServiceBusSender>();
ServiceBusSender GetOrCreateSender(string queue)
{
if (!_senders.ContainsKey(queue))
{
_senders[queue] = GetOrCreateServiceBusClient().CreateSender(queue);
}
return _senders[queue];
}
}
}
2 changes: 2 additions & 0 deletions src/TimeoutMigrationTool/ApplicationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class ApplicationOptions
public const string RavenVersion = "ravenVersion";
public const string ForceUseIndex = "forceUseIndex";
public const string RavenTimeoutPrefix = "prefix";

public const string CutoffTime = "cutoffTime";
public const string RabbitMqTargetConnectionString = "target";
public const string MsmqSqlTargetConnectionString = "target";
Expand All @@ -26,5 +27,6 @@ public class ApplicationOptions
public const string NHibernateSourceDialect = "dialect";
public const string AsqTargetConnectionString = "target";
public const string AsqDelayedDeliveryTableName = "delayedtablename";
public const string AsbTargetConnectionString = "target";
}
}
Loading