Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] ClientTelemetry: Adds logic to limit payload size to 2 MB #3717

Merged
merged 11 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 31 additions & 153 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Handler;
Expand All @@ -20,9 +18,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
using Microsoft.Azure.Documents.Rntbd;
using Newtonsoft.Json;
using Util;
using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum;

Expand All @@ -36,12 +31,10 @@ internal class ClientTelemetry : IDisposable
{
private const int allowedNumberOfFailures = 3;

private static readonly Uri endpointUrl = ClientTelemetryOptions.GetClientTelemetryEndpoint();
private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan();

private readonly ClientTelemetryProperties clientTelemetryInfo;
private readonly CosmosHttpClient httpClient;
private readonly AuthorizationTokenProvider tokenProvider;
private readonly ClientTelemetryProcessor processor;
private readonly DiagnosticsHandlerHelper diagnosticsHelper;

private readonly CancellationTokenSource cancellationTokenSource;
Expand Down Expand Up @@ -108,7 +101,7 @@ internal ClientTelemetry()
return clientTelemetry;
}

private ClientTelemetry(
internal ClientTelemetry(
string clientId,
CosmosHttpClient httpClient,
string userAgent,
Expand All @@ -118,9 +111,8 @@ internal ClientTelemetry()
IReadOnlyList<string> preferredRegions,
GlobalEndpointManager globalEndpointManager)
{
this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
this.diagnosticsHelper = diagnosticsHelper ?? throw new ArgumentNullException(nameof(diagnosticsHelper));
this.tokenProvider = authorizationTokenProvider ?? throw new ArgumentNullException(nameof(authorizationTokenProvider));
this.processor = new ClientTelemetryProcessor(httpClient, authorizationTokenProvider);

this.clientTelemetryInfo = new ClientTelemetryProperties(
clientId: clientId,
Expand Down Expand Up @@ -170,39 +162,45 @@ private async Task EnrichAndSendAsync()

await Task.Delay(observingWindow, this.cancellationTokenSource.Token);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);
this.clientTelemetryInfo.MachineId = VmMetadataApiHandler.GetMachineId();

// Load host information from cache
Compute vmInformation = VmMetadataApiHandler.GetMachineInfo();
this.clientTelemetryInfo.ApplicationRegion = vmInformation?.Location;
this.clientTelemetryInfo.HostEnvInfo = ClientTelemetryOptions.GetHostInformation(vmInformation);

// If cancellation is requested after the delay then return from here.
if (this.cancellationTokenSource.IsCancellationRequested)
{
DefaultTrace.TraceInformation("Observer Task Cancelled.");

break;
}

this.RecordSystemUtilization();
this.clientTelemetryInfo.SystemInfo = ClientTelemetryHelper.RecordSystemUtilization(this.diagnosticsHelper,
this.clientTelemetryInfo.IsDirectConnectionMode);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);

ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
// Take the copy for further processing i.e. serializing and dividing into chunks
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
= Interlocked.Exchange(ref this.operationInfoMap, new ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>());

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());

ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoSnapshot
= Interlocked.Exchange(ref this.requestInfoMap, new ConcurrentDictionary<RequestInfo, LongConcurrentHistogram>());

this.clientTelemetryInfo.OperationInfo = ClientTelemetryHelper.ToListWithMetricsInfo(operationInfoSnapshot);
this.clientTelemetryInfo.CacheRefreshInfo = ClientTelemetryHelper.ToListWithMetricsInfo(cacheRefreshInfoSnapshot);
this.clientTelemetryInfo.RequestInfo = ClientTelemetryHelper.ToListWithMetricsInfo(requestInfoSnapshot);

await this.SendAsync();
try
{
await this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot,
cancellationToken: this.cancellationTokenSource.Token);

this.numberOfFailures = 0;
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
}
catch (Exception ex)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex);
}
}
}
catch (Exception ex)
Expand Down Expand Up @@ -233,8 +231,8 @@ private async Task EnrichAndSendAsync()
throw new ArgumentNullException(nameof(cacheRefreshSource));
}

DefaultTrace.TraceVerbose($"Collecting cacheRefreshSource {cacheRefreshSource} data for Telemetry.");
DefaultTrace.TraceVerbose($"Collecting cacheRefreshSource {0} data for Telemetry.", cacheRefreshSource);

string regionsContacted = ClientTelemetryHelper.GetContactedRegions(regionsContactedList);

