Skip to content

Commit

Permalink
Pubsub client ordering-keys (#3099)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdunelm committed Jun 11, 2019
1 parent a46edf8 commit afa4a96
Show file tree
Hide file tree
Showing 8 changed files with 797 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<IsPackable>False</IsPackable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Google.Cloud.PubSub.V1\Google.Cloud.PubSub.V1.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Cloud.PubSub.V1.OrderingKeyTester
{
/// <summary>
/// Tests the "ordering-keys" public client implementation.
/// Use the `ordering_key_generator` to generate the input CSV file, then run this program, then verify
/// the output CSV file using the `ordering_key_verifier`.
/// The input and output CSV files contain lines in the format:
/// "{ordering key, or empty string for no ordering key}","{message text}"
/// Before running this test the pubsub emulator must be started on localhost using ipv4 (not ipv6).
/// </summary>
class Program
{
private class InputLine
{
public InputLine(string line)
{
var parts = line.Split(',');
if (parts.Length != 2)
{
throw new Exception("Expected two parts in input file.");
}
OrderingKey = parts[0].Trim('"');
Message = parts[1].Trim('"');
}

public string OrderingKey { get; }
public string Message { get; }
}

static int Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Call with three args: <emulator port> <input file> <output file>");
Console.WriteLine("This connects to host 127.0.0.1, so requires the emulator to be started using ipv4, not ipv6:");
Console.WriteLine(" E.g. cloud-pubsub-emulator.bat --host=127.0.0.1 --port=8700");
Console.WriteLine("It reads and writes CSV files as specified in the 'Testing Ordering Keys' section of the");
Console.WriteLine(" 'Pub/Sub Ordering Key Client Libraries' doc.");
Console.WriteLine();
return 1;
}

// Read inputs.
var port = int.Parse(args[0]);
var inputLines = File.ReadAllLines(args[1]).Select(line => new InputLine(line)).ToList();
// Setup gRPC channel to pubsub emulator.
var channel = new Channel("127.0.0.1", port, ChannelCredentials.Insecure);

// Create topic and subscription names.
var topicName = new TopicName("project", $"topic-{Guid.NewGuid()}");
var subscriptionName = new SubscriptionName("project", $"subscription-{Guid.NewGuid()}");
// List that records all received messages.
var recvMsgs = new List<PubsubMessage>();

// Run test.
CreateTopicAndSubscription();
Task subTask = Subscribe();
IEnumerable<Task> pubTasks = Publish();

// Wait for publish and subscribe tasks to complete.
Console.WriteLine("Waiting for all publish tasks to complete");
Task.WaitAll(pubTasks.ToArray());
Console.WriteLine("All publish tasks completed");
Console.WriteLine("Waiting for subscribe task to complete");
subTask.Wait();
Console.WriteLine("Subscribe task completed");

// Output ordered CSV file of recevied messages, for the validator.
var csvLines = recvMsgs.Select(x => $"\"{x.OrderingKey}\",\"{x.Data.ToStringUtf8()}\"").ToList();
File.WriteAllLines(args[2], csvLines);
Console.WriteLine("Output file written; all done :)");

return 0;

void CreateTopicAndSubscription()
{
Console.WriteLine("Creating topic and subscription");
var pubApi = PublisherServiceApiClient.Create(channel);
var topic = pubApi.CreateTopic(topicName);
var subApi = SubscriberServiceApiClient.Create(channel);
subApi.CreateSubscription(new Subscription
{
EnableMessageOrdering = true,
TopicAsTopicNameOneof = TopicNameOneof.From(topicName),
SubscriptionName = subscriptionName,
AckDeadlineSeconds = 120,
});
}

Task Subscribe()
{
Console.WriteLine("Creating subscribers");
var subs = new[]
{
SubscriberServiceApiClient.Create(channel),
SubscriberServiceApiClient.Create(channel),
SubscriberServiceApiClient.Create(channel)
};
var sub = new SubscriberClientImpl(subscriptionName, subs, new SubscriberClient.Settings(), null);
var recvCount = 0;
var rnd = new Random();
Console.WriteLine("Starting subscriber callback");
return sub.StartAsync(async (msg, ct) =>
{
lock (recvMsgs)
{
recvMsgs.Add(msg.Clone());
recvCount += 1;
if (recvCount == inputLines.Count)
{
Console.WriteLine("Received all messages, shutting down");
var dummyTask = sub.StopAsync(CancellationToken.None);
}
}
if (rnd.Next(3) == 0)
{
await Task.Delay(rnd.Next(3));
}
return SubscriberClient.Reply.Ack;
});
}

IEnumerable<Task> Publish()
{
Console.WriteLine("Creating publishers");
var pubs = new[]
{
PublisherServiceApiClient.Create(channel),
PublisherServiceApiClient.Create(channel),
PublisherServiceApiClient.Create(channel)
};
var pub = new PublisherClientImpl(topicName, pubs, new PublisherClient.Settings { EnableMessageOrdering = true }, null);
var publishTasks = new List<Task>();
Console.WriteLine("Starting to publish");
foreach (var inputLine in inputLines)
{
var pubTask = pub.PublishAsync(inputLine.OrderingKey, inputLine.Message);
publishTasks.Add(pubTask);
}
Console.WriteLine("Publishing complete");
return publishTasks;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1.Tasks;
using Google.Cloud.PubSub.V1.Tests.Tasks;
using Google.Protobuf;
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -30,11 +30,15 @@ public class PublisherClientTest
{
private class FakePublisherServiceApiClient : PublisherServiceApiClient
{
public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper, params TimeSpan[] delays)
public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper,
TimeSpan[] delays = null, string orderingKeyErrorUnrecoverable = null, string orderingKeyErrorRecoverable = null)
{
_schduler = scheduler;
_taskHelper = taskHelper;
_delays = Enumerable.Repeat(delays.DefaultIfEmpty(), int.MaxValue).SelectMany(x => x).GetEnumerator();
_delays = Enumerable.Repeat((delays ?? Enumerable.Empty<TimeSpan>()).DefaultIfEmpty(), int.MaxValue)
.SelectMany(x => x).GetEnumerator();
_orderingKeyErrorUnrecoverable = orderingKeyErrorUnrecoverable;
_orderingKeyErrorRecoverable = orderingKeyErrorRecoverable;
_handledIds = new List<string>();
}

Expand All @@ -44,6 +48,9 @@ public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper
private readonly IEnumerator<TimeSpan> _delays;
private readonly List<string> _handledIds;

private string _orderingKeyErrorUnrecoverable;
private string _orderingKeyErrorRecoverable;

public IReadOnlyList<string> HandledMessages
{
get
Expand All @@ -59,21 +66,40 @@ public override async Task<PublishResponse> PublishAsync(PublishRequest request,
{
_delays.MoveNext();
await _taskHelper.ConfigureAwait(_schduler.Delay(_delays.Current, callSettings?.CancellationToken ?? CancellationToken.None));
var byOrderingKey = request.Messages.GroupBy(x => x.OrderingKey).ToList();
if (byOrderingKey.Count > 1)
{
throw new InvalidOperationException("Multiple ordering-keys should not be present within a single batch.");
}
var msgIds = request.Messages.Select(x => x.Data.ToStringUtf8());
lock (_lock)
{
if (byOrderingKey.Count > 0 && byOrderingKey[0].Key == _orderingKeyErrorUnrecoverable)
{
// Cause a one-off unrecoverable error.
_orderingKeyErrorUnrecoverable = null;
throw new RpcException(new Status(StatusCode.DataLoss, "Data loss"));
}
if (byOrderingKey.Count > 0 && byOrderingKey[0].Key == _orderingKeyErrorRecoverable)
{
// Cause a one-off recoverable error.
_orderingKeyErrorRecoverable = null;
throw new RpcException(new Status(StatusCode.Unavailable, "Unavailable"));
}
_handledIds.AddRange(msgIds);
}
return new PublishResponse { MessageIds = { msgIds } };
}
}

private PublisherClient.Settings MakeSettings(IScheduler scheduler, int batchElementCountThreshold = 1, int batchRequestByteThreshold = 1)
private PublisherClient.Settings MakeSettings(IScheduler scheduler,
int batchElementCountThreshold = 1, int batchRequestByteThreshold = 1, bool enableMessageOrdering = false)
{
return new PublisherClient.Settings
{
Scheduler = scheduler,
BatchingSettings = new BatchingSettings(batchElementCountThreshold, batchRequestByteThreshold, TimeSpan.FromSeconds(10)),
EnableMessageOrdering = enableMessageOrdering
};

}
Expand Down Expand Up @@ -143,7 +169,7 @@ private PublisherClient.Settings MakeSettings(IScheduler scheduler, int batchEle
var topicName = new TopicName("FakeProject", "FakeTopic");
var scheduler = new TestScheduler();
TaskHelper taskHelper = scheduler.TaskHelper;
var client = new FakePublisherServiceApiClient(scheduler, taskHelper, TimeSpan.FromSeconds(1));
var client = new FakePublisherServiceApiClient(scheduler, taskHelper, delays: new[] { TimeSpan.FromSeconds(1) });
var settings = MakeSettings(scheduler, batchElementCountThreshold: 2, batchRequestByteThreshold: 1000);
int shutdownCount = 0;
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
Expand Down Expand Up @@ -182,5 +208,116 @@ public void SettingsValidation()
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, 1, null) }.Validate();
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, PublisherClient.ApiMaxBatchingSettings.ByteCountThreshold, null) }.Validate();
}

