Skip to content

Commit

Permalink
feat: added ability to schema sql server operations (#136)
Browse files Browse the repository at this point in the history
Added ability to schema SQL Server operations
  • Loading branch information
carlosgoias authored Aug 24, 2023
1 parent f8fe911 commit 8f38d9c
Show file tree
Hide file tree
Showing 18 changed files with 130 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ internal class SqlServerRepositorySettings
public string ConnectionString { get; set; }

public string DatabaseName { get; set; }
public string Schema { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

internal class SqlServerRepository : IRepository
{
private const string schema = "dbo";
private const int TimeoutSec = 60;
private readonly ConnectionProvider connectionProvider;

private readonly IRetryQueueItemMessageHeaderRepository retryQueueItemMessageHeaderRepository;
private readonly IRetryQueueItemMessageRepository retryQueueItemMessageRepository;
private readonly IRetryQueueItemRepository retryQueueItemRepository;
Expand All @@ -31,7 +31,7 @@ public SqlServerRepository(
string connectionString,
string dbName)
{
this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName);
this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName, schema);

this.RetryQueueDataProvider = new SqlServerDbDataProviderFactory().Create(this.sqlServerDbSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ public CreateSchemaCreatorTests(BootstrapperRepositoryFixture bootstrapperReposi
}

[Fact]
public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
{
var sqlDataProviderFactory = new SqlServerDbDataProviderFactory();
var postgresDataProviderFactory = new PostgresDbDataProviderFactory();

var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName;
var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName;

var sqlSettings = new SqlServerDbSettings(connectionString, databaseName);
var postgresSettings = new PostgresDbSettings(connectionString, databaseName);

var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings);
var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings);

await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName);
}

[Fact]
public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
{
var postgresDataProviderFactory = new PostgresDbDataProviderFactory();
var sqlDataProviderFactory = new SqlServerDbDataProviderFactory();

var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName;
var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName;
var schema = this.bootstrapperRepositoryFixture.SqlServerSettings.Schema;

var postgresSettings = new PostgresDbSettings(connectionString, databaseName);
var sqlSettings = new SqlServerDbSettings(connectionString, databaseName, schema);

var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings);
var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings);

await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName);
}
Expand Down
3 changes: 2 additions & 1 deletion src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"SqlServerRepository": {
"ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests;",
"DatabaseName": "kafka_flow_retry_durable_test"
"DatabaseName": "kafka_flow_retry_durable_test",
"Schema": "dbo"
},
"PostgresRepository": {
"ConnectionString": "Server=localhost;Database=postgres;User Id=postgres;Password=Postgres123123;Port=5432;Application Name=KafkaFlow Retry Tests;",
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public DbConnectionContext(SqlServerDbSettings sqlServerDbSettings, bool withinT
this.withinTransaction = withinTransaction;
}

public string Schema => this.sqlServerDbSettings.Schema;

public void Commit()
{
if (this.sqlTransaction is object)
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow.Retry.SqlServer/IDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

internal interface IDbConnection : IDisposable
{
string Schema { get; }

SqlCommand CreateCommand();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageHeaderRepository

Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageRepository

Task<IList<RetryQueueItemMessageDbo>> GetMessagesOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemDbo> retryQueueItemsDbo);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Data.SqlClient;
Expand Down Expand Up @@ -29,8 +29,8 @@ public async Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConn
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryItemMessageHeaders] h
INNER JOIN [RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
FROM [{dbConnection.Schema}].[RetryItemMessageHeaders] h
INNER JOIN [{dbConnection.Schema}].[RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x => $"'{x.IdRetryQueueItem}'"))})
ORDER BY rqi.IdRetryQueue, h.IdItemMessage";

Expand All @@ -46,7 +46,7 @@ private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHea
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [RetryItemMessageHeaders]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryItemMessageHeaders]
(IdItemMessage, [Key], Value)
VALUES
(@IdItemMessage, @Key, @Value)";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Data.SqlClient;
Expand All @@ -16,7 +16,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [ItemMessages]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[ItemMessages]
(IdRetryQueueItem, [Key], Value, TopicName, Partition, Offset, UtcTimeStamp)
VALUES
(@idRetryQueueItem, @key, @value, @topicName, @partition, @offSet, @utcTimeStamp)";
Expand Down Expand Up @@ -50,11 +50,11 @@ public async Task<IList<RetryQueueItemMessageDbo>> GetMessagesOrderedAsync(IDbCo
}
var parameter = new SqlParameter("@RetryQueueItemsIds", entriesToLoad);
parameter.Direction = System.Data.ParameterDirection.Input;
parameter.TypeName = "dbo.TY_RetryQueueItemsIds";
parameter.TypeName = $"{dbConnection.Schema}.TY_RetryQueueItemsIds";

