Skip to content

Commit

Permalink
feat: Add support for Publisher Compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh-V committed May 24, 2023
1 parent 3518402 commit e684e05
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ public sealed class Settings
/// </summary>
public TimeSpan? DisposeTimeout { get; set; }

/// <summary>
/// Enables publish message compression. If set to <c>true</c>, messages will be compressed before being sent to the server
/// by the <see cref="PublisherClient"/>.
/// </summary>
public bool EnableCompression { get; set; }

/// <summary>
/// Specifies the threshold for the number of bytes in a message batch before compression is applied.
/// This property comes into play only when <see cref="EnableCompression"/> is set to <c>true</c>.
/// If the number of bytes in a batch is less than this value, compression will not be applied even
/// if <see cref="EnableCompression"/> is <c>true</c>.
/// If <c>null</c>, defaults to <see cref="DefaultCompressionBytesThreshold"/>.
/// </summary>
public long? CompressionBytesThreshold { get; set; }

/// <summary>
/// Create a new instance.
/// </summary>
Expand All @@ -61,6 +76,8 @@ internal Settings(Settings other)
Scheduler = other.Scheduler;
EnableMessageOrdering = other.EnableMessageOrdering;
DisposeTimeout = other.DisposeTimeout;
EnableCompression = other.EnableCompression;
CompressionBytesThreshold = other.CompressionBytesThreshold;
}

internal void Validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public abstract partial class PublisherClient : IAsyncDisposable
/// </summary>
public static TimeSpan DefaultDisposeTimeout { get; } = TimeSpan.FromSeconds(5);

/// <summary>
/// The default <see cref="Settings.CompressionBytesThreshold"/> of 240 bytes for the publisher message compression.
/// </summary>
public static long DefaultCompressionBytesThreshold { get; } = 240;

/// <summary>
/// The associated <see cref="TopicName"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ namespace Google.Cloud.PubSub.V1;
/// </summary>
public sealed class PublisherClientImpl : PublisherClient
{
private const string CompressionHeaderKey = "grpc-internal-encoding-request";
private const string CompressionHeaderValue = "gzip";

private static readonly CallSettings s_compressionCallSettings = CallSettings.FromHeader(CompressionHeaderKey, CompressionHeaderValue);

/// <summary>
/// A batch of messages that all have the same ordering-key (or no ordering-key), and will be
/// sent to the server in a single network call.
Expand Down Expand Up @@ -133,6 +138,8 @@ internal PublisherClientImpl(TopicName topicName, IEnumerable<PublisherServiceAp
_taskHelper = GaxPreconditions.CheckNotNull(taskHelper, nameof(taskHelper));
_enableMessageOrdering = settings.EnableMessageOrdering;
_disposeTimeout = settings.DisposeTimeout ?? DefaultDisposeTimeout;
_enableCompression = settings.EnableCompression;
_compressionBytesThreshold = settings.CompressionBytesThreshold ?? DefaultCompressionBytesThreshold;

// Initialise batching settings. Use ApiMax settings for components not given.
var batchingSettings = settings.BatchingSettings ?? DefaultBatchingSettings;
Expand All @@ -156,6 +163,8 @@ internal PublisherClientImpl(TopicName topicName, IEnumerable<PublisherServiceAp
private readonly Func<Task> _shutdown;
private readonly bool _enableMessageOrdering;
private readonly TimeSpan _disposeTimeout;
private readonly bool _enableCompression;
private readonly long _compressionBytesThreshold;

// Batching settings
private readonly long _batchElementCountThreshold;
Expand Down Expand Up @@ -446,8 +455,16 @@ private void TriggerSend()

async Task Send()
{
var callSettings = CallSettings.FromCancellationToken(_hardStopCts.Token);

// If compression is enabled, and the batch size is greater than or equal to the threshold, set the compression header.
if (_enableCompression && batch.ByteCount >= _compressionBytesThreshold)
{
callSettings = callSettings.MergedWith(s_compressionCallSettings);
}

// Perform the RPC to server, catching exceptions.
var publishTask = client.PublishAsync(TopicName, batch.Messages, CallSettings.FromCancellationToken(_hardStopCts.Token));
var publishTask = client.PublishAsync(TopicName, batch.Messages, callSettings);
var response = await _taskHelper.ConfigureAwaitHideErrors(() => publishTask, null);
Action postLockAction;
lock (_lock)
Expand Down

0 comments on commit e684e05

Please sign in to comment.