Skip to content

Commit

Permalink
feat: Add PublisherClientBuilder
Browse files Browse the repository at this point in the history
This is now the preferred way of customizing creation of PublisherClient, instead of using PublisherClient.ClientCreationSettings.
  • Loading branch information
jskeet committed Jul 12, 2022
1 parent f667ae3 commit 09eeaa9
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017, Google Inc. All rights reserved.
// Copyright 2017, Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,13 +61,6 @@ static PubSubClientTest()
private static Task<SubscriberServiceApiClient> CreateSubscriberServiceApiClientAsync() =>
new SubscriberServiceApiClientBuilder { EmulatorDetection = EmulatorDetection.EmulatorOrProduction }.BuildAsync();

private static Task<PublisherClient> CreatePublisherClientAsync(TopicName topicName, PublisherClient.ClientCreationSettings clientCreationSettings = null, PublisherClient.Settings settings = null)
{
clientCreationSettings ??= new PublisherClient.ClientCreationSettings();
clientCreationSettings = clientCreationSettings.WithEmulatorDetection(EmulatorDetection.EmulatorOrProduction);
return PublisherClient.CreateAsync(topicName, clientCreationSettings, settings);
}

private static Task<SubscriberClient> CreateSubscriberClientAsync(SubscriptionName subscriptionName, SubscriberClient.ClientCreationSettings clientCreationSettings = null, SubscriberClient.Settings settings = null)
{
clientCreationSettings ??= new SubscriberClient.ClientCreationSettings();
Expand Down Expand Up @@ -124,21 +117,23 @@ private async Task CreateTopicAndSubscription(TopicName topicName, SubscriptionN
minMessageSize = Math.Max(4, minMessageSize);

// Create PublisherClient and SubscriberClient
var publisher = await CreatePublisherClientAsync(topicName,
clientCreationSettings: new PublisherClient.ClientCreationSettings(
clientCount: publisherChannelCount,
publisherServiceApiSettings: timeouts == null ? null : new PublisherServiceApiSettings
{
PublishSettings = CallSettings
var publisher = await new PublisherClientBuilder
{
TopicName = topicName,
ClientCount = publisherChannelCount,
EmulatorDetection = EmulatorDetection.EmulatorOrProduction,
ApiSettings = timeouts == null ? null : new PublisherServiceApiSettings
{
PublishSettings = CallSettings
.FromRetry(RetrySettings.FromExponentialBackoff(
maxAttempts: int.MaxValue,
initialBackoff: TimeSpan.FromMilliseconds(100),
maxBackoff: TimeSpan.FromSeconds(6),
backoffMultiplier: 1.3,
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
.WithTimeout(timeouts.Value)
}
)).ConfigureAwait(false);
}
}.BuildAsync().ConfigureAwait(false);
var subscriber = await CreateSubscriberClientAsync(subscriptionName,
clientCreationSettings: new SubscriberClient.ClientCreationSettings(clientCount: clientCount),
settings: new SubscriberClient.Settings
Expand Down Expand Up @@ -338,7 +333,7 @@ public async Task OversizedMessage()
var publisherApi = await CreatePublisherServiceApiClientAsync().ConfigureAwait(false);
await publisherApi.CreateTopicAsync(topicName).ConfigureAwait(false);
// Create Publisher
var publisher = await CreatePublisherClientAsync(topicName).ConfigureAwait(false);
var publisher = await PublisherClient.CreateAsync(topicName).ConfigureAwait(false);
// Create oversized message
Random rnd = new Random(1234);
byte[] msg = new byte[10_000_001];
Expand Down Expand Up @@ -367,9 +362,13 @@ public async Task MaxBatchSize()
PublisherClient.ApiMaxBatchingSettings.ByteCountThreshold, TimeSpan.FromSeconds(4));
var publisherServiceApiSettings = PublisherServiceApiSettings.GetDefault();
publisherServiceApiSettings.PublishSettings = CallSettings.FromExpiration(Expiration.FromTimeout(TimeSpan.FromSeconds(60)));
var publisher = await CreatePublisherClientAsync(topicName,
new PublisherClient.ClientCreationSettings(clientCount: 1, publisherServiceApiSettings: publisherServiceApiSettings),
new PublisherClient.Settings { BatchingSettings = batchingSettings }).ConfigureAwait(false);
var publisher = await new PublisherClientBuilder
{
TopicName = topicName,
ClientCount = 1,
ApiSettings = publisherServiceApiSettings,
Settings = new PublisherClient.Settings { BatchingSettings = batchingSettings }
}.BuildAsync().ConfigureAwait(false);
var msgCount = PublisherClient.ApiMaxBatchingSettings.ElementCountThreshold.Value;
var msgSize = PublisherClient.ApiMaxBatchingSettings.ByteCountThreshold.Value / msgCount;
var rnd = new Random(1234);
Expand Down Expand Up @@ -414,7 +413,7 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
var subscriberApi = await CreateSubscriberServiceApiClientAsync().ConfigureAwait(false);
await subscriberApi.CreateSubscriptionAsync(subscriptionName, topicName, null, 60).ConfigureAwait(false);
// Create publisher, and start publishing messages
var publisher = await CreatePublisherClientAsync(topicName).ConfigureAwait(false);
var publisher = await PublisherClient.CreateAsync(topicName).ConfigureAwait(false);
var publishTask = Task.Run(async () =>
{
Console.WriteLine($"Starting to publish {totalMessageCount} messages.");
Expand Down Expand Up @@ -538,7 +537,7 @@ public async Task DeadLetterQueueAndDeliveryAttempt()
}
}).ConfigureAwait(false);

var pub = await CreatePublisherClientAsync(topicName, new PublisherClient.ClientCreationSettings(clientCount: 1)).ConfigureAwait(false);
var pub = await new PublisherClientBuilder { TopicName = topicName, ClientCount = 1 }.BuildAsync().ConfigureAwait(false);
var sub = await CreateSubscriberClientAsync(subscriptionName, new SubscriberClient.ClientCreationSettings(clientCount: 1)).ConfigureAwait(false);
var dlqSub = await CreateSubscriberClientAsync(dlqSubscriptionName, new SubscriberClient.ClientCreationSettings(clientCount: 1)).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,7 +103,10 @@ internal void Validate()

/// <summary>
/// Settings for creating <see cref="PublisherServiceApiClient"/>s.
/// This type is now obsolete; please use <see cref="PublisherClientBuilder"/> which provides an
/// API surface consistent with other clients (as well as additional Pub/Sub-specific properties such as <see cref="PublisherClientBuilder.ClientCount"/>).
/// </summary>
[Obsolete("Use PublisherClientBuilder to customize client settings.")]
public sealed class ClientCreationSettings
{
/// <summary>
Expand Down Expand Up @@ -206,6 +209,22 @@ internal void Validate()
/// </summary>
public static BatchingSettings ApiMaxBatchingSettings { get; } = new BatchingSettings(1000L, 10_000_000L, null);

/// <summary>
/// Create a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>, using default settings.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to. Must not be null.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
public static PublisherClient Create(TopicName topicName) =>
CreateBuilder(topicName, null, null).Build();

/// <summary>
/// Creates a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>, using default settings.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to. Must not be null.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
public static Task<PublisherClient> CreateAsync(TopicName topicName) =>
CreateBuilder(topicName, null, null).BuildAsync();

/// <summary>
/// Create a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
Expand All @@ -215,15 +234,14 @@ internal void Validate()
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to. Must not be null.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="PublisherServiceApiClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
[Obsolete("Use PublisherClient.Create(TopicName) to use the default settings, or PublisherClientBuilder for customization.")]
public static PublisherClient Create(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
// With isAsync set to false, the returned task will already be completed (either successfully or faulted),
// so .ResultWithUnwrappedExceptions() will always return immediately.
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: false).ResultWithUnwrappedExceptions();
CreateBuilder(topicName, clientCreationSettings, settings).Build();

/// <summary>
/// Create a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.
Expand All @@ -234,61 +252,28 @@ internal void Validate()
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to. Must not be null.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="PublisherServiceApiClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
[Obsolete("Use PublisherClient.CreateAsync(TopicName) to use the default settings, or PublisherClientBuilder for customization.")]
public static Task<PublisherClient> CreateAsync(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
// With isAsync set to true, the returned task will complete asynchronously (if required) as expected.
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: true);
CreateBuilder(topicName, clientCreationSettings, settings).BuildAsync();

/// <summary>
/// Creates a <see cref="PublisherClient"/>.
/// <paramref name="isAsync"/> controls whether the returned task will complete synchronously or asynchronously, allowing this
/// method to be used by both <see cref="Create"/> and <see cref="CreateAsync"/>.
/// </summary>
private static async Task<PublisherClient> CreateMaybeAsync(TopicName topicName, ClientCreationSettings clientCreationSettings, Settings settings, bool isAsync)
{
clientCreationSettings?.Validate();
// Clone settings, just in case user modifies them and an await happens in this method
settings = settings?.Clone() ?? new Settings();
var clientCount = clientCreationSettings?.ClientCount ?? Environment.ProcessorCount;

// Create the channels and clients, and register shutdown functions for each channel
var clients = new PublisherServiceApiClient[clientCount];
var shutdowns = new Func<Task>[clientCount];
for (int i = 0; i < clientCount; i++)
#pragma warning disable CS0618 // Type or member is obsolete
private static PublisherClientBuilder CreateBuilder(TopicName topicName, ClientCreationSettings clientCreationSettings, Settings settings) =>
new PublisherClientBuilder
{
// Use a random arg to prevent sub-channel re-use in gRPC, so each channel uses its own connection.
var grpcChannelOptions = s_unlimitedSendReceiveChannelOptions
.WithCustomOption("sub-channel-separator", Guid.NewGuid().ToString());

// First builder to handle any endpoint detection etc. We build a gRPC channel
// with this.
var builder = new PublisherServiceApiClientBuilder
{
EmulatorDetection = clientCreationSettings?.EmulatorDetection ?? EmulatorDetection.None,
Endpoint = clientCreationSettings?.ServiceEndpoint,
ChannelCredentials = clientCreationSettings?.Credentials,
Settings = clientCreationSettings?.PublisherServiceApiSettings,
ChannelOptions = grpcChannelOptions
};
var channel = isAsync ?
await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false) :
builder.CreateChannel();

// Second builder doesn't need to do much, as we can build a call invoker from the channel.
clients[i] = new PublisherServiceApiClientBuilder
{
CallInvoker = channel.CreateCallInvoker(),
Settings = clientCreationSettings?.PublisherServiceApiSettings
}.Build();
shutdowns[i] = channel.ShutdownAsync;
}
Func<Task> shutdown = () => Task.WhenAll(shutdowns.Select(x => x()));
return new PublisherClientImpl(topicName, clients, settings, shutdown);
}
TopicName = GaxPreconditions.CheckNotNull(topicName, nameof(topicName)),
Settings = settings,
EmulatorDetection = clientCreationSettings?.EmulatorDetection ?? EmulatorDetection.None,
ClientCount = clientCreationSettings?.ClientCount,
ChannelCredentials = clientCreationSettings?.Credentials,
Endpoint = clientCreationSettings?.ServiceEndpoint,
ApiSettings = clientCreationSettings?.PublisherServiceApiSettings
};
#pragma warning restore CS0618 // Type or member is obsolete

/// <summary>
/// The associated <see cref="TopicName"/>.
Expand Down

0 comments on commit 09eeaa9

Please sign in to comment.