Skip to content

[Backport 7.x] Support async SQL searches #5870

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions src/Nest/XPack/Sql/QuerySql/QuerySqlRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ namespace Nest
[ReadAs(typeof(QuerySqlRequest))]
public partial interface IQuerySqlRequest : ISqlRequest
{
/// <summary>
/// Return the results in a columnar fashion: one row represents all the values of a certain column from the current page
/// of results.
/// The following formats can be returned in columnar orientation: json, yaml, cbor and smile.
/// </summary>
[DataMember(Name = "columnar")]
bool? Columnar { get; set; }

/// <summary>
/// Continue to the next page by sending back the cursor field returned in the previous response.
/// <para>
Expand All @@ -19,56 +27,53 @@ public partial interface IQuerySqlRequest : ISqlRequest
/// Unlike scroll, receiving the last page is enough to guarantee that the Elasticsearch state is cleared.
/// </para>
/// </summary>
[DataMember(Name="cursor")]
[DataMember(Name = "cursor")]
string Cursor { get; set; }

/// <summary>
/// Return the results in a columnar fashion: one row represents all the values of a certain column from the current page of results.
/// The following formats can be returned in columnar orientation: json, yaml, cbor and smile.
/// Make the search asynchronous by setting a duration you’d like to wait for synchronous results.
/// </summary>
[DataMember(Name="columnar")]
bool? Columnar { get; set; }
[DataMember(Name = "wait_for_completion_timeout")]
Time WaitForCompletionTimeout { get; set; }
}

public partial class QuerySqlRequest
{
/// <inheritdoc cref="IQuerySqlRequest.Cursor" />
/// >
public string Cursor { get; set; }

/// <inheritdoc cref="IQuerySqlRequest.Columnar" />
/// >
public bool? Columnar { get; set; }

/// <inheritdoc cref="IQuerySqlRequest.Cursor" />
public string Cursor { get; set; }

/// <inheritdoc cref="ISqlRequest.FetchSize" />
/// >
public int? FetchSize { get; set; }

/// <inheritdoc cref="ISqlRequest.Filter" />
/// >
public QueryContainer Filter { get; set; }

/// <inheritdoc cref="ISqlRequest.Query" />
/// >
public string Query { get; set; }

/// <inheritdoc cref="ISqlRequest.RuntimeFields" />
public IRuntimeFields RuntimeFields { get; set; }

/// <inheritdoc cref="ISqlRequest.TimeZone" />
/// >
public string TimeZone { get; set; }

/// <inheritdoc />
public IRuntimeFields RuntimeFields { get; set; }
/// <inheritdoc cref="IQuerySqlRequest.WaitForCompletionTimeout" />
public Time WaitForCompletionTimeout { get; set; }
}

public partial class QuerySqlDescriptor
{
string IQuerySqlRequest.Cursor { get; set; }
bool? IQuerySqlRequest.Columnar { get; set; }
string IQuerySqlRequest.Cursor { get; set; }
int? ISqlRequest.FetchSize { get; set; }
QueryContainer ISqlRequest.Filter { get; set; }
string ISqlRequest.Query { get; set; }
string ISqlRequest.TimeZone { get; set; }
IRuntimeFields ISqlRequest.RuntimeFields { get; set; }
string ISqlRequest.TimeZone { get; set; }
Time IQuerySqlRequest.WaitForCompletionTimeout { get; set; }

/// <inheritdoc cref="ISqlRequest.Query" />
/// >
Expand Down Expand Up @@ -98,5 +103,8 @@ public QuerySqlDescriptor Filter<T>(Func<QueryContainerDescriptor<T>, QueryConta
/// <inheritdoc cref="ISqlRequest.RuntimeFields" />
public QuerySqlDescriptor RuntimeFields(Func<RuntimeFieldsDescriptor, IPromise<IRuntimeFields>> runtimeFieldsSelector) =>
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor())?.Value);

/// <inheritdoc cref="IQuerySqlRequest.WaitForCompletionTimeout" />
public QuerySqlDescriptor WaitForCompletionTimeout(Time frequency) => Assign(frequency, (a, v) => a.WaitForCompletionTimeout = v);
}
}
22 changes: 20 additions & 2 deletions src/Nest/XPack/Sql/QuerySql/QuerySqlResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,31 @@ public class QuerySqlResponse : ResponseBase
public string Cursor { get; internal set; }