// Recording Request Latency
Expand Down Expand Up @@ -290,7 +288,7 @@ private async Task EnrichAndSendAsync()
ITrace trace)
{
DefaultTrace.TraceVerbose("Collecting Operation data for Telemetry.");

if (cosmosDiagnostics == null)
{
throw new ArgumentNullException(nameof(cosmosDiagnostics));
Expand All @@ -301,7 +299,7 @@ private async Task EnrichAndSendAsync()
this.RecordRntbdResponses(containerId, databaseId, summaryDiagnostics.StoreResponseStatistics.Value);

string regionsContacted = ClientTelemetryHelper.GetContactedRegions(cosmosDiagnostics.GetContactedRegions());

// Recording Request Latency and Request Charge
OperationInfo payloadKey = new OperationInfo(regionsContacted: regionsContacted?.ToString(),
responseSizeInBytes: responseSizeInBytes,
Expand Down Expand Up @@ -370,126 +368,6 @@ private void RecordRntbdResponses(string containerId, string databaseId, List<St
}
}
}

/// <summary>
/// Record CPU and memory usage which will be sent as part of telemetry information
/// </summary>
private void RecordSystemUtilization()
{
try
{
DefaultTrace.TraceVerbose("Started Recording System Usage for telemetry.");

SystemUsageHistory systemUsageHistory = this.diagnosticsHelper.GetClientTelemetrySystemHistory();

if (systemUsageHistory != null )
{
ClientTelemetryHelper.RecordSystemUsage(
systemUsageHistory: systemUsageHistory,
systemInfoCollection: this.clientTelemetryInfo.SystemInfo,
isDirectConnectionMode: this.clientTelemetryInfo.IsDirectConnectionMode);
}
else
{
DefaultTrace.TraceWarning("System Usage History not available");
}
}
catch (Exception ex)
{
DefaultTrace.TraceError("System Usage Recording Error : {0} ", ex);
}
}

/// <summary>
/// Task to send telemetry information to configured Juno endpoint.
/// If endpoint is not configured then it won't even try to send information. It will just trace an error message.
/// In any case it resets the telemetry information to collect the latest one.
/// </summary>
/// <returns>Async Task</returns>
private async Task SendAsync()
{
if (endpointUrl == null)
{
DefaultTrace.TraceError("Telemetry is enabled but endpoint is not configured");
return;
}

try
{
DefaultTrace.TraceInformation("Sending Telemetry Data to {0}", endpointUrl.AbsoluteUri);

string json = JsonConvert.SerializeObject(this.clientTelemetryInfo, ClientTelemetryOptions.JsonSerializerSettings);

using HttpRequestMessage request = new HttpRequestMessage
{
Method = HttpMethod.Post,
RequestUri = endpointUrl,
Content = new StringContent(json, Encoding.UTF8, "application/json")
};

async ValueTask<HttpRequestMessage> CreateRequestMessage()
{
INameValueCollection headersCollection = new StoreResponseNameValueCollection();
await this.tokenProvider.AddAuthorizationHeaderAsync(
headersCollection,
endpointUrl,
"POST",
AuthorizationTokenType.PrimaryMasterKey);

foreach (string key in headersCollection.AllKeys())
{
request.Headers.Add(key, headersCollection[key]);
}

request.Headers.Add(HttpConstants.HttpHeaders.DatabaseAccountName, this.clientTelemetryInfo.GlobalDatabaseAccountName);
String envName = ClientTelemetryOptions.GetEnvironmentName();
if (!String.IsNullOrEmpty(envName))
{
request.Headers.Add(HttpConstants.HttpHeaders.EnvironmentName, envName);
}

return request;
}

using HttpResponseMessage response = await this.httpClient.SendHttpAsync(CreateRequestMessage,
ResourceType.Telemetry,
HttpTimeoutPolicyNoRetry.Instance,
null,
this.cancellationTokenSource.Token);

if (!response.IsSuccessStatusCode)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Juno API response not successful. Status Code : {0}, Message : {1}", response.StatusCode, response.ReasonPhrase);
}
else
{
this.numberOfFailures = 0; // Ressetting failure counts on success call.
DefaultTrace.TraceInformation("Telemetry data sent successfully.");
}

}
catch (Exception ex)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Exception while sending telemetry data : {0}", ex);
}
finally
{
// Reset SystemInfo Dictionary for new data.
this.Reset();
}
}

/// <summary>
/// Reset all the operation, System Utilization and Cache refresh related collections
/// </summary>
private void Reset()
{
this.clientTelemetryInfo.SystemInfo.Clear();
}

/// <summary>
/// Dispose of cosmos client.It will get disposed with client so not making it thread safe.
Expand Down
Loading