Skip to content

MergeResources concurrent call return 429 #5029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using EnsureThat;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.SqlServer.Features.Schema;
Expand Down Expand Up @@ -245,8 +246,6 @@ internal async Task<long> MergeResourcesGetTransactionVisibilityAsync(Cancellati
// These waits cause intermittent execution timeouts even for very short (~10msec) calls.
var start = DateTime.UtcNow;
var timeoutRetries = 0;
var delayOnOverloadMilliseconds = 100;
var totaldelayOnOverloadMilliseconds = 0;
while (true)
{
try
Expand All @@ -259,18 +258,7 @@ internal async Task<long> MergeResourcesGetTransactionVisibilityAsync(Cancellati
var sqlEx = e as SqlException;
if (sqlEx != null && sqlEx.Number == SqlStoreErrorCodes.MergeResourcesConcurrentCallsIsAboveOptimal)
{
_logger.LogWarning(e, $"Error on {nameof(MergeResourcesBeginTransactionAsync)}: MergeResources concurrent calls is above optimal. Delay={delayOnOverloadMilliseconds} milliseconds.");

// TODO: Prepare to throw 429 instead of wait/delay when bundle code is ready
await Task.Delay(delayOnOverloadMilliseconds, cancellationToken);
totaldelayOnOverloadMilliseconds += delayOnOverloadMilliseconds;
delayOnOverloadMilliseconds = (int)(delayOnOverloadMilliseconds * (2 + (RandomNumberGenerator.GetInt32(1000) / 1000.0)));
if (totaldelayOnOverloadMilliseconds > 60000)
{
cmd.Parameters.Remove(enableThrottling); // default for @EnableThrottling is false
}

continue;
throw new RequestRateExceededException(null); // Let the retryAfter header be set to default
}

if (e.IsExecutionTimeout() && timeoutRetries++ < 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,37 +120,47 @@ INSERT INTO Resource SELECT * FROM inserted -- this will cause dup key exception
public async Task DatabaseMergeThrottling(bool useDefaultMergeOptions)
{
await _fixture.SqlHelper.ExecuteSqlCmd("TRUNCATE TABLE EventLog");

// set optimal threashold to low value to see waits.
await _fixture.SqlHelper.ExecuteSqlCmd("INSERT INTO dbo.Parameters (Id, Number) SELECT 'MergeResources.OptimalConcurrentCalls', 1");

// make merge calls longer
await _fixture.SqlHelper.ExecuteSqlCmd(@"
CREATE TRIGGER Transactions_Trigger ON Transactions FOR UPDATE -- This should make commit in MergeResources to run longer
AS
WAITFOR DELAY '00:00:01'
");
var patient = (Patient)Samples.GetJsonSample("Patient").ToPoco();
await Parallel.ForAsync(0, 8, async (i, cancell) =>
CREATE TRIGGER Transactions_Trigger ON Transactions FOR UPDATE
AS
WAITFOR DELAY '00:00:01'
");

try
{
var iInt = i;
Thread.Sleep(100 * iInt); // do not start all merges at once
if (useDefaultMergeOptions)
var patient = (Patient)Samples.GetJsonSample("Patient").ToPoco();
var exceptionThrown = false;

try
{
patient.Id = Guid.NewGuid().ToString();
await Mediator.UpsertResourceAsync(patient.ToResourceElement()); // try enlist in tran w/o tran scope
await Parallel.ForAsync(0, 8, async (i, cancell) =>
{
Thread.Sleep(100 * i);
if (useDefaultMergeOptions)
{
patient.Id = Guid.NewGuid().ToString();
await Mediator.UpsertResourceAsync(patient.ToResourceElement());
}
else
{
var resOp = new ResourceWrapperOperation(CreateObservationResourceWrapper(Guid.NewGuid().ToString()), true, true, null, false, false, null);
await _dataStore.MergeAsync([resOp], new MergeOptions(false), CancellationToken.None);
}
});
}
else
catch (RequestRateExceededException)
{
var resOp = new ResourceWrapperOperation(CreateObservationResourceWrapper(Guid.NewGuid().ToString()), true, true, null, false, false, null);
await _dataStore.MergeAsync([resOp], new MergeOptions(false), CancellationToken.None); // do not enlist in tran
exceptionThrown = true;
}
});
await _fixture.SqlHelper.ExecuteSqlCmd("DROP TRIGGER Transactions_Trigger");
await _fixture.SqlHelper.ExecuteSqlCmd("DELETE FROM dbo.Parameters WHERE Id = 'MergeResources.OptimalConcurrentCalls'");

// make sure waits were recorded
await _fixture.SqlHelper.ExecuteSqlCmd("IF NOT EXISTS (SELECT * FROM EventLog WHERE Process = 'MergeResourcesBeginTransaction' AND Status = 'Error') RAISERROR('Waits were not recorded', 18, 127)");
Assert.True(exceptionThrown, "Expected RequestRateExceededException was not thrown.");
}
finally
{
await _fixture.SqlHelper.ExecuteSqlCmd("DROP TRIGGER Transactions_Trigger");
await _fixture.SqlHelper.ExecuteSqlCmd("DELETE FROM dbo.Parameters WHERE Id = 'MergeResources.OptimalConcurrentCalls'");
}
}

[Fact]
Expand Down
Loading