In [16]:
#!import ./setup-clients.ipynb

// Name of the Elasticsearch index queried by the cells below.
const string PostsIndex = "posts";

## Scrolling - `Elastic.Clients.Elasticsearch`

<https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results>


In [31]:
#nullable enable
using Elastic.Clients.Elasticsearch;

/// <summary>
/// Settings consumed by <c>ElasticsearchClientExtensions.ScrollAsync</c>.
/// </summary>
public class ScrollConfiguration
{
    /// <summary>
    /// Gets a configuration carrying the default values. Every access creates a new
    /// instance, so the returned object can be modified without affecting other callers.
    /// </summary>
    public static ScrollConfiguration Default => new ScrollConfiguration();

    /// <summary>
    /// Gets or sets the value used as the <c>size</c> of each search request. Defaults to 250.
    /// </summary>
    public int PageSize { get; set; } = 250;

    /// <summary>
    /// Gets or sets the keep-alive sent when the point in time is opened and with every
    /// search that uses it. Defaults to <c>1m</c>.
    /// </summary>
    public Elastic.Clients.Elasticsearch.Duration KeepAlive { get; set; } = new("1m");

    /// <summary>
    /// Gets or sets whether each search response is dumped before it is checked for errors.
    /// Defaults to <see langword="false"/>.
    /// </summary>
    public bool DebugOutput { get; set; }
}

/// <summary>
/// Helpers for reading large result sets with <c>Elastic.Clients.Elasticsearch</c>.
/// </summary>
public static class ElasticsearchClientExtensions
{
    /// <summary>
    /// Lazily streams the documents of a search, page by page, using a point in time (PIT)
    /// combined with <c>search_after</c>.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Nothing is sent to Elasticsearch until the returned sequence is enumerated. The PIT is
    /// opened on the first move and closed when enumeration finishes, fails, is cancelled or
    /// is abandoned early.
    /// </para>
    /// <para>
    /// When <paramref name="configure"/> is <see langword="null"/> the search is sorted by
    /// <c>_doc</c> ascending. When it is supplied, no sort is added here: the callback is
    /// responsible for defining one. If the hits of a page carry no sort values, enumeration
    /// stops after that page because there is nothing to pass to <c>search_after</c>.
    /// </para>
    /// </remarks>
    /// <typeparam name="TDocument">Type the hits are deserialized into.</typeparam>
    /// <param name="client">Client used for every request.</param>
    /// <param name="indices">Indices the point in time is opened on.</param>
    /// <param name="configure">
    /// Optional callback applied to every search request after index, PIT and size were set.
    /// </param>
    /// <param name="scrollConfiguration">
    /// Page size, keep-alive and debug settings; <see cref="ScrollConfiguration.Default"/> when omitted.
    /// </param>
    /// <param name="cancellationToken">Token observed by the open and search requests.</param>
    /// <returns>An asynchronous sequence of the documents of every page, in response order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// A request was unsuccessful and the response carried no original exception to rethrow.
    /// </exception>
    public static async IAsyncEnumerable<TDocument> ScrollAsync<TDocument>(
        ElasticsearchClient client,
        Elastic.Clients.Elasticsearch.Indices indices,
        Action<SearchRequestDescriptor<TDocument>>? configure = default,
        ScrollConfiguration? scrollConfiguration = default,
        [System.Runtime.CompilerServices.EnumeratorCancellation] System.Threading.CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(client);

        // Non-null local so the lambdas and local functions below need no null-forgiving operator.
        var settings = scrollConfiguration ?? ScrollConfiguration.Default;

        // Id of the PIT to use for the next request; null until the PIT was opened successfully.
        string? activePitId = null;

        // Set only when the whole result set was streamed without error.
        var completed = false;

        try
        {
            var openResponse = await client.OpenPointInTimeAsync(
                indices,
                d => d.KeepAlive(settings.KeepAlive),
                cancellationToken);

            HandleError(openResponse);
            activePitId = openResponse.Id;

            var searchResponse = await client.SearchAsync<TDocument>(
                s => ConfigureSearch(s),
                cancellationToken);

            Inspect(searchResponse);

            while (searchResponse.Documents.Count > 0)
            {
                foreach (var document in searchResponse.Documents)
                {
                    yield return document;
                }

                // The sort values of the last hit are the cursor for the next page.
                if (searchResponse.Hits.Last().Sort is var searchAfter and { Count: > 0 })
                {
                    searchResponse = await client.SearchAsync<TDocument>(
                        s => ConfigureSearch(s).SearchAfter(searchAfter.ToList()),
                        cancellationToken);

                    Inspect(searchResponse);
                }
                else
                {
                    break;
                }
            }

            completed = true;
        }
        finally
        {
            if (activePitId is { } pitId)
            {
                try
                {
                    // Deliberately not the caller's token: a cancelled enumeration must still
                    // release the PIT instead of leaving it open until its keep-alive expires.
                    var closeResponse = await client.ClosePointInTimeAsync(
                        d => d.Id(pitId),
                        System.Threading.CancellationToken.None);

                    if (completed)
                    {
                        HandleError(closeResponse);
                    }
                }
                catch (Exception) when (!completed)
                {
                    // Enumeration is already ending because of an exception, cancellation or
                    // early disposal; a cleanup failure must not replace that outcome.
                }
            }
        }

        // Applies the settings shared by every page request, then the caller's customization
        // or, when there is none, the default _doc sort.
        SearchRequestDescriptor<TDocument> ConfigureSearch(SearchRequestDescriptor<TDocument> descriptor)
        {
            descriptor = descriptor
                .Index(indices)
                // Only invoked after the PIT was opened, so the id is set at this point.
                .Pit(activePitId!, d => d.KeepAlive(settings.KeepAlive))
                .Size(settings.PageSize);

            if (configure is not null)
            {
                configure(descriptor);
            }
            else
            {
                descriptor.Sort(
                    [
                        SortOptions.Field(
                            "_doc",
                            new Elastic.Clients.Elasticsearch.FieldSort { Order = Elastic.Clients.Elasticsearch.SortOrder.Asc })
                    ]);
            }

            return descriptor;
        }

        // Dumps the response when requested, fails on errors and remembers the PIT id to use next.
        void Inspect(SearchResponse<TDocument> response)
        {
            // Dump before the error check so failing responses are visible too.
            if (settings.DebugOutput)
            {
                DumpResponse(response);
            }

            HandleError(response);

            // Elasticsearch may hand back a different PIT id with every search; the most
            // recently received one has to be used for the next request.
            activePitId = response.PitId ?? activePitId;
        }

        // Throws for every unsuccessful response, with or without a captured exception.
        static void HandleError(ElasticsearchResponse response)
        {
            if (response.IsSuccess())
            {
                return;
            }

            if (response.TryGetOriginalException(out var ex) && ex is not null)
            {
                // Rethrow without overwriting the stack trace captured by the transport.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(ex);
            }

            // No exception was captured: fail loudly rather than treating the failed
            // response as an empty page.
            throw new InvalidOperationException(
                $"Elasticsearch request failed: {response.DebugInformation}");
        }
    }
}

