Skip to content

Commit

Permalink
Implement LimitToLast
Browse files Browse the repository at this point in the history
  • Loading branch information
jskeet committed Mar 31, 2020
1 parent ac186c8 commit 2f25d1c
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 19 deletions.
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -65,5 +66,27 @@ async Task ExpectChangeAsync(int? expected)
Assert.Equal(expected, latestValue);
}
}

[Fact]
public async Task LimitToLast()
{
var collection = _fixture.FirestoreDb.Collection(_fixture.CollectionPrefix + "-WatchLimitToLast");
await collection.Document("doc1").CreateAsync(new { counter = 1 });
await collection.Document("doc2").CreateAsync(new { counter = 2 });
await collection.Document("doc3").CreateAsync(new { counter = 3 });
var query = collection.OrderBy("counter").LimitToLast(2);
var semaphore = new SemaphoreSlim(0);
QuerySnapshot receivedSnapshot = null;
var listener = query.Listen(snapshot =>
{
receivedSnapshot = snapshot;
semaphore.Release();
});
// Wait up to 5 seconds for the query to work.
Assert.True(await semaphore.WaitAsync(5000));
await listener.StopAsync();
var ids = receivedSnapshot.Documents.Select(doc => doc.Id).ToList();
Assert.Equal(new[] { "doc2", "doc3" }, ids);
}
}
}
Expand Up @@ -123,6 +123,22 @@ public async Task Limit()
Assert.Equal(HighScore.Data.OrderBy(x => x.Level).Take(3), items);
}

[Fact]
public async Task LimitToLast_GetSnapshotAsync()
{
var query = _fixture.HighScoreCollection.OrderBy("Level").LimitToLast(3);
var snapshot = await query.GetSnapshotAsync();
var items = snapshot.Documents.Select(doc => doc.ConvertTo<HighScore>()).ToList();
Assert.Equal(HighScore.Data.OrderByDescending(x => x.Level).Take(3).Reverse(), items);
}

[Fact]
public void LimitToLast_StreamingThrows()
{
var query = _fixture.HighScoreCollection.OrderBy("Level").LimitToLast(3);
Assert.Throws<InvalidOperationException>(() => query.StreamAsync());
}

[Fact]
public async Task Offset()
{
Expand Down
Expand Up @@ -752,7 +752,7 @@ public async Task StreamAsync_WithDocuments()
var db = FirestoreDb.Create("proj", "db", mock.Object);
var query = db.Collection("col").Select("Name").Offset(3);
// Just for variety, we'll provide a transaction ID this time...
var documents = await query.StreamAsync(ByteString.CopyFrom(1, 2, 3, 4), CancellationToken.None).ToListAsync();
var documents = await query.StreamAsync(ByteString.CopyFrom(1, 2, 3, 4), CancellationToken.None, allowLimitToLast: false).ToListAsync();
Assert.Equal(2, documents.Count);

var doc1 = documents[0];
Expand Down Expand Up @@ -1049,6 +1049,35 @@ public void CollectionGroup_CursorForPath()
Assert.Equal(expected, query.ToStructuredQuery());
}

// Note: the LimitToLast tests are mostly written in terms of "Query X is equivalent to Query Y";
// the feature is intended to be a convenience to avoid users writing more complicated queries.
[Fact]
public void LimitToLast_ReversesOrderingConstraints()
{
var query = s_db.Collection("col").OrderBy("foo").LimitToLast(42);
var expectedProto = s_db.Collection("col").OrderByDescending("foo").Limit(42).ToStructuredQuery();
var actualProto = query.ToStructuredQuery();
Assert.Equal(expectedProto, actualProto);
}

[Fact]
public void LimitToLast_ReversesCursors()
{
var query = s_db.Collection("col").OrderBy("foo").StartAt("start").EndBefore("end").LimitToLast(42);
var expectedProto = s_db.Collection("col").OrderByDescending("foo").StartAfter("end").EndAt("start").Limit(42).ToStructuredQuery();
var actualProto = query.ToStructuredQuery();
Assert.Equal(expectedProto, actualProto);
}

[Fact]
public void LimitToLast_RequiresAtLeastOneOrderingConstraint()
{
var query = s_db.Collection("col").LimitToLast(42);
Assert.Throws<InvalidOperationException>(() => query.ToStructuredQuery());
}

// Result reversal and StreamAsync being rejected are handled in integration tests.

