Skip to content

Commit

Permalink
Add sync Create() method to PublisherClient and SubscriberClient (#5895)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdunelm committed Feb 2, 2021
1 parent 59e5691 commit 071ea6f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,43 @@ internal void Validate()
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
/// settings may need changing; especially
/// <see cref="ClientCreationSettings.PublisherServiceApiSettings"/>.<see cref="PublisherServiceApiSettings.PublishSettings"/>.<see cref="CallSettings.Retry"/>.
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="PublisherServiceApiClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
public static async Task<PublisherClient> CreateAsync(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null)
public static PublisherClient Create(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
// With isAsync set to false, the returned task will already be completed (either successfully or faulted),
// so .ResultWithUnwrappedExceptions() will always return immediately.
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: false).ResultWithUnwrappedExceptions();

/// <summary>
/// Create a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
/// settings may need changing; especially
/// <see cref="ClientCreationSettings.PublisherServiceApiSettings"/>.<see cref="PublisherServiceApiSettings.PublishSettings"/>.<see cref="CallSettings.Retry"/>.
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="PublisherServiceApiClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
public static Task<PublisherClient> CreateAsync(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
// With isAsync set to true, the returned task will complete asynchronously (if required) as expected.
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: true);

/// <summary>
/// Creates a <see cref="PublisherClient"/>.
/// <paramref name="isAsync"/> controls whether the returned task will complete synchronously or asynchronously, allowing this
/// method to be used by both <see cref="Create"/> and <see cref="CreateAsync"/>.
/// </summary>
private static async Task<PublisherClient> CreateMaybeAsync(TopicName topicName, ClientCreationSettings clientCreationSettings, Settings settings, bool isAsync)
{
clientCreationSettings?.Validate();
// Clone settings, just in case user modifies them and an await happens in this method
Expand All @@ -225,7 +255,9 @@ public static async Task<PublisherClient> CreateAsync(TopicName topicName, Clien
// Use default credentials if none given.
if (channelCredentials == null)
{
var credentials = await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false);
var credentials = isAsync ?
(await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false)) :
GoogleCredential.GetApplicationDefault();
if (credentials.IsCreateScopedRequired)
{
credentials = credentials.CreateScoped(PublisherServiceApiClient.DefaultScopes);
Expand All @@ -252,7 +284,9 @@ public static async Task<PublisherClient> CreateAsync(TopicName topicName, Clien
Settings = clientCreationSettings?.PublisherServiceApiSettings,
ChannelOptions = grpcChannelOptions
};
var channel = await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false);
var channel = isAsync ?
await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false) :
builder.CreateChannel();

// Second builder doesn't need to do much, as we can build a call invoker from the channel.
clients[i] = new PublisherServiceApiClientBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,19 @@ internal async Task<ChannelBase> CreateChannelAsync(CancellationToken cancellati
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
}

/// <summary>
/// Creates a channel for this builder, observing any emulator configuration that has been set.
/// This method is used by PublisherClient, which needs the channel for shutdown purposes.
/// </summary>
internal ChannelBase CreateChannel()
{
// Note: no need to try to detect the channel pool here, as we know we don't want to use it.
var effectiveBuilder = MaybeCreateEmulatorClientBuilder() ?? this;
var endpoint = effectiveBuilder.Endpoint ?? GetDefaultEndpoint();
var credentials = effectiveBuilder.GetChannelCredentials();
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,34 @@ internal void Validate()
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
/// settings may need changing; especially <see cref="Settings.AckDeadline"/>.
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="subscriptionName">The <see cref="SubscriptionName"/> to receive messages from.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="SubscriberClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="SubscriberClient"/>.</param>
/// <returns>A <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.</returns>
public static async Task<SubscriberClient> CreateAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null)
public static SubscriberClient Create(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
CreateMaybeAsync(subscriptionName, clientCreationSettings, settings, isAsync: false).ResultWithUnwrappedExceptions();

/// <summary>
/// Create a <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
/// settings may need changing; especially <see cref="Settings.AckDeadline"/>.
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
/// </summary>
/// <param name="subscriptionName">The <see cref="SubscriptionName"/> to receive messages from.</param>
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
/// <see cref="SubscriberClient"/>s.</param>
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="SubscriberClient"/>.</param>
/// <returns>A <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.</returns>
public static Task<SubscriberClient> CreateAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
CreateMaybeAsync(subscriptionName, clientCreationSettings, settings, isAsync: true);

private static async Task<SubscriberClient> CreateMaybeAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings, Settings settings, bool isAsync)
{
GaxPreconditions.CheckNotNull(subscriptionName, nameof(subscriptionName));
clientCreationSettings?.Validate();
Expand All @@ -312,7 +333,9 @@ public static async Task<SubscriberClient> CreateAsync(SubscriptionName subscrip
Settings = clientCreationSettings?.SubscriberServiceApiSettings,
ChannelOptions = grpcChannelOptions
};
var channel = await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false);
var channel = isAsync ?
(await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false)) :
builder.CreateChannel();

// Second builder doesn't need to do much, as we can build a call invoker from the channel.
clients[i] = new SubscriberServiceApiClientBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,19 @@ internal async Task<ChannelBase> CreateChannelAsync(CancellationToken cancellati
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
}

/// <summary>
/// Creates a channel for this builder, observing any emulator configuration that has been set.
/// This method is used by SubscriberClient, which needs the channel for shutdown purposes.
/// </summary>
internal ChannelBase CreateChannel()
{
// Note: no need to try to detect the channel pool here, as we know we don't want to use it.
var effectiveBuilder = MaybeCreateEmulatorClientBuilder() ?? this;
var endpoint = effectiveBuilder.Endpoint ?? GetDefaultEndpoint();
var credentials = effectiveBuilder.GetChannelCredentials();
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
}
}
}

0 comments on commit 071ea6f

Please sign in to comment.