[Fact]
public void PublishingMessageWithOrderingKeyRequiresOrderingEnabled()
{
var topicName = new TopicName("FakeProject", "FakeTopic");
var scheduler = new TestScheduler();
TaskHelper taskHelper = scheduler.TaskHelper;
var client = new FakePublisherServiceApiClient(scheduler, taskHelper);
var settings = MakeSettings(scheduler);
int shutdownCount = 0;
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
{
Interlocked.Increment(ref shutdownCount);
return Task.FromResult(0);
}, taskHelper);
scheduler.Run(async () =>
{
await taskHelper.ConfigureAwait(
Assert.ThrowsAsync<InvalidOperationException>(() => pub.PublishAsync("an ordering key", "1")));
});
}

[Theory, PairwiseData]
public void OrderingKeyManyMessages(
[CombinatorialValues(1, 2, 5, 7)] int clientCount,
[CombinatorialValues(1, 2, 6, 13)] int threadCount,
[CombinatorialValues(101, 2000, 9999)] int messageCount,
[CombinatorialValues(1, 2, 9, 51)] int orderingKeysCount,
[CombinatorialValues(1, 5, 50)] int batchElementCountThreshold,
[CombinatorialValues(0, 1, 59, 123, 1001)] int delayMs1,
[CombinatorialValues(0, 2, 500)] int delayMs2)
{
var topicName = new TopicName("FakeProject", "FakeTopic");
var scheduler = new TestScheduler(threadCount);
TaskHelper taskHelper = scheduler.TaskHelper;
var clients = Enumerable.Range(0, clientCount)
.Select(_ => new FakePublisherServiceApiClient(scheduler, taskHelper,
new[] { TimeSpan.FromMilliseconds(delayMs1), TimeSpan.FromMilliseconds(delayMs2) })).ToArray();
var settings = MakeSettings(scheduler,
batchElementCountThreshold: batchElementCountThreshold, batchRequestByteThreshold: 10000, enableMessageOrdering: true);
int shutdownCount = 0;
var pub = new PublisherClientImpl(topicName, clients, settings, () =>
{
Interlocked.Increment(ref shutdownCount);
return Task.FromResult(0);
}, taskHelper);
scheduler.Run(async () =>
{
var tasks = Enumerable.Range(0, messageCount)
.Select(i => pub.PublishAsync((i % orderingKeysCount).ToString(), $"{i % orderingKeysCount}:{i}")).ToArray();
var ids = new HashSet<string>(await taskHelper.ConfigureAwait(taskHelper.WhenAll(tasks)));
await taskHelper.ConfigureAwait(pub.ShutdownAsync(new CancellationToken()));
Assert.Equal(messageCount, ids.Count);
// This doesn't check the global ordering between clients, but that's OK here.
// The emulator-based integration test checks are more thorough.
foreach (var client in clients)
{
var kv = client.HandledMessages.Select(x => x.Split(':')).Select(x => (key: x[0], value: x[1]));
foreach (var values in kv.GroupBy(x => x.key, x => x.value))
{
var errorMsg = $"Ordering-key '{values.Key}' out of order";
foreach (var pair in values.Zip(values.Skip(1), (a, b) => (a, b)))
{
Assert.True(int.Parse(pair.a) < int.Parse(pair.b), errorMsg);
}
}
}
Assert.Equal(ids, new HashSet<string>(clients.SelectMany(x => x.HandledMessages)));
Assert.Equal(1, shutdownCount);
});
}

