Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queue message encoding for Azure (UTF-16 in XML) #1439

Merged
merged 5 commits into from Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 2 additions & 8 deletions src/Admin/HostedServices/AzureQueueMailHostedService.cs
Expand Up @@ -18,7 +18,6 @@ namespace Bit.Admin.HostedServices
{
public class AzureQueueMailHostedService : IHostedService
{
private readonly JsonSerializer _jsonSerializer;
private readonly ILogger<AzureQueueMailHostedService> _logger;
private readonly GlobalSettings _globalSettings;
private readonly IMailService _mailService;
Expand All @@ -35,11 +34,6 @@ public class AzureQueueMailHostedService : IHostedService
_logger = logger;
_mailService = mailService;
_globalSettings = globalSettings;

_jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings
{
Converters = new[] { new EncodedStringConverter() },
});
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -76,10 +70,10 @@ private async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
var token = JToken.Parse(message.MessageText);
var token = JToken.Parse(message.DecodeMessageText());
if (token is JArray)
{
foreach (var mailQueueMessage in token.ToObject<List<MailQueueMessage>>(_jsonSerializer))
foreach (var mailQueueMessage in token.ToObject<List<MailQueueMessage>>())
{
await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage);
}
Expand Down
72 changes: 50 additions & 22 deletions src/Core/Services/Implementations/AzureQueueService.cs
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Bit.Core.Utilities;
Expand All @@ -17,13 +18,14 @@ protected AzureQueueService(QueueClient queueClient, JsonSerializerSettings json
{
_queueClient = queueClient;
_jsonSettings = jsonSettings;
if (!_jsonSettings.Converters.Any(c => c.GetType() == typeof(EncodedStringConverter)))
{
_jsonSettings.Converters.Add(new EncodedStringConverter());
}
}

public async Task CreateAsync(T message) => await CreateManyAsync(new[] { message });
public async Task CreateAsync(T message)
{
var json = JsonConvert.SerializeObject(message, _jsonSettings);
var base64 = CoreHelpers.Base64EncodeString(json);
await _queueClient.SendMessageAsync(base64);
}

public async Task CreateManyAsync(IEnumerable<T> messages)
{
Expand All @@ -32,36 +34,62 @@ public async Task CreateManyAsync(IEnumerable<T> messages)
return;
}

foreach (var json in SerializeMany(messages))
if (!messages.Skip(1).Any())
{
await CreateAsync(messages.First());
return;
}

foreach (var json in SerializeMany(messages, _jsonSettings))
{
await _queueClient.SendMessageAsync(json);
}
}


private IEnumerable<string> SerializeMany(IEnumerable<T> messages)
protected IEnumerable<string> SerializeMany(IEnumerable<T> messages, JsonSerializerSettings jsonSettings)
{
string SerializeMessage(T message) => JsonConvert.SerializeObject(message, _jsonSettings);
// Calculate Base-64 encoded text with padding
int getBase64Size(int byteCount) => ((4 * byteCount / 3) + 3) & ~3;

var messagesLists = new List<List<T>> { new List<T>() };
var strings = new List<string>();
var ListMessageLength = 2; // to account for json array brackets "[]"
foreach (var (message, jsonEvent) in messages.Select(m => (m, SerializeMessage(m))))
{
var messagesList = new List<string>();
var messagesListSize = 0;

int calculateByteSize(int totalSize, int toAdd) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't looked into it in-depth. But there is a Base64.GetMaxEncodedToUtf8Length helper we could perhaps use instead? https://docs.microsoft.com/en-us/dotnet/api/system.buffers.text.base64.getmaxencodedtoutf8length?view=net-5.0#system-buffers-text-base64-getmaxencodedtoutf8length(system-int32)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Hinton , I missed that, however from the src of that method it looks like my implementation here is more accurate and most of the logic is still necessary, including this particular method that determines the total combined "next" JSON block to be added to see if it needs to start a new batch. I think we're good on this one IMHO.

// Calculate the total length this would be w/ "[]" and commas
getBase64Size(totalSize + toAdd + messagesList.Count + 2);

var messageLength = jsonEvent.Length + 1; // To account for json array comma
if (ListMessageLength + messageLength > _queueClient.MessageMaxBytes)
// Format the final array string, i.e. [{...},{...}]
string getArrayString()
{
if (messagesList.Count == 1)
{
messagesLists.Add(new List<T> { message });
ListMessageLength = 2 + messageLength;
return CoreHelpers.Base64EncodeString(messagesList[0]);
}
else
return CoreHelpers.Base64EncodeString(
string.Concat("[", string.Join(',', messagesList), "]"));
}

var serializedMessages = messages.Select(message =>
JsonConvert.SerializeObject(message, jsonSettings));

foreach (var message in serializedMessages)
{
var messageSize = Encoding.UTF8.GetByteCount(message);
if (calculateByteSize(messagesListSize, messageSize) > _queueClient.MessageMaxBytes)
{
messagesLists.Last().Add(message);
ListMessageLength += messageLength;
yield return getArrayString();
messagesListSize = 0;
messagesList.Clear();
}

messagesList.Add(message);
messagesListSize += messageSize;
}

if (messagesList.Any())
{
yield return getArrayString();
}
return messagesLists.Select(l => JsonConvert.SerializeObject(l, _jsonSettings));
}
}
}
20 changes: 20 additions & 0 deletions src/Core/Utilities/CoreHelpers.cs
Expand Up @@ -24,6 +24,9 @@
using IdentityModel;
using System.Text.Json;
using Bit.Core.Enums.Provider;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using System.Threading;

