Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
{
if (!FirstPoolUsageNeedsSniffing) return;

// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
// everywhere else it would bubble out wrapped in a `UnexpectedElasticsearchClientException`
var success = await semaphore.WaitAsync(_settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
if (!success)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Elasticsearch.Net/Transport/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public async Task<TResponse> RequestAsync<TResponse>(HttpMethod method, string p
}
catch (Exception killerException)
{
if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pipeline.AuditCancellationRequested();

throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
{
Request = requestData,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Elasticsearch.Net;

namespace Nest
{
Expand Down Expand Up @@ -27,7 +28,15 @@ protected CoordinatedRequestObserverBase(Action<T> onNext = null, Action<Excepti

public void OnCompleted() => _completed?.Invoke();

public void OnError(Exception error) => _onError?.Invoke(error);
public void OnError(Exception error)
{
// This normalizes task cancellation exceptions for observables
// If a task cancellation happens in the client it bubbles out as a UnexpectedElasticsearchClientException
// where as inside our IObservable implementation we .ThrowIfCancellationRequested() directly.
if (error is UnexpectedElasticsearchClientException es && es.InnerException != null && es.InnerException is OperationCanceledException c)
_onError?.Invoke(c);
else _onError?.Invoke(error);
}

public void OnNext(T value) => _onNext?.Invoke(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ public void CancelBulkAll()
//when we subscribe the observable becomes hot
observableBulk.Subscribe(bulkObserver);

//we wait Nseconds to see some bulks
//we wait N seconds to see some bulks
handle.WaitOne(TimeSpan.FromSeconds(3));
tokenSource.Cancel();
//we wait Nseconds to give in flight request a chance to cancel
//we wait N seconds to give in flight request a chance to cancel
handle.WaitOne(TimeSpan.FromSeconds(3));
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;

if (ex != null && !(ex is OperationCanceledException)) throw ex;

seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
var count = Client.Count<SmallObject>(f => f.Index(index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using Elastic.Xunit.XunitPlumbing;
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
using Tests.Core.ManagedElasticsearch.Clusters;
Expand Down Expand Up @@ -50,7 +51,8 @@ public void DisposingObservableCancelsBulkAll()
observableBulk.Dispose();
//we wait N seconds to give in flight request a chance to cancel
handle.WaitOne(TimeSpan.FromSeconds(3));
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;

if (ex != null && !(ex is OperationCanceledException)) throw ex;

seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
var count = Client.Count<SmallObject>(f => f.Index(index));
Expand Down