Skip to content

Commit

Permalink
Add PageStreamer to make it simple to fetch paged lists
Browse files Browse the repository at this point in the history
Fixes issue #632, as far as we want to for now.

This will only make the PageStreamer available - it won't be used in generated code.
We will want to add examples of how to use this.

Note that the only async option for the moment is to retrieve a whole list asynchronously.
While it would be nice to do this more lazily, that would need something like Ix-Async, and we'd prefer not to
introduce a new dependency for this at the moment.
  • Loading branch information
jskeet committed Jan 19, 2016
1 parent 1dce119 commit fd5afa0
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 0 deletions.
227 changes: 227 additions & 0 deletions Src/Support/GoogleApis.Tests/Apis/Requests/PageStreamerTest.cs
@@ -0,0 +1,227 @@
/*
Copyright 2016 Google Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

using NUnit.Framework;

using Google.Apis.Requests;
using Google.Apis.Discovery;
using Google.Apis.Services;

namespace Google.Apis.Tests.Apis.Requests
{
/// <summary>Tests for <see cref="PageStreamer{TResource, TRequest, TResponse, TToken}" />.</summary>
[TestFixture]
public class PageStreamerTest
{
private static readonly PageStreamedResource simpleResource = new PageStreamedResource(
"simple",
new Page(null, 1, 2, 3),
new Page("x", 4, 5),
new Page("y", 6, 7));

private static readonly PageStreamedResource resourceWithEmptyPages = new PageStreamedResource(
"empty pages",
new Page(null, 1, 2, 3),
new Page("a"),
new Page("b"),
new Page("c", 4, 5),
new Page("d"));

private static PageStreamedResource[] AllResources = { simpleResource, resourceWithEmptyPages };

[Test]
[TestCaseSource("AllResources")]
public void Fetch(PageStreamedResource resource)
{
var actual = PageStreamedResource.PageStreamer.Fetch(new Request(resource));
Assert.AreEqual(resource.AllItems, actual);
}

[Test]
[TestCaseSource("AllResources")]
public async Task FetchAllAsync(PageStreamedResource resource)
{
var actual = await PageStreamedResource.PageStreamer.FetchAllAsync(
new Request(resource),
CancellationToken.None);
Assert.AreEqual(resource.AllItems, actual);
}

// Note: this doesn't test all the possible timings of cancellation
[Test]
public async Task Cancellation()
{
var cts = new CancellationTokenSource();
var gatekeeperSource = new TaskCompletionSource<int>();
var resource = new PageStreamedResource(
"check",
new Page("x", 1, 2, 3),
new Page("y", 4, 5),
new Page(null, 6, 7));
resource.gateKeeper = gatekeeperSource.Task;

var task = PageStreamedResource.PageStreamer.FetchAllAsync(new Request(resource), cts.Token);
// Delay the resource fetching until we've cancelled the token
cts.Cancel();
gatekeeperSource.SetResult(0);
// TODO: Move to Assert.ThrowsAsync when we update to an appropriate version of NUnit.
// Assert.That is available, but the documentation is unclear how this interacts with await.
try
{
await task;
Assert.Fail("Expected exception");
}
catch (OperationCanceledException)
{
// Expected
}
}

// Has to be public so we can use it as a parameter for test cases
public class PageStreamedResource
{
internal static readonly PageStreamer<int, Request, Page, string> PageStreamer =
new PageStreamer<int, Request, Page, string>(
(request, token) => request.Token = token,
page => page.NextPageToken,
page => page.Items);

// Task to await when fetching pages; defaults to an already completed task,
// but can be set to another task to allow for more fine-grained interaction.
internal Task gateKeeper = TaskEx.FromResult(0);
internal List<Page> Pages { get; private set; }
internal string Name { get; set; }

internal PageStreamedResource(string name, params Page[] pages)
{
this.Name = name;
this.Pages = pages.ToList();
for (int i = 0; i < pages.Length - 1; i++)
{
Pages[i].NextPageToken = Pages[i + 1].CurrentPageToken;
}
}

public override string ToString()
{
return Name;
}

internal Page GetPage(string token)
{
// This will fail if we provide a token that doesn't exist.
return Pages.Single(p => p.CurrentPageToken == token);
}

internal async Task<Page> GetPageAsync(string token, CancellationToken cancellationToken)
{
await gateKeeper;
cancellationToken.ThrowIfCancellationRequested();
return GetPage(token);
}

public IEnumerable<int> AllItems => Pages.SelectMany(page => page.Items);
}

/// <summary>
/// Request to work with <see cref="PageStreamedResource"/>; feels like overkill (in a non-HTTP context)
/// due to having to implement <see cref="IClientServiceRequest{Page}"/> but there isn't an interface
/// which only includes <c>Execute</c> and <c>ExecuteAsync.</c>
/// </summary>
internal class Request : IClientServiceRequest<Page>
{
internal string Token { get; set; }

private readonly PageStreamedResource resource;

internal Request(PageStreamedResource resource)
{
this.resource = resource;
}

// Properties which are more easily implemented by returning dummy values than by throwing exceptions
public string MethodName { get { return null; } }
public string RestPath { get { return null; } }
public string HttpMethod { get { return null; } }
public IDictionary<string, IParameter> RequestParameters { get { return new Dictionary<string, IParameter>(); } }
public IClientService Service { get { return null; } }

public Task<Page> ExecuteAsync()
{
return ExecuteAsync(CancellationToken.None);
}

public Task<Page> ExecuteAsync(CancellationToken cancellationToken)
{
return resource.GetPageAsync(Token, cancellationToken);
}

public Page Execute()
{
return resource.GetPage(Token);
}

// Methods that we know we won't call. (PageStreamer only needs to know how to execute requests, not the low-level bits.)
public HttpRequestMessage CreateRequest(bool? overrideGZipEnabled = default(bool?))
{
throw new NotImplementedException();
}

public Task<Stream> ExecuteAsStreamAsync()
{
throw new NotImplementedException();
}

public Task<Stream> ExecuteAsStreamAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Stream ExecuteAsStream()
{
throw new NotImplementedException();
}
}

internal class Page
{
internal List<int> Items { get; }
/// <summary>
/// The page token used to request this page.
/// </summary>
internal string CurrentPageToken { get; }
/// <summary>
/// The page token included with this page in order to request the next page of results.
/// (This will be null if this is the last page of results.)
/// </summary>
internal string NextPageToken { get; set; }

internal Page(string token, params int[] items)
{
this.CurrentPageToken = token;
this.Items = items.ToList();
}
}
}
}
1 change: 1 addition & 0 deletions Src/Support/GoogleApis.Tests/GoogleApis.Tests.csproj
Expand Up @@ -136,6 +136,7 @@
<Compile Include="Apis\Http\ConfigurableMessageHandlerTest.cs" />
<Compile Include="Apis\Http\MaxUrlLengthInterceptorTest.cs" />
<Compile Include="Apis\Requests\BatchRequestTest.cs" />
<Compile Include="Apis\Requests\PageStreamerTest.cs" />
<Compile Include="Apis\Requests\Parameters\ParameterCollectionTest.cs" />
<Compile Include="Apis\Requests\Parameters\ParameterValidatorTest.cs" />
<Compile Include="Apis\Requests\RequestBuilderTest.cs" />
Expand Down
143 changes: 143 additions & 0 deletions Src/Support/GoogleApis/Apis/Requests/PageStreamer.cs
@@ -0,0 +1,143 @@
/*
Copyright 2016 Google Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Apis.Requests
{
// TODO(jskeet and LindaLawton): Add an example in the doc comment.

/// <summary>
/// A page streamer is a helper to provide both synchronous and asynchronous page streaming
/// of a listable or queryable resource.
/// </summary>
/// <remarks>
/// <para>
/// The expected usage pattern is to create a single paginator for a resource collection,
/// and then use the instance methods to obtain paginated results.
/// </para>
/// </remarks>
/// <typeparam name="TResource">The type of resource being paginated</typeparam>
/// <typeparam name="TRequest">The type of request used to fetch pages</typeparam>
/// <typeparam name="TResponse">The type of response obtained when fetching pages</typeparam>
/// <typeparam name="TToken">The type of the "next page token", which must be a reference type;
/// a null reference for a token indicates the end of a stream of pages.</typeparam>
public sealed class PageStreamer<TResource, TRequest, TResponse, TToken>
where TToken : class
where TRequest : IClientServiceRequest<TResponse>
{
// Simple way of avoiding NullReferenceException if the response extractor returns null.
private static readonly TResource[] emptyResources = new TResource[0];

private readonly Action<TRequest, TToken> requestModifier;
private readonly Func<TResponse, TToken> tokenExtractor;
private readonly Func<TResponse, IEnumerable<TResource>> resourceExtractor;

/// <summary>
/// Creates a paginator for later use.
/// </summary>
/// <param name="requestModifier">Action to modify a request to include the specified page token.
/// Must not be null.</param>
/// <param name="tokenExtractor">Function to extract the next page token from a response.
/// Must not be null.</param>
/// <param name="resourceExtractor">Function to extract a sequence of resources from a response.
/// Must not be null, although it can return null if it is passed a response which contains no
/// resources.</param>
public PageStreamer(
Action<TRequest, TToken> requestModifier,
Func<TResponse, TToken> tokenExtractor,
Func<TResponse, IEnumerable<TResource>> resourceExtractor)
{
if (requestModifier == null)
{
throw new ArgumentNullException("requestProvider");
}
if (tokenExtractor == null)
{
throw new ArgumentNullException("tokenExtractor");
}
if (resourceExtractor == null)
{
throw new ArgumentNullException("resourceExtractor");
}
this.requestModifier = requestModifier;
this.tokenExtractor = tokenExtractor;
this.resourceExtractor = resourceExtractor;
}

/// <summary>
/// Lazily fetches resources a page at a time.
/// </summary>
/// <param name="request">The initial request to send. If this contains a page token,
/// that token is maintained. This will be modified with new page tokens over time, and should not
/// be changed by the caller. (The caller should clone the request if they want an independent object
/// to use in other calls or to modify.) Must not be null.</param>
/// <returns>A sequence of resources, which are fetched a page at a time. Must not be null.</returns>
public IEnumerable<TResource> Fetch(TRequest request)
{
if (request == null)
{
throw new ArgumentNullException("request");
}
TToken token;
do
{
TResponse response = request.Execute();
token = tokenExtractor(response);
requestModifier(request, token);
foreach (var item in resourceExtractor(response) ?? emptyResources)
{
yield return item;
}
} while (token != null);
}

/// <summary>
/// Asynchronously (but eagerly) fetches a complete set of resources, potentially making multiple requests.
/// </summary>
/// <param name="request">The initial request to send. If this contains a page token,
/// that token is maintained. This will be modified with new page tokens over time, and should not
/// be changed by the caller. (The caller should clone the request if they want an independent object
/// to use in other calls or to modify.) Must not be null.</param>
/// <returns>A sequence of resources, which are fetched asynchronously and a page at a time.</returns>
/// <param name="cancellationToken"></param>
/// <returns>A task whose result (when complete) is the complete set of results fetched starting with the given
/// request, and continuing to make further requests until a response has no "next page" token.</returns>
public async Task<IList<TResource>> FetchAllAsync(
TRequest request,
CancellationToken cancellationToken)
{
if (request == null)
{
throw new ArgumentNullException("request");
}
var results = new List<TResource>();
TToken token;
do
{
cancellationToken.ThrowIfCancellationRequested();
TResponse response = await request.ExecuteAsync(cancellationToken).ConfigureAwait(false);
token = tokenExtractor(response);
requestModifier(request, token);
results.AddRange(resourceExtractor(response) ?? emptyResources);
} while (token != null);
return results;
}
}
}
1 change: 1 addition & 0 deletions Src/Support/GoogleApis/GoogleApis_Shared.projitems
Expand Up @@ -15,6 +15,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Apis\Requests\HttpRequestMessageExtenstions.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\Requests\IClientServiceRequest.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\Requests\IDirectResponseSchema.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\Requests\PageStreamer.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\Services\BaseClientService.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\Services\IClientService.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Apis\[Media]\Download\IDownloadProgress.cs" />
Expand Down

0 comments on commit fd5afa0

Please sign in to comment.