In [32]:
// Larger pages than the default, with debug output switched on.
var scrollConfiguration = new ScrollConfiguration
{
    PageSize = 500,
    DebugOutput = true,
};

var source = ElasticsearchClientExtensions.ScrollAsync<BlogPost>(
    client,
    PostsIndex,
    scrollConfiguration: scrollConfiguration);

// The sequence is lazy: counting it is what actually runs the searches.
display(await source.CountAsync());
// Alternative: materialize the documents instead of counting them.
// var posts = await source.ToListAsync();

Valid Elasticsearch response built from a successful (200) low level call on POST: /_search

# Audit trail of this API call:
 - [1] HealthyResponse: Node: https://elastic:redacted@127.0.0.1:9200/ Took: 00:00:00.0225312


⚙️❔Request:

⚙️🟰Response:

Valid Elasticsearch response built from a successful (200) low level call on POST: /_search

# Audit trail of this API call:
 - [1] HealthyResponse: Node: https://elastic:redacted@127.0.0.1:9200/ Took: 00:00:00.0188166


⚙️❔Request:

⚙️🟰Response:

Valid Elasticsearch response built from a successful (200) low level call on POST: /_search

# Audit trail of this API call:
 - [1] HealthyResponse: Node: https://elastic:redacted@127.0.0.1:9200/ Took: 00:00:00.0214271


⚙️❔Request:

⚙️🟰Response: