Skip to content

Commit

Permalink
Simplify PageStreamer code
Browse files Browse the repository at this point in the history
Two main changes:
- PageStreamer now *assumes* modification of an existing request object (with documentation)
- The tests are simplified to avoid the request checking (which was a bit pointless) and simper current/next tokens
  • Loading branch information
jskeet committed Jan 18, 2016
1 parent 2623408 commit d6d4f39
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 70 deletions.
86 changes: 36 additions & 50 deletions Src/Support/GoogleApis.Tests/Apis/Requests/PageStreamerTest.cs
Expand Up @@ -30,39 +30,31 @@

namespace Google.Apis.Tests.Apis.Requests
{
/// <summary>Tests for <see cref="PageStreamer"/>.</summary>
/// <summary>Tests for <see cref="PageStreamer{TResource, TRequest, TResponse, TToken}" />.</summary>
[TestFixture]
public class PageStreamerTest
{
private static readonly PageStreamedResource simpleResource = new PageStreamedResource(
"resource 1",
new Page("x", 1, 2, 3),
new Page("y", 4, 5),
new Page(null, 6, 7));
"simple",
new Page(null, 1, 2, 3),
new Page("x", 4, 5),
new Page("y", 6, 7));

private static readonly PageStreamedResource resourceWithEmptyPages = new PageStreamedResource(
"resource 2",
new Page("a", 1, 2, 3),
"empty pages",
new Page(null, 1, 2, 3),
new Page("a"),
new Page("b"),
new Page("c"),
new Page("d", 4, 5),
new Page(null));

private static readonly PageStreamedResource requestCheckingResource = new PageStreamedResource(
"resource 3",
new Page("x", 1, 2, 3),
new Page(null, 4, 5))
{
RequestCheck = "foo"
};
new Page("c", 4, 5),
new Page("d"));

private static PageStreamedResource[] AllResources = { simpleResource, resourceWithEmptyPages, requestCheckingResource };
private static PageStreamedResource[] AllResources = { simpleResource, resourceWithEmptyPages };

[Test]
[TestCaseSource("AllResources")]
public void Fetch(PageStreamedResource resource)
{
var actual = PageStreamedResource.PageStreamer.Fetch(resource.NewRequest());
var actual = PageStreamedResource.PageStreamer.Fetch(new Request(resource));
Assert.AreEqual(resource.AllItems, actual);
}

Expand All @@ -71,7 +63,7 @@ public void Fetch(PageStreamedResource resource)
public async Task FetchAllAsync(PageStreamedResource resource)
{
var actual = await PageStreamedResource.PageStreamer.FetchAllAsync(
resource.NewRequest(),
new Request(resource),
CancellationToken.None);
Assert.AreEqual(resource.AllItems, actual);
}
Expand All @@ -89,7 +81,7 @@ public async Task Cancellation()
new Page(null, 6, 7));
resource.gateKeeper = gatekeeperSource.Task;

