diff --git a/src/Admin/HostedServices/AzureQueueMailHostedService.cs b/src/Admin/HostedServices/AzureQueueMailHostedService.cs index 67e8623d7380..17d4b2fc5f41 100644 --- a/src/Admin/HostedServices/AzureQueueMailHostedService.cs +++ b/src/Admin/HostedServices/AzureQueueMailHostedService.cs @@ -18,7 +18,6 @@ namespace Bit.Admin.HostedServices { public class AzureQueueMailHostedService : IHostedService { - private readonly JsonSerializer _jsonSerializer; private readonly ILogger _logger; private readonly GlobalSettings _globalSettings; private readonly IMailService _mailService; @@ -35,11 +34,6 @@ public class AzureQueueMailHostedService : IHostedService _logger = logger; _mailService = mailService; _globalSettings = globalSettings; - - _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings - { - Converters = new[] { new EncodedStringConverter() }, - }); } public Task StartAsync(CancellationToken cancellationToken) @@ -76,10 +70,10 @@ private async Task ExecuteAsync(CancellationToken cancellationToken) { try { - var token = JToken.Parse(message.MessageText); + var token = JToken.Parse(message.DecodeMessageText()); if (token is JArray) { - foreach (var mailQueueMessage in token.ToObject>(_jsonSerializer)) + foreach (var mailQueueMessage in token.ToObject>()) { await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage); } diff --git a/src/Core/Services/Implementations/AzureQueueService.cs b/src/Core/Services/Implementations/AzureQueueService.cs index bb5d89595e71..a8162c5e1e29 100644 --- a/src/Core/Services/Implementations/AzureQueueService.cs +++ b/src/Core/Services/Implementations/AzureQueueService.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading.Tasks; using Azure.Storage.Queues; using Bit.Core.Utilities; @@ -17,13 +18,14 @@ protected AzureQueueService(QueueClient queueClient, JsonSerializerSettings json { _queueClient = queueClient; _jsonSettings = jsonSettings; - if (!_jsonSettings.Converters.Any(c => c.GetType() == typeof(EncodedStringConverter))) - { - _jsonSettings.Converters.Add(new EncodedStringConverter()); - } } - public async Task CreateAsync(T message) => await CreateManyAsync(new[] { message }); + public async Task CreateAsync(T message) + { + var json = JsonConvert.SerializeObject(message, _jsonSettings); + var base64 = CoreHelpers.Base64EncodeString(json); + await _queueClient.SendMessageAsync(base64); + } public async Task CreateManyAsync(IEnumerable messages) { @@ -32,36 +34,62 @@ public async Task CreateManyAsync(IEnumerable messages) return; } - foreach (var json in SerializeMany(messages)) + if (!messages.Skip(1).Any()) + { + await CreateAsync(messages.First()); + return; + } + + foreach (var json in SerializeMany(messages, _jsonSettings)) { await _queueClient.SendMessageAsync(json); } } - - private IEnumerable SerializeMany(IEnumerable messages) + protected IEnumerable SerializeMany(IEnumerable messages, JsonSerializerSettings jsonSettings) { - string SerializeMessage(T message) => JsonConvert.SerializeObject(message, _jsonSettings); + // Calculate Base-64 encoded text with padding + int getBase64Size(int byteCount) => ((4 * byteCount / 3) + 3) & ~3; - var messagesLists = new List> { new List() }; - var strings = new List(); - var ListMessageLength = 2; // to account for json array brackets "[]" - foreach (var (message, jsonEvent) in messages.Select(m => (m, SerializeMessage(m)))) - { + var messagesList = new List(); + var messagesListSize = 0; + + int calculateByteSize(int totalSize, int toAdd) => + // Calculate the total length this would be w/ "[]" and commas + getBase64Size(totalSize + toAdd + messagesList.Count + 2); - var messageLength = jsonEvent.Length + 1; // To account for json array comma - if (ListMessageLength + messageLength > _queueClient.MessageMaxBytes) + // Format the final array string, i.e. [{...},{...}] + string getArrayString() + { + if (messagesList.Count == 1) { - messagesLists.Add(new List { message }); - ListMessageLength = 2 + messageLength; + return CoreHelpers.Base64EncodeString(messagesList[0]); } - else + return CoreHelpers.Base64EncodeString( + string.Concat("[", string.Join(',', messagesList), "]")); + } + + var serializedMessages = messages.Select(message => + JsonConvert.SerializeObject(message, jsonSettings)); + + foreach (var message in serializedMessages) + { + var messageSize = Encoding.UTF8.GetByteCount(message); + if (calculateByteSize(messagesListSize, messageSize) > _queueClient.MessageMaxBytes) { - messagesLists.Last().Add(message); - ListMessageLength += messageLength; + yield return getArrayString(); + messagesListSize = 0; + messagesList.Clear(); } + + messagesList.Add(message); + messagesListSize += messageSize; + } + + if (messagesList.Any()) + { + yield return getArrayString(); } - return messagesLists.Select(l => JsonConvert.SerializeObject(l, _jsonSettings)); } } } diff --git a/src/Core/Utilities/CoreHelpers.cs b/src/Core/Utilities/CoreHelpers.cs index c00703702654..58de340a8702 100644 --- a/src/Core/Utilities/CoreHelpers.cs +++ b/src/Core/Utilities/CoreHelpers.cs @@ -24,6 +24,9 @@ using IdentityModel; using System.Text.Json; using Bit.Core.Enums.Provider; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using System.Threading; namespace Bit.Core.Utilities { @@ -901,5 +904,22 @@ public static ICollection AddIfNotExists(this ICollection list, T item) list.Add(item); return list; } + + public static string DecodeMessageText(this QueueMessage message) + { + var text = message?.MessageText; + if (string.IsNullOrWhiteSpace(text)) + { + return text; + } + try + { + return Base64DecodeString(text); + } + catch + { + return text; + } + } } } diff --git a/src/Core/Utilities/EncodedStringConverter.cs b/src/Core/Utilities/EncodedStringConverter.cs deleted file mode 100644 index 5bb033d5fd3a..000000000000 --- a/src/Core/Utilities/EncodedStringConverter.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using Newtonsoft.Json; - -namespace Bit.Core.Utilities -{ - public class EncodedStringConverter : JsonConverter - { - public override bool CanConvert(Type objectType) => objectType == typeof(string); - - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) - { - if (reader.TokenType == JsonToken.Null) - { - return existingValue; - } - - var value = reader.Value as string; - return System.Net.WebUtility.HtmlDecode(value); - } - - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) - { - if (value == null) - { - if (serializer.NullValueHandling == NullValueHandling.Include) - { - writer.WriteNull(); - } - return; - } - - writer.WriteValue(System.Net.WebUtility.HtmlEncode((string)value)); - } - } -} diff --git a/src/EventsProcessor/AzureQueueHostedService.cs b/src/EventsProcessor/AzureQueueHostedService.cs index 470f646d11bc..a5d86d661ef0 100644 --- a/src/EventsProcessor/AzureQueueHostedService.cs +++ b/src/EventsProcessor/AzureQueueHostedService.cs @@ -18,7 +18,6 @@ namespace Bit.EventsProcessor { public class AzureQueueHostedService : IHostedService, IDisposable { - private readonly JsonSerializer _jsonSerializer; private readonly ILogger _logger; private readonly IConfiguration _configuration; @@ -33,11 +32,6 @@ public class AzureQueueHostedService : IHostedService, IDisposable { _logger = logger; _configuration = configuration; - - _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings - { - Converters = new[] { new EncodedStringConverter() }, - }); } public Task StartAsync(CancellationToken cancellationToken) @@ -84,7 +78,7 @@ private async Task ExecuteAsync(CancellationToken cancellationToken) { foreach (var message in messages.Value) { - await ProcessQueueMessageAsync(message.MessageText, cancellationToken); + await ProcessQueueMessageAsync(message.DecodeMessageText(), cancellationToken); await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); } } @@ -118,7 +112,7 @@ public async Task ProcessQueueMessageAsync(string message, CancellationToken can var token = JToken.Parse(message); if (token is JArray) { - var indexedEntities = token.ToObject>(_jsonSerializer) + var indexedEntities = token.ToObject>() .SelectMany(e => EventTableEntity.IndexEvent(e)); events.AddRange(indexedEntities); } diff --git a/src/Notifications/AzureQueueHostedService.cs b/src/Notifications/AzureQueueHostedService.cs index 85a7038dd35c..e90203f271d6 100644 --- a/src/Notifications/AzureQueueHostedService.cs +++ b/src/Notifications/AzureQueueHostedService.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Azure.Storage.Queues; +using Bit.Core.Utilities; namespace Bit.Notifications { @@ -67,7 +68,7 @@ private async Task ExecuteAsync(CancellationToken cancellationToken) try { await HubHelpers.SendNotificationToHubAsync( - message.MessageText, _hubContext, cancellationToken); + message.DecodeMessageText(), _hubContext, cancellationToken); await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); } catch (Exception e)