private static FieldReference Field(string path) => new FieldReference { FieldPath = path };
private static Filter Filter(UnaryFilter filter) => new Filter { UnaryFilter = filter };
private static Filter Filter(FieldFilter filter) => new Filter { FieldFilter = filter };
Expand Down
93 changes: 75 additions & 18 deletions apis/Google.Cloud.Firestore/Google.Cloud.Firestore/Query.cs
Expand Up @@ -41,14 +41,16 @@ public class Query : IEquatable<Query>
// multiple Query objects may share the same internal references.
// Any additional fields should be included in equality/hash code checks.
private readonly int _offset;
private readonly int? _limit;
private readonly (int count, LimitType type)? _limit;
private readonly IReadOnlyList<InternalOrdering> _orderings; // Never null
private readonly IReadOnlyList<InternalFilter> _filters; // May be null
private readonly IReadOnlyList<FieldPath> _projections; // May be null
private readonly Cursor _startAt;
private readonly Cursor _endAt;
private readonly QueryRoot _root;

private bool IsLimitToLast => _limit?.type == LimitType.Last;

/// <summary>
/// The database this query will search over.
/// </summary>
Expand All @@ -73,7 +75,7 @@ private protected Query(FirestoreDb database, DocumentReference parent, string c
// no further cloning: it is the responsibility of each method to ensure it creates a clone for any new data.
private Query(
QueryRoot root,
int offset, int? limit,
int offset, (int count, LimitType type)? limit,
IReadOnlyList<InternalOrdering> orderings, IReadOnlyList<InternalFilter> filters, IReadOnlyList<FieldPath> projections,
Cursor startAt, Cursor endAt)
{
Expand All @@ -90,21 +92,32 @@ private protected Query(FirestoreDb database, DocumentReference parent, string c
internal static Query ForCollectionGroup(FirestoreDb database, string collectionId) =>
new Query(QueryRoot.ForCollectionGroup(database, collectionId));

internal StructuredQuery ToStructuredQuery() =>
new StructuredQuery
internal StructuredQuery ToStructuredQuery()
{
bool limitToLast = IsLimitToLast;
if (limitToLast && !_orderings.Any())
{
throw new InvalidOperationException($"Queries using {nameof(LimitToLast)} must specify at least one ordering.");
}

return new StructuredQuery
{
From = { new CollectionSelector { AllDescendants = _root.AllDescendants, CollectionId = _root.CollectionId } },
Limit = _limit,
Limit = _limit?.count,
Offset = _offset,
OrderBy = { _orderings.Select(o => o.ToProto()) },
EndAt = _endAt,
OrderBy = { _orderings.Select(o => o.ToProto(invertDirection: limitToLast)) },
EndAt = limitToLast ? InvertCursor(_startAt) :_endAt,
Select = _projections == null ? null : new Projection { Fields = { _projections.Select(fp => fp.ToFieldReference()) } },
StartAt = _startAt,
StartAt = limitToLast ? InvertCursor(_endAt) : _startAt,
Where = _filters == null ? null
: _filters.Count == 1 ? _filters[0].ToProto()
: new Filter { CompositeFilter = new CompositeFilter { Op = CompositeFilter.Types.Operator.And, Filters = { _filters.Select(f => f.ToProto()) } } }
};

Cursor InvertCursor(Cursor cursor) =>
cursor == null ? null : new Cursor { Before = !cursor.Before, Values = { cursor.Values } };
}

/// <summary>
/// Specifies the field paths to return in the results.
/// </summary>
Expand Down Expand Up @@ -487,9 +500,30 @@ private Query OrderBy(FieldPath fieldPath, Direction direction)
public Query Limit(int limit)
{
GaxPreconditions.CheckArgumentRange(limit, nameof(limit), 0, int.MaxValue);
return new Query(_root, _offset, limit, _orderings, _filters, _projections, _startAt, _endAt);
return new Query(_root, _offset, (limit, LimitType.First), _orderings, _filters, _projections, _startAt, _endAt);
}

/// <summary>
/// Creates and returns a new query that only returns the last <paramref name="limit"/> matching documents.
/// </summary>
/// <remarks>
/// <para>
/// You must specify at least one <see cref="OrderBy(string)"/> clause for limit-to-last queries. Otherwise,
/// an <see cref="InvalidOperationException"/> is thrown during execution.
/// </para>
/// <para>
/// Results for limit-to-last queries are only available once all documents are received, which means
/// that these queries cannot be streamed using the <see cref="StreamAsync(CancellationToken)"/> method.
/// </para>
/// </remarks>
/// <param name="limit">The maximum number of results to return. Must be greater than or equal to 0.</param>
/// <returns>A new query based on the current one, but with the specified limit applied.</returns>
public Query LimitToLast(int limit)
{
GaxPreconditions.CheckArgumentRange(limit, nameof(limit), 0, int.MaxValue);
return new Query(_root, _offset, (limit, LimitType.Last), _orderings, _filters, _projections, _startAt, _endAt);
}

/// <summary>
/// Specifies a number of results to skip.
/// </summary>
Expand Down Expand Up @@ -601,7 +635,7 @@ public Query Offset(int offset)

internal async Task<QuerySnapshot> GetSnapshotAsync(ByteString transactionId, CancellationToken cancellationToken)
{
var responses = StreamResponsesAsync(transactionId, cancellationToken);
var responses = StreamResponsesAsync(transactionId, cancellationToken, allowLimitToLast: true);
Timestamp? readTime = null;
List<DocumentSnapshot> snapshots = new List<DocumentSnapshot>();
await responses.ForEachAsync(response =>
Expand All @@ -617,7 +651,12 @@ internal async Task<QuerySnapshot> GetSnapshotAsync(ByteString transactionId, Ca
}, cancellationToken).ConfigureAwait(false);

GaxPreconditions.CheckState(readTime != null, "The stream returned from RunQuery did not provide a read timestamp.");

if (IsLimitToLast)
{
// Reverse in-place. We *could* create an IReadOnlyList<T> which acted as a "reversing view"
// but that seems like unnecessary work for now.
snapshots.Reverse();
}
return QuerySnapshot.ForDocuments(this, snapshots.AsReadOnly(), readTime.Value);
}

Expand All @@ -641,15 +680,19 @@ internal async Task<QuerySnapshot> GetSnapshotAsync(ByteString transactionId, Ca
/// </param>
/// <returns>An asynchronous sequence of document snapshots matching the query.</returns>
public IAsyncEnumerable<DocumentSnapshot> StreamAsync(CancellationToken cancellationToken = default) =>
StreamAsync(transactionId: null, cancellationToken);
StreamAsync(transactionId: null, cancellationToken, false);

internal IAsyncEnumerable<DocumentSnapshot> StreamAsync(ByteString transactionId, CancellationToken cancellationToken) =>
StreamResponsesAsync(transactionId, cancellationToken)
internal IAsyncEnumerable<DocumentSnapshot> StreamAsync(ByteString transactionId, CancellationToken cancellationToken, bool allowLimitToLast) =>
StreamResponsesAsync(transactionId, cancellationToken, allowLimitToLast)
.Where(resp => resp.Document != null)
.Select(resp => DocumentSnapshot.ForDocument(Database, resp.Document, Timestamp.FromProto(resp.ReadTime)));

private IAsyncEnumerable<RunQueryResponse> StreamResponsesAsync(ByteString transactionId, CancellationToken cancellationToken)
private IAsyncEnumerable<RunQueryResponse> StreamResponsesAsync(ByteString transactionId, CancellationToken cancellationToken, bool allowLimitToLast)
{
if (IsLimitToLast && !allowLimitToLast)
{
throw new InvalidOperationException($"Cannot stream responses for query using {nameof(LimitToLast)}");
}
var request = new RunQueryRequest { StructuredQuery = ToStructuredQuery(), Parent = ParentPath };
if (transactionId != null)
{
Expand Down Expand Up @@ -838,7 +881,7 @@ public bool Equals(Query other)
public override int GetHashCode() => GaxEqualityHelpers.CombineHashCodes(
_root.GetHashCode(),
_offset,
_limit ?? -1,
_limit?.GetHashCode() ?? -1,
GaxEqualityHelpers.GetListHashCode(_orderings),
GaxEqualityHelpers.GetListHashCode(_filters),
GaxEqualityHelpers.GetListHashCode(_projections),
Expand Down Expand Up @@ -905,7 +948,15 @@ private struct InternalOrdering : IEquatable<InternalOrdering>
{
internal FieldPath Field { get; }
internal Direction Direction { get; }
internal Order ToProto() => new Order { Direction = Direction, Field = Field.ToFieldReference() };
internal Direction InverseDirection =>
Direction switch
{
Direction.Ascending => Direction.Descending,
Direction.Descending => Direction.Ascending,
_ => throw new InvalidOperationException($"Can't invert direction {Direction}")
};

internal Order ToProto(bool invertDirection) => new Order { Direction = invertDirection ? InverseDirection : Direction, Field = Field.ToFieldReference() };

public override int GetHashCode() => GaxEqualityHelpers.CombineHashCodes(Field.GetHashCode(), (int) Direction);

Expand Down Expand Up @@ -1072,5 +1123,11 @@ private QueryRoot(FirestoreDb database, string parentPath, string collectionId,
public override int GetHashCode() =>
GaxEqualityHelpers.CombineHashCodes(Database.GetHashCode(), ParentPath.GetHashCode(), CollectionId?.GetHashCode() ?? 0, AllDescendants ? 1 : 0);
}

private enum LimitType
{
First,
Last
}
}
}

0 comments on commit 2f25d1c

Please sign in to comment.