/// <summary>
/// If <see cref="IQuerySqlRequest.Columnar"/> has been set to false, this property will contain the row values
/// Identifier for the search.
/// </summary>
[DataMember(Name = "id")]
public string Id { get; internal set; }

/// <summary>
/// If true, the response does not contain complete search results.
/// </summary>
[DataMember(Name = "is_partial")]
public bool IsPartial { get; internal set; }

/// <summary>
/// If true, the search request is still executing.
/// </summary>
[DataMember(Name = "is_running")]
public bool IsRunning { get; internal set; }

/// <summary>
/// If <see cref="IQuerySqlRequest.Columnar" /> has been set to false, this property will contain the row values
/// </summary>
[DataMember(Name = "rows")]
public IReadOnlyCollection<SqlRow> Rows { get; internal set; } = EmptyReadOnly<SqlRow>.Collection;

/// <summary>
/// If <see cref="IQuerySqlRequest.Columnar"/> has been set to true, this property will contain the column values
/// If <see cref="IQuerySqlRequest.Columnar" /> has been set to true, this property will contain the column values
/// </summary>
[DataMember(Name = "values")]
public IReadOnlyCollection<SqlRow> Values { get; internal set; } = EmptyReadOnly<SqlRow>.Collection;
Expand Down
2 changes: 1 addition & 1 deletion src/Nest/XPack/Sql/Status/SqlSearchStatusResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class SqlSearchStatusResponse : ResponseBase
/// For a completed search shows the http status code of the completed search.
/// </summary>
[DataMember(Name = "completion_status")]
public int CompletionStatus { get; internal set; }
public int? CompletionStatus { get; internal set; }

/// <summary>
/// For a running search shows a timestamp when the eql search started, in milliseconds since the Unix epoch.
Expand Down
153 changes: 153 additions & 0 deletions tests/Tests/XPack/Sql/SqlSearchApiCoordinatedTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Threading.Tasks;
using Elastic.Elasticsearch.Xunit.XunitPlumbing;
using FluentAssertions;
using Nest;
using Tests.Core.Extensions;
using Tests.Core.ManagedElasticsearch.Clusters;
using Tests.Domain;
using Tests.Domain.Helpers;
using Tests.Framework.EndpointTests;
using Tests.Framework.EndpointTests.TestState;