var task = PageStreamedResource.PageStreamer.FetchAllAsync(resource.NewRequest(), cts.Token);
var task = PageStreamedResource.PageStreamer.FetchAllAsync(new Request(resource), cts.Token);
// Delay the resource fetching until we've cancelled the token
cts.Cancel();
gatekeeperSource.SetResult(0);
Expand All @@ -111,7 +103,7 @@ public class PageStreamedResource
{
internal static readonly PageStreamer<int, Request, Page, string> PageStreamer =
new PageStreamer<int, Request, Page, string>(
(request, token) => request.WithToken(token),
(request, token) => request.Token = token,
page => page.NextPageToken,
page => page.Items);

Expand All @@ -120,40 +112,33 @@ public class PageStreamedResource
internal Task gateKeeper = TaskEx.FromResult(0);
internal List<Page> Pages { get; private set; }
internal string Name { get; set; }
internal string RequestCheck { get; set; }

internal PageStreamedResource(string name, params Page[] pages)
{
this.Name = name;
this.Pages = pages.ToList();
for (int i = 0; i < pages.Length - 1; i++)
{
Pages[i].NextPageToken = Pages[i + 1].CurrentPageToken;
}
}

public override string ToString()
{
return Name;
}

internal Request NewRequest()
{
return new Request(this) { Check = RequestCheck };
}

internal Page GetPage(string token, string check)
internal Page GetPage(string token)
{
Assert.AreEqual(RequestCheck, check);
int index = Pages.FindIndex(page => page.NextPageToken == token);
// A request for the first page will have the NextPageToken of the last page... so we should always
// find something.
Assert.AreNotEqual(-1, index);
index = (index + 1) % Pages.Count;
return Pages[index];
// This will fail if we provide a token that doesn't exist.
return Pages.Single(p => p.CurrentPageToken == token);
}

internal async Task<Page> GetPageAsync(string token, string check, CancellationToken cancellationToken)
internal async Task<Page> GetPageAsync(string token, CancellationToken cancellationToken)
{
await gateKeeper;
cancellationToken.ThrowIfCancellationRequested();
return GetPage(token, check);
return GetPage(token);
}

public IEnumerable<int> AllItems => Pages.SelectMany(page => page.Items);
Expand All @@ -167,8 +152,6 @@ internal async Task<Page> GetPageAsync(string token, string check, CancellationT
internal class Request : IClientServiceRequest<Page>
{
internal string Token { get; set; }
// Used to check that we keep the right request info
internal string Check { get; set; }

private readonly PageStreamedResource resource;

Expand All @@ -177,11 +160,6 @@ internal Request(PageStreamedResource resource)
this.resource = resource;
}

internal Request WithToken(string token)
{
return new Request(resource) { Token = token, Check = Check };
}

// Properties which are more easily implemented by returning dummy values than by throwing exceptions
public string MethodName { get { return null; } }
public string RestPath { get { return null; } }
Expand All @@ -196,12 +174,12 @@ public Task<Page> ExecuteAsync()

public Task<Page> ExecuteAsync(CancellationToken cancellationToken)
{
return resource.GetPageAsync(Token, Check, cancellationToken);
return resource.GetPageAsync(Token, cancellationToken);
}

public Page Execute()
{
return resource.GetPage(Token, Check);
return resource.GetPage(Token);
}

// Methods that we know we won't call. (PageStreamer only needs to know how to execute requests, not the low-level bits.)
Expand Down Expand Up @@ -229,11 +207,19 @@ public Stream ExecuteAsStream()
internal class Page
{
internal List<int> Items { get; }
internal string NextPageToken { get; }
/// <summary>
/// The page token used to request this page.
/// </summary>
internal string CurrentPageToken { get; }
/// <summary>
/// The page token included with this page in order to request the next page of results.
/// (This will be null if this is the last page of results.)
/// </summary>
internal string NextPageToken { get; set; }

internal Page(string token, params int[] items)
{
this.NextPageToken = token;
this.CurrentPageToken = token;
this.Items = items.ToList();
}
}
Expand Down
48 changes: 28 additions & 20 deletions Src/Support/GoogleApis/Apis/Requests/PageStreamer.cs
Expand Up @@ -45,26 +45,26 @@ public sealed class PageStreamer<TResource, TRequest, TResponse, TToken>
// Simple way of avoiding NullReferenceException if the response extractor returns null.
private static readonly TResource[] emptyResources = new TResource[0];

private readonly Func<TRequest, TToken, TRequest> requestProvider;
private readonly Action<TRequest, TToken> requestModifier;
private readonly Func<TResponse, TToken> tokenExtractor;
private readonly Func<TResponse, IEnumerable<TResource>> resourceExtractor;

/// <summary>
/// Creates a paginator for later use.
/// </summary>
/// <param name="requestProvider">Function to combine an initial request and a page token
/// into a new request. Must not be null.</param>
/// <param name="requestModifier">Action to modify a request to include the specified page token.
/// Must not be null.</param>
/// <param name="tokenExtractor">Function to extract the next page token from a response.
/// Must not be null.</param>
/// <param name="resourceExtractor">Function to extract a sequence of resources from a response.
/// Must not be null, although it can return null if it is passed a response which contains no
/// resources.</param>
public PageStreamer(
Func<TRequest, TToken, TRequest> requestProvider,
Action<TRequest, TToken> requestModifier,
Func<TResponse, TToken> tokenExtractor,
Func<TResponse, IEnumerable<TResource>> resourceExtractor)
{
if (requestProvider == null)
if (requestModifier == null)
{
throw new ArgumentNullException("requestProvider");
}
Expand All @@ -76,27 +76,31 @@ public sealed class PageStreamer<TResource, TRequest, TResponse, TToken>
{
throw new ArgumentNullException("resourceExtractor");
}
this.requestProvider = requestProvider;
this.requestModifier = requestModifier;
this.tokenExtractor = tokenExtractor;
this.resourceExtractor = resourceExtractor;
}

/// <summary>
/// Lazily fetches resources a page at a time.
/// </summary>
/// <param name="initialRequest">The initial request to send. If this contains a page token,
/// that token is maintained. This is also passed to the request provider along with a token
/// to provide another request. (The provider may build a new object, or modify the existing request.)</param>
/// <param name="request">The initial request to send. If this contains a page token,
/// that token is maintained. This will be modified with new page tokens over time, and should not
/// be changed by the caller. (The caller should clone the request if they want an independent object
/// to use in other calls or to modify.) Must not be null.</param>
/// <returns>A sequence of resources, which are fetched a page at a time. Must not be null.</returns>
public IEnumerable<TResource> Fetch(TRequest initialRequest)
public IEnumerable<TResource> Fetch(TRequest request)
{
if (request == null)
{
throw new ArgumentNullException("request");
}
TToken token;
TRequest nextRequest = initialRequest;
do
{
TResponse response = nextRequest.Execute();
TResponse response = request.Execute();
token = tokenExtractor(response);
nextRequest = requestProvider(initialRequest, token);
requestModifier(request, token);
foreach (var item in resourceExtractor(response) ?? emptyResources)
{
yield return item;
Expand All @@ -107,26 +111,30 @@ public IEnumerable<TResource> Fetch(TRequest initialRequest)
/// <summary>
/// Asynchronously (but eagerly) fetches a complete set of resources, potentially making multiple requests.
/// </summary>
/// <param name="initialRequest">The initial request to send. If this contains a page token,
/// that token is maintained. This is also passed to the request provider along with a token
/// to provide another request. (The provider may build a new object, or modify the existing request.)</param>
/// <param name="request">The initial request to send. If this contains a page token,
/// that token is maintained. This will be modified with new page tokens over time, and should not
/// be changed by the caller. (The caller should clone the request if they want an independent object
/// to use in other calls or to modify.)</param>
/// <returns>A sequence of resources, which are fetched asynchronously and a page at a time.</returns>
/// <param name="cancellationToken"></param>
/// <returns>A task whose result (when complete) is the complete set of results fetched starting with the given
/// request, and continuing to make further requests until a response has no "next page" token.</returns>
public async Task<IList<TResource>> FetchAllAsync(
TRequest initialRequest,
TRequest request,
CancellationToken cancellationToken)
{
if (request == null)
{
throw new ArgumentNullException("request");
}
var results = new List<TResource>();
TToken token;
TRequest nextRequest = initialRequest;
do
{
cancellationToken.ThrowIfCancellationRequested();
TResponse response = await nextRequest.ExecuteAsync(cancellationToken).ConfigureAwait(false);
TResponse response = await request.ExecuteAsync(cancellationToken).ConfigureAwait(false);
token = tokenExtractor(response);
nextRequest = requestProvider(initialRequest, token);
requestModifier(request, token);
results.AddRange(resourceExtractor(response) ?? emptyResources);
} while (token != null);
return results;
Expand Down

0 comments on commit d6d4f39

Please sign in to comment.