Skip to content

Commit

Permalink
[1ES] Migrate Diagnostic Events to Azure.Data.Tables (#10167)
Browse files Browse the repository at this point in the history
* Migrate DiagnosticEvents from Azure.Cosmos.Table to Azure.Data.Tables
---------
Co-authored-by: Jacob Viau <javia@microsoft.com>
  • Loading branch information
cjaliaga committed Jun 17, 2024
1 parent e5a04b7 commit 8d4e9df
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 106 deletions.
17 changes: 13 additions & 4 deletions src/WebJobs.Script.WebHost/Diagnostics/DiagnosticEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using Microsoft.Azure.Cosmos.Table;
using System.Runtime.Serialization;
using Azure;
using Azure.Data.Tables;
using Microsoft.Azure.WebJobs.Script.WebHost.Helpers;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics
{
public class DiagnosticEvent : TableEntity
public class DiagnosticEvent : ITableEntity
{
internal const string CurrentEventVersion = "2024-05-01";

Expand All @@ -23,6 +24,14 @@ public DiagnosticEvent(string hostId, DateTime timestamp)
EventVersion = CurrentEventVersion;
}

public string PartitionKey { get; set; }

public string RowKey { get; set; }

public DateTimeOffset? Timestamp { get; set; }

public ETag ETag { get; set; }

public string EventVersion { get; set; }

public int HitCount { get; set; }
Expand All @@ -35,7 +44,7 @@ public DiagnosticEvent(string hostId, DateTime timestamp)

public int Level { get; set; }

[IgnoreProperty]
[IgnoreDataMember]
public LogLevel LogLevel
{
get { return (LogLevel)Level; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure.Data.Tables;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Azure.WebJobs.Logging;
Expand All @@ -33,8 +33,8 @@ public class DiagnosticEventTableStorageRepository : IDiagnosticEventRepository,
private readonly object _syncLock = new object();

private ConcurrentDictionary<string, DiagnosticEvent> _events = new ConcurrentDictionary<string, DiagnosticEvent>();
private CloudTableClient _tableClient;
private CloudTable _diagnosticEventsTable;
private TableServiceClient _tableClient;
private TableClient _diagnosticEventsTable;
private string _hostId;
private bool _disposed = false;
private bool _purged = false;
Expand All @@ -55,22 +55,21 @@ public class DiagnosticEventTableStorageRepository : IDiagnosticEventRepository,
ILogger<DiagnosticEventTableStorageRepository> logger)
: this(configuration, hostIdProvider, environment, scriptHost, logger, LogFlushInterval) { }

internal CloudTableClient TableClient
internal TableServiceClient TableClient
{
get
{
if (!_environment.IsPlaceholderModeEnabled() && _tableClient == null)
{
string storageConnectionString = _configuration.GetWebJobsConnectionString(ConnectionStringNames.Storage);
if (!string.IsNullOrEmpty(storageConnectionString)
&& CloudStorageAccount.TryParse(storageConnectionString, out CloudStorageAccount account))

try
{
var tableClientConfig = new TableClientConfiguration();
_tableClient = new CloudTableClient(account.TableStorageUri, account.Credentials, tableClientConfig);
_tableClient = new TableServiceClient(storageConnectionString);
}
else
catch (Exception ex)
{
_logger.LogError("Azure Storage connection string is empty or invalid. Unable to write diagnostic events.");
_logger.LogError(ex, "Azure Storage connection string is empty or invalid. Unable to write diagnostic events.");
}
}

Expand All @@ -92,7 +91,7 @@ internal string HostId

internal ConcurrentDictionary<string, DiagnosticEvent> Events => _events;

internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
internal TableClient GetDiagnosticEventsTable(DateTime? now = null)
{
if (TableClient != null)
{
Expand All @@ -103,7 +102,7 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
if (_diagnosticEventsTable == null || currentTableName != _tableName)
{
_tableName = currentTableName;
_diagnosticEventsTable = TableClient.GetTableReference(_tableName);
_diagnosticEventsTable = TableClient.GetTableClient(_tableName);
}
}

Expand Down Expand Up @@ -134,31 +133,27 @@ private async Task PurgePreviousEventVersions()
foreach (var table in tables)
{
var tableRecords = await table.ExecuteQuerySegmentedAsync(new TableQuery<DiagnosticEvent>(), null);
// Skip tables that have 0 records
if (tableRecords.Results.Count == 0)
{
continue;
}
// Delete table if it doesn't have records with EventVersion
var eventVersionDoesNotExists = tableRecords.Results.Any(record => string.IsNullOrEmpty(record.EventVersion) == true);
if (eventVersionDoesNotExists)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
await table.DeleteIfExistsAsync();
tableDeleted = true;
continue;
}
var tableQuery = table.QueryAsync<DiagnosticEvent>(cancellationToken: default);
// If the table does have EventVersion, query if it is an outdated version
var eventVersionOutdated = tableRecords.Results.Any(record => string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0);
if (eventVersionOutdated)
await foreach (var record in tableQuery)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
await table.DeleteIfExistsAsync();
tableDeleted = true;
// Delete the table if any of its records has no EventVersion
if (string.IsNullOrEmpty(record.EventVersion) == true)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
await table.DeleteAsync();
tableDeleted = true;
break;
}
// If the record does have an EventVersion, check whether it is older than the current version
if (string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
await table.DeleteAsync();
tableDeleted = true;
break;
}
}
}
Expand All @@ -177,7 +172,7 @@ private async Task PurgePreviousEventVersions()
}
}

internal virtual async Task FlushLogs(CloudTable table = null)
internal virtual async Task FlushLogs(TableClient table = null)
{
if (_environment.IsPlaceholderModeEnabled())
{
Expand All @@ -200,7 +195,7 @@ internal virtual async Task FlushLogs(CloudTable table = null)
return;
}

bool tableCreated = await TableStorageHelpers.CreateIfNotExistsAsync(table, TableCreationMaxRetryCount);
bool tableCreated = await TableStorageHelpers.CreateIfNotExistsAsync(table, TableClient, TableCreationMaxRetryCount);
if (tableCreated)
{
_logger.LogDebug("Queueing background table purge.");
Expand All @@ -225,20 +220,20 @@ internal virtual async Task FlushLogs(CloudTable table = null)
}
}

internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEvent> events, CloudTable table)
internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEvent> events, TableClient table)
{
try
{
var batch = new TableBatchOperation();
var batch = new List<TableTransactionAction>();
foreach (string errorCode in events.Keys)
{
var diagnosticEvent = events[errorCode];
diagnosticEvent.Message = Sanitizer.Sanitize(diagnosticEvent.Message);
diagnosticEvent.Details = Sanitizer.Sanitize(diagnosticEvent.Details);
TableOperation insertOperation = TableOperation.Insert(diagnosticEvent);
batch.Add(insertOperation);
TableTransactionAction insertAction = new TableTransactionAction(TableTransactionActionType.Add, diagnosticEvent);
batch.Add(insertAction);
}
await table.ExecuteBatchAsync(batch);
await table.SubmitTransactionAsync(batch);
events.Clear();
}
catch (Exception ex)
Expand Down
49 changes: 31 additions & 18 deletions src/WebJobs.Script.WebHost/Helpers/TableStorageHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure;
using Azure.Data.Tables;
using Azure.Data.Tables.Models;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.WebHost.Helpers
Expand All @@ -18,27 +20,27 @@ internal static string GetRowKey(DateTime now)
return string.Format("{0:D19}-{1}", DateTime.MaxValue.Ticks - now.Ticks, Guid.NewGuid());
}