namespace Bit.Core.Utilities
{
Expand Down Expand Up @@ -901,5 +904,22 @@ public static ICollection<T> AddIfNotExists<T>(this ICollection<T> list, T item)
list.Add(item);
return list;
}

public static string DecodeMessageText(this QueueMessage message)
{
var text = message?.MessageText;
if (string.IsNullOrWhiteSpace(text))
{
return text;
}
try
{
return Base64DecodeString(text);
}
catch
{
return text;
}
}
}
}
35 changes: 0 additions & 35 deletions src/Core/Utilities/EncodedStringConverter.cs

This file was deleted.

10 changes: 2 additions & 8 deletions src/EventsProcessor/AzureQueueHostedService.cs
Expand Up @@ -18,7 +18,6 @@ namespace Bit.EventsProcessor
{
public class AzureQueueHostedService : IHostedService, IDisposable
{
private readonly JsonSerializer _jsonSerializer;
private readonly ILogger<AzureQueueHostedService> _logger;
private readonly IConfiguration _configuration;

Expand All @@ -33,11 +32,6 @@ public class AzureQueueHostedService : IHostedService, IDisposable
{
_logger = logger;
_configuration = configuration;

_jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings
{
Converters = new[] { new EncodedStringConverter() },
});
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -84,7 +78,7 @@ private async Task ExecuteAsync(CancellationToken cancellationToken)
{
foreach (var message in messages.Value)
{
await ProcessQueueMessageAsync(message.MessageText, cancellationToken);
await ProcessQueueMessageAsync(message.DecodeMessageText(), cancellationToken);
await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
}
}
Expand Down Expand Up @@ -118,7 +112,7 @@ public async Task ProcessQueueMessageAsync(string message, CancellationToken can
var token = JToken.Parse(message);
if (token is JArray)
{
var indexedEntities = token.ToObject<List<EventMessage>>(_jsonSerializer)
var indexedEntities = token.ToObject<List<EventMessage>>()
.SelectMany(e => EventTableEntity.IndexEvent(e));
events.AddRange(indexedEntities);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Notifications/AzureQueueHostedService.cs
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Azure.Storage.Queues;
using Bit.Core.Utilities;

namespace Bit.Notifications
{
Expand Down Expand Up @@ -67,7 +68,7 @@ private async Task ExecuteAsync(CancellationToken cancellationToken)
try
{
await HubHelpers.SendNotificationToHubAsync(
message.MessageText, _hubContext, cancellationToken);
message.DecodeMessageText(), _hubContext, cancellationToken);
await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
}
catch (Exception e)
Expand Down