New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add PageStreamer to make it simple to fetch paged lists #654
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
227 changes: 227 additions & 0 deletions
227
Src/Support/GoogleApis.Tests/Apis/Requests/PageStreamerTest.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
/* | ||
Copyright 2016 Google Inc | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Linq; | ||
using System.Net.Http; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
using NUnit.Framework; | ||
|
||
using Google.Apis.Requests; | ||
using Google.Apis.Discovery; | ||
using Google.Apis.Services; | ||
|
||
namespace Google.Apis.Tests.Apis.Requests | ||
{ | ||
/// <summary>Tests for <see cref="PageStreamer{TResource, TRequest, TResponse, TToken}" />.</summary> | ||
[TestFixture] | ||
public class PageStreamerTest | ||
{ | ||
private static readonly PageStreamedResource simpleResource = new PageStreamedResource( | ||
"simple", | ||
new Page(null, 1, 2, 3), | ||
new Page("x", 4, 5), | ||
new Page("y", 6, 7)); | ||
|
||
private static readonly PageStreamedResource resourceWithEmptyPages = new PageStreamedResource( | ||
"empty pages", | ||
new Page(null, 1, 2, 3), | ||
new Page("a"), | ||
new Page("b"), | ||
new Page("c", 4, 5), | ||
new Page("d")); | ||
|
||
private static PageStreamedResource[] AllResources = { simpleResource, resourceWithEmptyPages }; | ||
|
||
[Test] | ||
[TestCaseSource("AllResources")] | ||
public void Fetch(PageStreamedResource resource) | ||
{ | ||
var actual = PageStreamedResource.PageStreamer.Fetch(new Request(resource)); | ||
Assert.AreEqual(resource.AllItems, actual); | ||
} | ||
|
||
[Test] | ||
[TestCaseSource("AllResources")] | ||
public async Task FetchAllAsync(PageStreamedResource resource) | ||
{ | ||
var actual = await PageStreamedResource.PageStreamer.FetchAllAsync( | ||
new Request(resource), | ||
CancellationToken.None); | ||
Assert.AreEqual(resource.AllItems, actual); | ||
} | ||
|
||
// Note: this doesn't test all the possible timings of cancellation | ||
[Test] | ||
public async Task Cancellation() | ||
{ | ||
var cts = new CancellationTokenSource(); | ||
var gatekeeperSource = new TaskCompletionSource<int>(); | ||
var resource = new PageStreamedResource( | ||
"check", | ||
new Page("x", 1, 2, 3), | ||
new Page("y", 4, 5), | ||
new Page(null, 6, 7)); | ||
resource.gateKeeper = gatekeeperSource.Task; | ||
|
||
var task = PageStreamedResource.PageStreamer.FetchAllAsync(new Request(resource), cts.Token); | ||
// Delay the resource fetching until we've cancelled the token | ||
cts.Cancel(); | ||
gatekeeperSource.SetResult(0); | ||
// TODO: Move to Assert.ThrowsAsync when we update to an appropriate version of NUnit. | ||
// Assert.That is available, but the documentation is unclear how this interacts with await. | ||
try | ||
{ | ||
await task; | ||
Assert.Fail("Expected exception"); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
// Expected | ||
} | ||
} | ||
|
||
// Has to be public so we can use it as a parameter for test cases | ||
public class PageStreamedResource | ||
{ | ||
internal static readonly PageStreamer<int, Request, Page, string> PageStreamer = | ||
new PageStreamer<int, Request, Page, string>( | ||
(request, token) => request.Token = token, | ||
page => page.NextPageToken, | ||
page => page.Items); | ||
|
||
// Task to await when fetching pages; defaults to an already completed task, | ||
// but can be set to another task to allow for more fine-grained interaction. | ||
internal Task gateKeeper = TaskEx.FromResult(0); | ||
internal List<Page> Pages { get; private set; } | ||
internal string Name { get; set; } | ||
|
||
internal PageStreamedResource(string name, params Page[] pages) | ||
{ | ||
this.Name = name; | ||
this.Pages = pages.ToList(); | ||
for (int i = 0; i < pages.Length - 1; i++) | ||
{ | ||
Pages[i].NextPageToken = Pages[i + 1].CurrentPageToken; | ||
} | ||
} | ||
|
||
public override string ToString() | ||
{ | ||
return Name; | ||
} | ||
|
||
internal Page GetPage(string token) | ||
{ | ||
// This will fail if we provide a token that doesn't exist. | ||
return Pages.Single(p => p.CurrentPageToken == token); | ||
} | ||
|
||
internal async Task<Page> GetPageAsync(string token, CancellationToken cancellationToken) | ||
{ | ||
await gateKeeper; | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
cancellationToken.ThrowIfCancellationRequested(); | ||
return GetPage(token); | ||
} | ||
|
||
public IEnumerable<int> AllItems => Pages.SelectMany(page => page.Items); | ||
} | ||
|
||
/// <summary> | ||
/// Request to work with <see cref="PageStreamedResource"/>; feels like overkill (in a non-HTTP context) | ||
/// due to having to implement <see cref="IClientServiceRequest{Page}"/> but there isn't an interface | ||
/// which only includes <c>Execute</c> and <c>ExecuteAsync.</c> | ||
/// </summary> | ||
internal class Request : IClientServiceRequest<Page> | ||
{ | ||
internal string Token { get; set; } | ||
|
||
private readonly PageStreamedResource resource; | ||
|
||
internal Request(PageStreamedResource resource) | ||
{ | ||
this.resource = resource; | ||
} | ||
|
||
// Properties which are more easily implemented by returning dummy values than by throwing exceptions | ||
public string MethodName { get { return null; } } | ||
public string RestPath { get { return null; } } | ||
public string HttpMethod { get { return null; } } | ||
public IDictionary<string, IParameter> RequestParameters { get { return new Dictionary<string, IParameter>(); } } | ||
public IClientService Service { get { return null; } } | ||
|
||
public Task<Page> ExecuteAsync() | ||
{ | ||
return ExecuteAsync(CancellationToken.None); | ||
} | ||
|
||
public Task<Page> ExecuteAsync(CancellationToken cancellationToken) | ||
{ | ||
return resource.GetPageAsync(Token, cancellationToken); | ||
} | ||
|
||
public Page Execute() | ||
{ | ||
return resource.GetPage(Token); | ||
} | ||
|
||
// Methods that we know we won't call. (PageStreamer only needs to know how to execute requests, not the low-level bits.) | ||
public HttpRequestMessage CreateRequest(bool? overrideGZipEnabled = default(bool?)) | ||
{ | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public Task<Stream> ExecuteAsStreamAsync() | ||
{ | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public Task<Stream> ExecuteAsStreamAsync(CancellationToken cancellationToken) | ||
{ | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public Stream ExecuteAsStream() | ||
{ | ||
throw new NotImplementedException(); | ||
} | ||
} | ||
|
||
internal class Page | ||
{ | ||
internal List<int> Items { get; } | ||
/// <summary> | ||
/// The page token used to request this page. | ||
/// </summary> | ||
internal string CurrentPageToken { get; } | ||
/// <summary> | ||
/// The page token included with this page in order to request the next page of results. | ||
/// (This will be null if this is the last page of results.) | ||
/// </summary> | ||
internal string NextPageToken { get; set; } | ||
|
||
internal Page(string token, params int[] items) | ||
{ | ||
this.CurrentPageToken = token; | ||
this.Items = items.ToList(); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
Copyright 2016 Google Inc | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace Google.Apis.Requests | ||
{ | ||
// TODO(jskeet and LindaLawton): Add an example in the doc comment. | ||
|
||
/// <summary> | ||
/// A page streamer is a helper to provide both synchronous and asynchronous page streaming | ||
/// of a listable or queryable resource. | ||
/// </summary> | ||
/// <remarks> | ||
/// <para> | ||
/// The expected usage pattern is to create a single paginator for a resource collection, | ||
/// and then use the instance methods to obtain paginated results. | ||
/// </para> | ||
/// </remarks> | ||
/// <typeparam name="TResource">The type of resource being paginated</typeparam> | ||
/// <typeparam name="TRequest">The type of request used to fetch pages</typeparam> | ||
/// <typeparam name="TResponse">The type of response obtained when fetching pages</typeparam> | ||
/// <typeparam name="TToken">The type of the "next page token", which must be a reference type; | ||
/// a null reference for a token indicates the end of a stream of pages.</typeparam> | ||
public sealed class PageStreamer<TResource, TRequest, TResponse, TToken> | ||
where TToken : class | ||
where TRequest : IClientServiceRequest<TResponse> | ||
{ | ||
// Simple way of avoiding NullReferenceException if the response extractor returns null. | ||
private static readonly TResource[] emptyResources = new TResource[0]; | ||
|
||
private readonly Action<TRequest, TToken> requestModifier; | ||
private readonly Func<TResponse, TToken> tokenExtractor; | ||
private readonly Func<TResponse, IEnumerable<TResource>> resourceExtractor; | ||
|
||
/// <summary> | ||
/// Creates a paginator for later use. | ||
/// </summary> | ||
/// <param name="requestModifier">Action to modify a request to include the specified page token. | ||
/// Must not be null.</param> | ||
/// <param name="tokenExtractor">Function to extract the next page token from a response. | ||
/// Must not be null.</param> | ||
/// <param name="resourceExtractor">Function to extract a sequence of resources from a response. | ||
/// Must not be null, although it can return null if it is passed a response which contains no | ||
/// resources.</param> | ||
public PageStreamer( | ||
Action<TRequest, TToken> requestModifier, | ||
Func<TResponse, TToken> tokenExtractor, | ||
Func<TResponse, IEnumerable<TResource>> resourceExtractor) | ||
{ | ||
if (requestModifier == null) | ||
{ | ||
throw new ArgumentNullException("requestProvider"); | ||
} | ||
if (tokenExtractor == null) | ||
{ | ||
throw new ArgumentNullException("tokenExtractor"); | ||
} | ||
if (resourceExtractor == null) | ||
{ | ||
throw new ArgumentNullException("resourceExtractor"); | ||
} | ||
this.requestModifier = requestModifier; | ||
this.tokenExtractor = tokenExtractor; | ||
this.resourceExtractor = resourceExtractor; | ||
} | ||
|
||
/// <summary> | ||
/// Lazily fetches resources a page at a time. | ||
/// </summary> | ||
/// <param name="request">The initial request to send. If this contains a page token, | ||
/// that token is maintained. This will be modified with new page tokens over time, and should not | ||
/// be changed by the caller. (The caller should clone the request if they want an independent object | ||
/// to use in other calls or to modify.) Must not be null.</param> | ||
/// <returns>A sequence of resources, which are fetched a page at a time. Must not be null.</returns> | ||
public IEnumerable<TResource> Fetch(TRequest request) | ||
{ | ||
if (request == null) | ||
{ | ||
throw new ArgumentNullException("request"); | ||
} | ||
TToken token; | ||
do | ||
{ | ||
TResponse response = request.Execute(); | ||
token = tokenExtractor(response); | ||
requestModifier(request, token); | ||
foreach (var item in resourceExtractor(response) ?? emptyResources) | ||
{ | ||
yield return item; | ||
} | ||
} while (token != null); | ||
} | ||
|
||
/// <summary> | ||
/// Asynchronously (but eagerly) fetches a complete set of resources, potentially making multiple requests. | ||
/// </summary> | ||
/// <param name="request">The initial request to send. If this contains a page token, | ||
/// that token is maintained. This will be modified with new page tokens over time, and should not | ||
/// be changed by the caller. (The caller should clone the request if they want an independent object | ||
/// to use in other calls or to modify.) Must not be null.</param> | ||
/// <returns>A sequence of resources, which are fetched asynchronously and a page at a time.</returns> | ||
/// <param name="cancellationToken"></param> | ||
/// <returns>A task whose result (when complete) is the complete set of results fetched starting with the given | ||
/// request, and continuing to make further requests until a response has no "next page" token.</returns> | ||
public async Task<IList<TResource>> FetchAllAsync( | ||
TRequest request, | ||
CancellationToken cancellationToken) | ||
{ | ||
if (request == null) | ||
{ | ||
throw new ArgumentNullException("request"); | ||
} | ||
var results = new List<TResource>(); | ||
TToken token; | ||
do | ||
{ | ||
cancellationToken.ThrowIfCancellationRequested(); | ||
TResponse response = await request.ExecuteAsync(cancellationToken).ConfigureAwait(false); | ||
token = tokenExtractor(response); | ||
requestModifier(request, token); | ||
results.AddRange(resourceExtractor(response) ?? emptyResources); | ||
} while (token != null); | ||
return results; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as spam.
Sorry, something went wrong.