Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build/scripts/scripts.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<Content Include="..\..\.github\workflows\make-release-notes.yml"><Link>make-release-notes.yml</Link></Content>
</ItemGroup>
<ItemGroup>
<PackageReference Include="FSharp.Core" Version="6.0.1" />
<PackageReference Include="Bullseye" Version="3.3.0" />
<PackageReference Include="Elastic.Elasticsearch.Managed" Version="0.3.0" />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
<PackageReference Include="Elastic.Transport" Version="0.3.2" />
<PackageReference Condition="'$(TargetFramework)' == 'netstandard2.0'" Include="System.Reflection.Emit" Version="4.3.0" />
<PackageReference Condition="'$(TargetFramework)' == 'netstandard2.0'" Include="System.Reflection.Emit.Lightweight" Version="4.3.0" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.1.46">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="Tests" Key="$(ExposedPublicKey)" />
Expand Down
14 changes: 8 additions & 6 deletions src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ private void BulkAll(IObserver<BulkAllResponse> observer)
var documents = _partitionedBulkRequest.Documents;
var partitioned = new PartitionHelper<T>(documents, _bulkSize);
#pragma warning disable 4014
#pragma warning disable VSTHRD110 // Observe result of async calls
partitioned.ForEachAsync(
#pragma warning restore 4014
(buffer, page) => BulkAsync(buffer, page, 0),
(buffer, response) => observer.OnNext(response),
ex => OnCompleted(ex, observer),
_maxDegreeOfParallelism
);
#pragma warning restore VSTHRD110 // Observe result of async calls
}

private void OnCompleted(Exception exception, IObserver<BulkAllResponse> observer)
Expand Down Expand Up @@ -152,7 +154,7 @@ private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int ba
_bulkResponseCallback?.Invoke(response);

if (!response.ApiCall.Success)
return await HandleBulkRequest(buffer, page, backOffRetries, response).ConfigureAwait(false);
return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false);

var retryableDocuments = new List<T>();
var droppedDocuments = new List<Tuple<BulkResponseItemBase, T>>();
Expand All @@ -171,7 +173,7 @@ private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int ba
HandleDroppedDocuments(droppedDocuments, response);

if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);

if (retryableDocuments.Count > 0)
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");
Expand All @@ -193,7 +195,7 @@ private void HandleDroppedDocuments(List<Tuple<BulkResponseItemBase, T>> dropped
throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after receiving failures that can not be retried from _bulk");
}

private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
private async Task<BulkAllResponse> HandleBulkRequestAsync(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
{
var clientException = response.ApiCall.OriginalException as TransportException;
var failureReason = clientException?.FailureReason;
Expand All @@ -205,7 +207,7 @@ private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page
throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes");

ThrowOnExhaustedRetries();
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);
case PipelineFailure.CouldNotStartSniffOnStartup:
case PipelineFailure.BadAuthentication:
case PipelineFailure.NoNodesAttempted:
Expand All @@ -218,7 +220,7 @@ private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page
case PipelineFailure.BadRequest:
default:
ThrowOnExhaustedRetries();
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);
}

void ThrowOnExhaustedRetries()
Expand All @@ -231,7 +233,7 @@ void ThrowOnExhaustedRetries()
}
}

private async Task<BulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)
private async Task<BulkAllResponse> RetryDocumentsAsync(long page, int backOffRetries, IList<T> retryDocuments)
{
_incrementRetries();
await Task.Delay(_backOffTime, _compositeCancelToken).ConfigureAwait(false);
Expand Down
36 changes: 36 additions & 0 deletions src/Elastic.Clients.Elasticsearch/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"Microsoft.Build.Tasks.Git": {
"type": "Transitive",
"resolved": "1.0.0",
Expand Down Expand Up @@ -129,6 +135,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"Microsoft.Bcl.AsyncInterfaces": {
"type": "Transitive",
"resolved": "6.0.0",
Expand Down Expand Up @@ -266,6 +278,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"NETStandard.Library": {
"type": "Direct",
"requested": "[2.0.3, )",
Expand Down Expand Up @@ -509,6 +527,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"Microsoft.Bcl.AsyncInterfaces": {
"type": "Transitive",
"resolved": "6.0.0",
Expand Down Expand Up @@ -631,6 +655,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"Microsoft.Build.Tasks.Git": {
"type": "Transitive",
"resolved": "1.0.0",
Expand Down Expand Up @@ -714,6 +744,12 @@
"Microsoft.SourceLink.Common": "1.0.0"
}
},
"Microsoft.VisualStudio.Threading.Analyzers": {
"type": "Direct",
"requested": "[17.1.46, )",
"resolved": "17.1.46",
"contentHash": "7pImoMcQaWZYAwu1aDBB8yBkvgad13yjrRHQ65pwHMX757vZ49OrNaEuRSLDu2PjZGonsTkQAJK8JK4W/wW4bw=="
},
"Microsoft.Build.Tasks.Git": {
"type": "Transitive",
"resolved": "1.0.0",
Expand Down