command.Parameters.Add(parameter);
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"EXEC P_LoadItemMessages @RetryQueueItemsIds";
command.CommandText = $@"EXEC {dbConnection.Schema}.P_LoadItemMessages @RetryQueueItemsIds";

return await this.ExecuteReaderAsync(command).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand All @@ -21,11 +21,11 @@ public async Task<long> AddAsync(IDbConnection dbConnection, RetryQueueItemDbo r
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [RetryQueueItems]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryQueueItems]
(IdDomain, IdRetryQueue, IdDomainRetryQueue, IdItemStatus, IdSeverityLevel, AttemptsCount, Sort, CreationDate, LastExecution, ModifiedStatusDate, Description)
VALUES
(@idDomain, @idRetryQueue, @idDomainRetryQueue, @idItemStatus, @idSeverityLevel, @attemptsCount,
(SELECT COUNT(1) FROM [RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue),
(SELECT COUNT(1) FROM [{dbConnection.Schema}].[RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue),
@creationDate, @lastExecution, @modifiedStatusDate, @description);
SELECT SCOPE_IDENTITY()";
Expand Down Expand Up @@ -53,8 +53,8 @@ public async Task<bool> AnyItemStillActiveAsync(IDbConnection dbConnection, Guid
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT 1 WHERE EXISTS(
SELECT TOP 1 * FROM [RetryQueueItems]
command.CommandText = $@"SELECT 1 WHERE EXISTS(
SELECT TOP 1 * FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry))";
Expand All @@ -77,8 +77,8 @@ public async Task<RetryQueueItemDbo> GetItemAsync(IDbConnection dbConnection, Gu
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT *
FROM [RetryQueueItems]
command.CommandText = $@"SELECT *
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomain = @IdDomain";

Expand All @@ -96,8 +96,8 @@ public async Task<IList<RetryQueueItemDbo>> GetItemsByQueueOrderedAsync(IDbConne
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT *
FROM [RetryQueueItems]
command.CommandText = $@"SELECT *
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
ORDER BY Sort ASC";
Expand Down Expand Up @@ -133,20 +133,20 @@ public async Task<IList<RetryQueueItemDbo>> GetItemsOrderedAsync(
}

query = string.Concat(query, $@" *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue IN ({string.Join(",", retryQueueIds.Select(x => $"'{x}'"))})");

if (stuckStatusFilter is null)
{
query = string.Concat(query, $" AND IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})");
query = string.Concat(query, $" AND IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})");
}
else
{
query = string.Concat(query, $@" AND(
IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})
IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})
OR(
IdItemStatus = { (byte)stuckStatusFilter.ItemStatus}
IdItemStatus = {(byte)stuckStatusFilter.ItemStatus}
AND DATEADD(SECOND, {Math.Floor(stuckStatusFilter.ExpirationInterval.TotalSeconds)}, ModifiedStatusDate) < @DateTimeUtcNow
)
)");
Expand All @@ -173,7 +173,7 @@ public async Task<IList<RetryQueueItemDbo>> GetNewestItemsAsync(IDbConnection db
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry)
Expand All @@ -198,7 +198,7 @@ public async Task<IList<RetryQueueItemDbo>> GetPendingItemsAsync(IDbConnection d
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry)
Expand Down Expand Up @@ -237,7 +237,7 @@ public async Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"UPDATE [RetryQueueItems]
command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems]
SET IdItemStatus = @IdItemStatus,
AttemptsCount = @AttemptsCount,
LastExecution = @LastExecution,
Expand Down Expand Up @@ -265,7 +265,7 @@ public async Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDoma
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"UPDATE [RetryQueueItems]
command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems]
SET IdItemStatus = @IdItemStatus,
ModifiedStatusDate = @DateTimeUtcNow
WHERE IdDomain = @IdDomain";
Expand Down
Loading

0 comments on commit 8f38d9c

Please sign in to comment.