Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static DashboardRoutes()

Routes.AddRazorPage("/nodes", x =>
{
var id = x.Request.Cookies["cap.node"];
var id = x.Request.Cookies.ContainsKey("cap.node") ? x.Request.Cookies["cap.node"] : string.Empty;
return new NodePage(id);
});

Expand Down
26 changes: 10 additions & 16 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,11 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
new MySqlParameter("@timeout", timeout), new MySqlParameter("@batchCount", batchCount));
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT * FROM `{_pubName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_recName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_pubName);

return await GetMessagesOfNeedRetryAsync(sql);
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_recName);

public IMonitoringApi GetMonitoringApi()
{
Expand Down Expand Up @@ -189,8 +178,13 @@ private void StoreReceivedMessage(object[] sqlParams)
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} " +
$"AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var result = connection.ExecuteReader(sql, reader =>
{
Expand Down
27 changes: 10 additions & 17 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,11 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
return await Task.FromResult(count);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_pubName);

return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {_recName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

return await GetMessagesOfNeedRetryAsync(sql);
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_recName);

public IMonitoringApi GetMonitoringApi()
{
Expand Down Expand Up @@ -195,8 +183,13 @@ private void StoreReceivedMessage(object[] sqlParams)
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} " +
$"AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var result = connection.ExecuteReader(sql, reader =>
{
Expand Down
28 changes: 10 additions & 18 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,11 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
return await Task.FromResult(count);
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT TOP (200) Id, Content, Retries, Added FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

return await GetMessagesOfNeedRetryAsync(sql);
}

public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_pubName);

return await GetMessagesOfNeedRetryAsync(sql);
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() =>
await GetMessagesOfNeedRetryAsync(_recName);

public IMonitoringApi GetMonitoringApi()
{
Expand Down Expand Up @@ -195,8 +182,13 @@ private void StoreReceivedMessage(object[] sqlParams)
connection.ExecuteNonQuery(sql, sqlParams: sqlParams);
}

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

List<MediumMessage> result;
using (var connection = new SqlConnection(_options.Value.ConnectionString))
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, D
where StatusName = @statusName
group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added))
)
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <= @maxKey;";

//SQL Server 2012+
var sqlQuery = $@"
Expand Down