From ba5100ad561637fbec424711eaa1ad09c02a4b94 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Tue, 13 Jul 2021 14:06:11 +0100 Subject: [PATCH] Support async SQL searches (#5869) --- .../XPack/Sql/QuerySql/QuerySqlRequest.cs | 44 ++--- .../XPack/Sql/QuerySql/QuerySqlResponse.cs | 22 ++- .../Sql/Status/SqlSearchStatusResponse.cs | 2 +- .../XPack/Sql/SqlSearchApiCoordinatedTests.cs | 153 ++++++++++++++++++ 4 files changed, 200 insertions(+), 21 deletions(-) create mode 100644 tests/Tests/XPack/Sql/SqlSearchApiCoordinatedTests.cs diff --git a/src/Nest/XPack/Sql/QuerySql/QuerySqlRequest.cs b/src/Nest/XPack/Sql/QuerySql/QuerySqlRequest.cs index 33cb1598f79..5674ea2c805 100644 --- a/src/Nest/XPack/Sql/QuerySql/QuerySqlRequest.cs +++ b/src/Nest/XPack/Sql/QuerySql/QuerySqlRequest.cs @@ -11,6 +11,14 @@ namespace Nest [ReadAs(typeof(QuerySqlRequest))] public partial interface IQuerySqlRequest : ISqlRequest { + /// + /// Return the results in a columnar fashion: one row represents all the values of a certain column from the current page + /// of results. + /// The following formats can be returned in columnar orientation: json, yaml, cbor and smile. + /// + [DataMember(Name = "columnar")] + bool? Columnar { get; set; } + /// /// Continue to the next page by sending back the cursor field returned in the previous response. /// @@ -19,56 +27,53 @@ public partial interface IQuerySqlRequest : ISqlRequest /// Unlike scroll, receiving the last page is enough to guarantee that the Elasticsearch state is cleared. /// /// - [DataMember(Name="cursor")] + [DataMember(Name = "cursor")] string Cursor { get; set; } /// - /// Return the results in a columnar fashion: one row represents all the values of a certain column from the current page of results. - /// The following formats can be returned in columnar orientation: json, yaml, cbor and smile. + /// Make the search asynchronous by setting a duration you’d like to wait for synchronous results. /// - [DataMember(Name="columnar")] - bool? Columnar { get; set; } + [DataMember(Name = "wait_for_completion_timeout")] + Time WaitForCompletionTimeout { get; set; } } public partial class QuerySqlRequest { - /// - /// > - public string Cursor { get; set; } - /// - /// > public bool? Columnar { get; set; } + /// + public string Cursor { get; set; } + /// - /// > public int? FetchSize { get; set; } /// - /// > public QueryContainer Filter { get; set; } /// - /// > public string Query { get; set; } + /// + public IRuntimeFields RuntimeFields { get; set; } + /// - /// > public string TimeZone { get; set; } - /// - public IRuntimeFields RuntimeFields { get; set; } + /// + public Time WaitForCompletionTimeout { get; set; } } public partial class QuerySqlDescriptor { - string IQuerySqlRequest.Cursor { get; set; } bool? IQuerySqlRequest.Columnar { get; set; } + string IQuerySqlRequest.Cursor { get; set; } int? ISqlRequest.FetchSize { get; set; } QueryContainer ISqlRequest.Filter { get; set; } string ISqlRequest.Query { get; set; } - string ISqlRequest.TimeZone { get; set; } IRuntimeFields ISqlRequest.RuntimeFields { get; set; } + string ISqlRequest.TimeZone { get; set; } + Time IQuerySqlRequest.WaitForCompletionTimeout { get; set; } /// /// > @@ -98,5 +103,8 @@ public QuerySqlDescriptor Filter(Func, QueryConta /// public QuerySqlDescriptor RuntimeFields(Func> runtimeFieldsSelector) => Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor())?.Value); + + /// + public QuerySqlDescriptor WaitForCompletionTimeout(Time frequency) => Assign(frequency, (a, v) => a.WaitForCompletionTimeout = v); } } diff --git a/src/Nest/XPack/Sql/QuerySql/QuerySqlResponse.cs b/src/Nest/XPack/Sql/QuerySql/QuerySqlResponse.cs index 14e4ed55ab0..813c60640b1 100644 --- a/src/Nest/XPack/Sql/QuerySql/QuerySqlResponse.cs +++ b/src/Nest/XPack/Sql/QuerySql/QuerySqlResponse.cs @@ -25,13 +25,31 @@ public class QuerySqlResponse : ResponseBase public string Cursor { get; internal set; } /// - /// If has been set to false, this property will contain the row values + /// Identifier for the search. + /// + [DataMember(Name = "id")] + public string Id { get; internal set; } + + /// + /// If true, the response does not contain complete search results. + /// + [DataMember(Name = "is_partial")] + public bool IsPartial { get; internal set; } + + /// + /// If true, the search request is still executing. + /// + [DataMember(Name = "is_running")] + public bool IsRunning { get; internal set; } + + /// + /// If has been set to false, this property will contain the row values /// [DataMember(Name = "rows")] public IReadOnlyCollection Rows { get; internal set; } = EmptyReadOnly.Collection; /// - /// If has been set to true, this property will contain the column values + /// If has been set to true, this property will contain the column values /// [DataMember(Name = "values")] public IReadOnlyCollection Values { get; internal set; } = EmptyReadOnly.Collection; diff --git a/src/Nest/XPack/Sql/Status/SqlSearchStatusResponse.cs b/src/Nest/XPack/Sql/Status/SqlSearchStatusResponse.cs index 0efc9d864b7..a9590a6c458 100644 --- a/src/Nest/XPack/Sql/Status/SqlSearchStatusResponse.cs +++ b/src/Nest/XPack/Sql/Status/SqlSearchStatusResponse.cs @@ -15,7 +15,7 @@ public class SqlSearchStatusResponse : ResponseBase /// For a completed search shows the http status code of the completed search. /// [DataMember(Name = "completion_status")] - public int CompletionStatus { get; internal set; } + public int? CompletionStatus { get; internal set; } /// /// For a running search shows a timestamp when the eql search started, in milliseconds since the Unix epoch. diff --git a/tests/Tests/XPack/Sql/SqlSearchApiCoordinatedTests.cs b/tests/Tests/XPack/Sql/SqlSearchApiCoordinatedTests.cs new file mode 100644 index 00000000000..5979cb0d39f --- /dev/null +++ b/tests/Tests/XPack/Sql/SqlSearchApiCoordinatedTests.cs @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Threading.Tasks; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using FluentAssertions; +using Nest; +using Tests.Core.Extensions; +using Tests.Core.ManagedElasticsearch.Clusters; +using Tests.Domain; +using Tests.Domain.Helpers; +using Tests.Framework.EndpointTests; +using Tests.Framework.EndpointTests.TestState; + +namespace Tests.XPack.Sql +{ + [SkipVersion("<7.14.0", "All endpoints GA in 7.14.0")] + public class SqlSearchApiCoordinatedTests : CoordinatedIntegrationTestBase + { + private const string DeleteStep = nameof(DeleteStep); + private const string GetStep = nameof(GetStep); + private const string StatusStep = nameof(StatusStep); + private const string SubmitStep = nameof(SubmitStep); + private const string WaitStep = nameof(WaitStep); + + private static readonly string SqlQuery = + $@"SELECT type, name, startedOn, numberOfCommits +FROM {TestValueHelper.ProjectsIndex} +WHERE type = '{Project.TypeName}' +ORDER BY numberOfContributors DESC"; + + public SqlSearchApiCoordinatedTests(XPackCluster cluster, EndpointUsage usage) : base(new CoordinatedUsage(cluster, usage, testOnlyOne: true) + { + { + SubmitStep, u => + u.Calls( + _ => new QuerySqlRequest { Query = SqlQuery, FetchSize = 5, WaitForCompletionTimeout = "0s" }, + (_, d) => d + .Query(SqlQuery) + .FetchSize(5) + .WaitForCompletionTimeout("0s"), + (_, c, f) => c.Sql.Query(f), + (_, c, f) => c.Sql.QueryAsync(f), + (_, c, r) => c.Sql.Query(r), + (_, c, r) => c.Sql.QueryAsync(r), + (r, values) => values.ExtendedValue("id", r.Id) + ) + }, + { + StatusStep, u => + u.Calls( + v => new SqlSearchStatusRequest(v), + (v, d) => d, + (v, c, f) => c.Sql.SearchStatus(v, f), + (v, c, f) => c.Sql.SearchStatusAsync(v, f), + (v, c, r) => c.Sql.SearchStatus(r), + (v, c, r) => c.Sql.SearchStatusAsync(r), + uniqueValueSelector: values => values.ExtendedValue("id") + ) + }, + { + // allows the search to complete + WaitStep, u => u.Call(async (_, c) => + { + // wait for the search to complete + var complete = false; + var count = 0; + + while (!complete && count++ < 10) + { + await Task.Delay(100); + var status = await c.Sql.SearchStatusAsync(u.Usage.CallUniqueValues.ExtendedValue("id")); + complete = !status.IsRunning && status.CompletionStatus.HasValue; + } + }) + }, + { + GetStep, u => + u.Calls( + v => new SqlGetRequest(v), + (_, d) => d, + (v, c, f) => c.Sql.Get(v, f), + (v, c, f) => c.Sql.GetAsync(v, f), + (_, c, r) => c.Sql.Get(r), + (_, c, r) => c.Sql.GetAsync(r), + uniqueValueSelector: values => values.ExtendedValue("id") + ) + }, + { + DeleteStep, u => + u.Calls( + v => new SqlDeleteRequest(v), + (_, d) => d, + (v, c, f) => c.Sql.Delete(v, f), + (v, c, f) => c.Sql.DeleteAsync(v, f), + (_, c, r) => c.Sql.Delete(r), + (_, c, r) => c.Sql.DeleteAsync(r), + uniqueValueSelector: values => values.ExtendedValue("id") + ) + } + }) { } + + [I] public async Task SqlSearchResponse() => await Assert(SubmitStep, r => + { + r.ShouldBeValid(); + r.Id.Should().NotBeNullOrEmpty(); + r.IsPartial.Should().BeTrue(); + r.IsRunning.Should().BeTrue(); + }); + + [I] public async Task SqlSearchStatusResponse() => await Assert(StatusStep, r => + { + r.ShouldBeValid(); + r.Id.Should().NotBeNullOrEmpty(); + r.IsPartial.Should().BeTrue(); + r.IsRunning.Should().BeTrue(); + r.ExpirationTimeInMillis.Should().BeGreaterThan(0); + r.StartTimeInMillis.Should().BeGreaterThan(0); + }); + + [I] public async Task SqlGetResponse() => await Assert(GetStep, r => + { + r.ShouldBeValid(); + r.IsPartial.Should().BeFalse(); + r.IsRunning.Should().BeFalse(); + + r.Cursor.Should().NotBeNullOrWhiteSpace("response cursor"); + r.Rows.Should().NotBeNullOrEmpty(); + r.Columns.Should().NotBeNullOrEmpty().And.HaveCount(4); + foreach (var c in r.Columns) + { + c.Name.Should().NotBeNullOrWhiteSpace("column name"); + c.Type.Should().NotBeNullOrWhiteSpace("column type"); + } + foreach (var row in r.Rows) + { + row.Should().NotBeNull().And.HaveCount(4); + var type = row[0].As().Should().NotBeNullOrWhiteSpace("a type returned null"); + var name = row[1].As().Should().NotBeNullOrWhiteSpace("a name returned null"); + var date = row[2].As().Should().BeAfter(default); + var numberOfCommits = row[3].As(); + } + }); + + [I] public async Task SqlDeleteResponse() => await Assert(DeleteStep, r => + { + r.ShouldBeValid(); + r.Acknowledged.Should().BeTrue(); + }); + } +}