[Fact]
public void OrderingKeyResumePublish()
{
const string unrecoverableKey = "error-unrecoverable";
const string recoverableKey = "error-recoverable";
var topicName = new TopicName("FakeProject", "FakeTopic");
var scheduler = new TestScheduler();
TaskHelper taskHelper = scheduler.TaskHelper;
var client = new FakePublisherServiceApiClient(scheduler, taskHelper,
orderingKeyErrorUnrecoverable: unrecoverableKey, orderingKeyErrorRecoverable: recoverableKey);
var settings = MakeSettings(scheduler, enableMessageOrdering: true);
int shutdownCount = 0;
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
{
Interlocked.Increment(ref shutdownCount);
return Task.FromResult(0);
}, taskHelper);
scheduler.Run(async () =>
{
// First call will trigger an unrecoverable error.
var ex = await taskHelper.ConfigureAwait(
Assert.ThrowsAsync<RpcException>(() => pub.PublishAsync(unrecoverableKey, "unrecoverable error")));
Assert.Equal(StatusCode.DataLoss, ex.StatusCode);
// Sending again will reject the message.
await taskHelper.ConfigureAwait(
Assert.ThrowsAsync<OrderingKeyInErrorStateException>(() => pub.PublishAsync(unrecoverableKey, "key in error state")));
// Other ordering-keys publish OK.
await taskHelper.ConfigureAwait(pub.PublishAsync("ok-key", "ok"));
// Including a recoverable error.
await taskHelper.ConfigureAwait(pub.PublishAsync(recoverableKey, "recoverable error"));
// Including a message without an ordering key.
await taskHelper.ConfigureAwait(pub.PublishAsync("key not ordered"));
// Resume publishing on the ordering key.
pub.ResumePublish(unrecoverableKey);
await taskHelper.ConfigureAwait(pub.PublishAsync(unrecoverableKey, "unrecoverable key resumed"));
var expected = new HashSet<string>(new[] { "ok", "key not ordered", "recoverable error", "unrecoverable key resumed" });
Assert.Equal(expected, new HashSet<string>(client.HandledMessages));
});
}
}
}
Loading

0 comments on commit afa4a96

Please sign in to comment.