Skip to content

Commit

Permalink
Fix date file format bug when retrying query from database. #1143
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Jun 4, 2022
1 parent aceb505 commit 432500f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
15 changes: 11 additions & 4 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,17 @@ private void StoreReceivedMessage(object[] sqlParams)

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} " +
$"AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " +
$"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

object[] sqlParams =
{
new MySqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new MySqlParameter("@Version", _capOptions.Value.Version),
new MySqlParameter("@Added", fourMinAgo)
};

await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var result = connection.ExecuteReader(sql, reader =>
Expand All @@ -205,7 +212,7 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin
}
return messages;
});
}, sqlParams);

return result;
}
Expand Down
15 changes: 11 additions & 4 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,17 @@ private void StoreReceivedMessage(object[] sqlParams)

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} " +
$"AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " +
$"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";

object[] sqlParams =
{
new NpgsqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new NpgsqlParameter("@Version", _capOptions.Value.Version),
new NpgsqlParameter("@Added", fourMinAgo)
};

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var result = connection.ExecuteReader(sql, reader =>
Expand All @@ -206,7 +213,7 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin
}
return messages;
});
}, sqlParams);

return result;
}
Expand Down
15 changes: 11 additions & 4 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,17 @@ private void StoreReceivedMessage(object[] sqlParams)

private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " +
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " +
$"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";

object[] sqlParams =
{
new SqlParameter("@Retries", _capOptions.Value.FailedRetryCount),
new SqlParameter("@Version", _capOptions.Value.Version),
new SqlParameter("@Added", fourMinAgo)
};

List<MediumMessage> result;
await using (var connection = new SqlConnection(_options.Value.ConnectionString))
Expand All @@ -207,7 +214,7 @@ await using (var connection = new SqlConnection(_options.Value.ConnectionString)
}
return messages;
});
}, sqlParams);
}

return result;
Expand Down

0 comments on commit 432500f

Please sign in to comment.