namespace Tests.XPack.Sql
{
[SkipVersion("<7.14.0", "All endpoints GA in 7.14.0")]
public class SqlSearchApiCoordinatedTests : CoordinatedIntegrationTestBase<XPackCluster>
{
private const string DeleteStep = nameof(DeleteStep);
private const string GetStep = nameof(GetStep);
private const string StatusStep = nameof(StatusStep);
private const string SubmitStep = nameof(SubmitStep);
private const string WaitStep = nameof(WaitStep);

private static readonly string SqlQuery =
$@"SELECT type, name, startedOn, numberOfCommits
FROM {TestValueHelper.ProjectsIndex}
WHERE type = '{Project.TypeName}'
ORDER BY numberOfContributors DESC";

public SqlSearchApiCoordinatedTests(XPackCluster cluster, EndpointUsage usage) : base(new CoordinatedUsage(cluster, usage, testOnlyOne: true)
{
{
SubmitStep, u =>
u.Calls<QuerySqlDescriptor, QuerySqlRequest, IQuerySqlRequest, QuerySqlResponse>(
_ => new QuerySqlRequest { Query = SqlQuery, FetchSize = 5, WaitForCompletionTimeout = "0s" },
(_, d) => d
.Query(SqlQuery)
.FetchSize(5)
.WaitForCompletionTimeout("0s"),
(_, c, f) => c.Sql.Query(f),
(_, c, f) => c.Sql.QueryAsync(f),
(_, c, r) => c.Sql.Query(r),
(_, c, r) => c.Sql.QueryAsync(r),
(r, values) => values.ExtendedValue("id", r.Id)
)
},
{
StatusStep, u =>
u.Calls<SqlSearchStatusDescriptor, SqlSearchStatusRequest, ISqlSearchStatusRequest, SqlSearchStatusResponse>(
v => new SqlSearchStatusRequest(v),
(v, d) => d,
(v, c, f) => c.Sql.SearchStatus(v, f),
(v, c, f) => c.Sql.SearchStatusAsync(v, f),
(v, c, r) => c.Sql.SearchStatus(r),
(v, c, r) => c.Sql.SearchStatusAsync(r),
uniqueValueSelector: values => values.ExtendedValue<string>("id")
)
},
{
// allows the search to complete
WaitStep, u => u.Call(async (_, c) =>
{
// wait for the search to complete
var complete = false;
var count = 0;

while (!complete && count++ < 10)
{
await Task.Delay(100);
var status = await c.Sql.SearchStatusAsync(u.Usage.CallUniqueValues.ExtendedValue<string>("id"));
complete = !status.IsRunning && status.CompletionStatus.HasValue;
}
})
},
{
GetStep, u =>
u.Calls<SqlGetDescriptor, SqlGetRequest, ISqlGetRequest, SqlGetResponse>(
v => new SqlGetRequest(v),
(_, d) => d,
(v, c, f) => c.Sql.Get(v, f),
(v, c, f) => c.Sql.GetAsync(v, f),
(_, c, r) => c.Sql.Get(r),
(_, c, r) => c.Sql.GetAsync(r),
uniqueValueSelector: values => values.ExtendedValue<string>("id")
)
},
{
DeleteStep, u =>
u.Calls<SqlDeleteDescriptor, SqlDeleteRequest, ISqlDeleteRequest, SqlDeleteResponse>(
v => new SqlDeleteRequest(v),
(_, d) => d,
(v, c, f) => c.Sql.Delete(v, f),
(v, c, f) => c.Sql.DeleteAsync(v, f),
(_, c, r) => c.Sql.Delete(r),
(_, c, r) => c.Sql.DeleteAsync(r),
uniqueValueSelector: values => values.ExtendedValue<string>("id")
)
}
}) { }

[I] public async Task SqlSearchResponse() => await Assert<QuerySqlResponse>(SubmitStep, r =>
{
r.ShouldBeValid();
r.Id.Should().NotBeNullOrEmpty();
r.IsPartial.Should().BeTrue();
r.IsRunning.Should().BeTrue();
});

[I] public async Task SqlSearchStatusResponse() => await Assert<SqlSearchStatusResponse>(StatusStep, r =>
{
r.ShouldBeValid();
r.Id.Should().NotBeNullOrEmpty();
r.IsPartial.Should().BeTrue();
r.IsRunning.Should().BeTrue();
r.ExpirationTimeInMillis.Should().BeGreaterThan(0);
r.StartTimeInMillis.Should().BeGreaterThan(0);
});

[I] public async Task SqlGetResponse() => await Assert<SqlGetResponse>(GetStep, r =>
{
r.ShouldBeValid();
r.IsPartial.Should().BeFalse();
r.IsRunning.Should().BeFalse();

r.Cursor.Should().NotBeNullOrWhiteSpace("response cursor");
r.Rows.Should().NotBeNullOrEmpty();
r.Columns.Should().NotBeNullOrEmpty().And.HaveCount(4);
foreach (var c in r.Columns)
{
c.Name.Should().NotBeNullOrWhiteSpace("column name");
c.Type.Should().NotBeNullOrWhiteSpace("column type");
}
foreach (var row in r.Rows)
{
row.Should().NotBeNull().And.HaveCount(4);
var type = row[0].As<string>().Should().NotBeNullOrWhiteSpace("a type returned null");
var name = row[1].As<string>().Should().NotBeNullOrWhiteSpace("a name returned null");
var date = row[2].As<DateTime>().Should().BeAfter(default);
var numberOfCommits = row[3].As<int?>();
}
});

[I] public async Task SqlDeleteResponse() => await Assert<SqlDeleteResponse>(DeleteStep, r =>
{
r.ShouldBeValid();
r.Acknowledged.Should().BeTrue();
});
}
}