internal static async Task<bool> CreateIfNotExistsAsync(CloudTable table, int tableCreationRetries, int retryDelayMS = 1000)
internal static async Task<bool> CreateIfNotExistsAsync(TableClient table, TableServiceClient tableClient, int tableCreationRetries, int retryDelayMS = 1000)
{
int attempt = 0;
do
{
try
{
if (!table.Exists())
if (!await TableExistAsync(table, tableClient))
{
return await table.CreateIfNotExistsAsync();
return (await table.CreateIfNotExistsAsync())?.Value is not null;
}
}
catch (StorageException e)
catch (RequestFailedException rfe)
{
// Can get conflicts with multiple instances attempting to create
// the same table.
// Also, if a table is queued up for deletion, we can get a conflict on create,
// though these should only happen in tests not production, because we only ever
// delete OLD tables and we'll never be attempting to recreate a table we just
// deleted outside of tests.
if (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.Conflict &&
if ((rfe.Status != (int)HttpStatusCode.Conflict || rfe.ErrorCode == TableErrorCode.TableBeingDeleted) &&
attempt < tableCreationRetries)
{
// wait a bit and try again
Expand All @@ -53,7 +55,7 @@ internal static async Task<bool> CreateIfNotExistsAsync(CloudTable table, int ta
return false;
}

internal static void QueueBackgroundTablePurge(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix, ILogger logger, int delaySeconds = 30)
internal static void QueueBackgroundTablePurge(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix, ILogger logger, int delaySeconds = 30)
{
var tIgnore = Task.Run(async () =>
{
Expand All @@ -74,38 +76,49 @@ internal static void QueueBackgroundTablePurge(CloudTable currentTable, CloudTab
});
}

internal static async Task DeleteOldTablesAsync(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix, ILogger logger)
internal static async Task DeleteOldTablesAsync(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix, ILogger logger)
{
var tablesToDelete = await ListOldTablesAsync(currentTable, tableClient, tableNamePrefix);
logger.LogDebug($"Deleting {tablesToDelete.Count()} old tables.");
foreach (var table in tablesToDelete)
{
logger.LogDebug($"Deleting table '{table.Name}'");
await table.DeleteIfExistsAsync();
await tableClient.DeleteTableAsync(table.Name);
logger.LogDebug($"{table.Name} deleted.");
}
}

internal static async Task<IEnumerable<CloudTable>> ListOldTablesAsync(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix)
/// <summary>
/// Returns every table reported by <see cref="ListTablesAsync"/> for the given prefix,
/// except the one whose name matches <paramref name="currentTable"/>.
/// </summary>
/// <param name="currentTable">The table to exclude from the result.</param>
/// <param name="tableClient">The service client used to enumerate tables.</param>
/// <param name="tableNamePrefix">The table name prefix forwarded to <see cref="ListTablesAsync"/>.</param>
/// <returns>A lazily filtered sequence of table clients, one per remaining table.</returns>
internal static async Task<IEnumerable<TableClient>> ListOldTablesAsync(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix)
{
    IEnumerable<TableClient> prefixedTables = await ListTablesAsync(tableClient, tableNamePrefix);

    // Table names are compared case-insensitively when excluding the current table.
    return from candidate in prefixedTables
           where !string.Equals(currentTable.Name, candidate.Name, StringComparison.OrdinalIgnoreCase)
           select candidate;
}

internal static async Task<IEnumerable<CloudTable>> ListTablesAsync(CloudTableClient tableClient, string tableNamePrefix)
internal static async Task<IEnumerable<TableClient>> ListTablesAsync(TableServiceClient tableClient, string tableNamePrefix)
{
List<CloudTable> tables = new List<CloudTable>();
TableContinuationToken continuationToken = null;
// Azure.Data.Tables doesn't have a direct way to list tables with a prefix so we need to do it manually
var givenValue = tableNamePrefix + "{";
AsyncPageable<TableItem> tablesQuery = tableClient.QueryAsync(p => p.Name.CompareTo(tableNamePrefix) >= 0 && p.Name.CompareTo(givenValue) <= 0);
var tables = new List<TableClient>();

do
await foreach (var table in tablesQuery)
{
var results = await tableClient.ListTablesSegmentedAsync(tableNamePrefix, continuationToken);
continuationToken = results.ContinuationToken;
tables.AddRange(results.Results);
tables.Add(tableClient.GetTableClient(table.Name));
}
while (continuationToken != null);

return tables;
}

/// <summary>
/// Determines whether a table with the same name as <paramref name="table"/> is returned
/// when querying the tables available through <paramref name="tableClient"/>.
/// </summary>
/// <param name="table">The table whose name is looked up.</param>
/// <param name="tableClient">The service client used to run the table query.</param>
/// <returns><c>true</c> if the query yields at least one table; otherwise <c>false</c>.</returns>
internal static async Task<bool> TableExistAsync(TableClient table, TableServiceClient tableClient)
{
    // Query only for tables whose name equals the requested one.
    var matchingTables = tableClient.QueryAsync(p => p.Name == table.Name);

    // A single successful advance is enough: the first result proves the table exists.
    await using var cursor = matchingTables.GetAsyncEnumerator();
    return await cursor.MoveNextAsync();
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Identity" Version="1.11.4" />
<PackageReference Include="Azure.Security.KeyVault.Secrets" Version="4.2.0" />
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.22.0" />
Expand Down
4 changes: 4 additions & 0 deletions src/WebJobs.Script/runtimeassemblies.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
"name": "Azure.Core",
"resolutionPolicy": "private"
},
{
"name": "Azure.Data.Tables",
"resolutionPolicy": "private"
},
{
"name": "Azure.Identity",
"resolutionPolicy": "private"
Expand Down
Loading

0 comments on commit 8d4e9df

Please sign in to comment.