Skip to content

Commit

Permalink
fix test and logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabh1007 committed Feb 24, 2023
1 parent 1677ade commit 6bf1929
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ internal static class ClientTelemetryOptions
internal static readonly HashSet<string> PropertiesContainMetrics = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "OperationInfo", "CacheRefreshInfo" };

internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB
internal static Dictionary<string, int> PropertiesWithPageSize = new Dictionary<string, int>
{
{ "OperationInfo", 1000 },
{ "CacheRefreshInfo", 2000 }
};

private static Uri clientTelemetryEndpoint;
private static string environmentName;
Expand Down
81 changes: 43 additions & 38 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -57,50 +58,44 @@ internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationToke
ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot,
CancellationToken cancellationToken)
{
List<OperationInfo> opInfolist = ClientTelemetryHelper.ToListWithMetricsInfo(operationInfoSnapshot);
List<CacheRefreshInfo> cacheRefreshList = ClientTelemetryHelper.ToListWithMetricsInfo(cacheRefreshInfoSnapshot);
clientTelemetryInfo.OperationInfo = ClientTelemetryHelper.ToListWithMetricsInfo(operationInfoSnapshot);
clientTelemetryInfo.CacheRefreshInfo = ClientTelemetryHelper.ToListWithMetricsInfo(cacheRefreshInfoSnapshot);

JsonSerializerSettings settings = ClientTelemetryOptions.JsonSerializerSettings;
string json = JsonConvert.SerializeObject(clientTelemetryInfo, settings);

// if JSON is greater than 2 MB
// If JSON is greater than 2 MB
if (json.Length > ClientTelemetryOptions.PayloadSizeThreshold)
{
foreach (string property in ClientTelemetryOptions.PropertiesContainMetrics)
{
settings.ContractResolver = new IncludePropertyContractResolver(property);
json = JsonConvert.SerializeObject(clientTelemetryInfo, settings);

//If this property is contributing to make payload size greater than 2 MB, then break it further
if (json.Length > ClientTelemetryOptions.PayloadSizeThreshold)
clientTelemetryInfo.OperationInfo = null; //clearing everything
clientTelemetryInfo.CacheRefreshInfo = null;

if (property == "OperationInfo")
{
int numberOfChunks = (json.Length / ClientTelemetryOptions.PayloadSizeThreshold) + 2; // adding one extra chunk to be at safer side and incorporate other account information
List<Dictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>> pages
= this.GetPages<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>(operationInfoSnapshot, property);

if (property == "OperationInfo")
foreach (var page in pages)
{
List<List<OperationInfo>> chunks = this.BreakIntoChunks<OperationInfo>(operationInfos, numberOfChunks);
List<OperationInfo> oplist = ClientTelemetryHelper.ToListWithMetricsInfo(page);
clientTelemetryInfo.OperationInfo = oplist;

foreach (List<OperationInfo> chunk in chunks)
{
clientTelemetryInfo.OperationInfo = chunk;
await this.SendAsync(clientTelemetryInfo, cancellationToken);
}
await this.SendAsync(clientTelemetryInfo, cancellationToken);
}
else if (property == "CacheRefreshInfo")
}
else if (property == "CacheRefreshInfo")
{
List<Dictionary<CacheRefreshInfo, LongConcurrentHistogram>> pages = this.GetPages<CacheRefreshInfo, LongConcurrentHistogram>(cacheRefreshInfoSnapshot, property);
foreach (var page in pages)
{
List<List<CacheRefreshInfo>> chunks = this.BreakIntoChunks<CacheRefreshInfo>(cacheRefreshInfos, numberOfChunks);
List<CacheRefreshInfo> crList = ClientTelemetryHelper.ToListWithMetricsInfo(page);
clientTelemetryInfo.CacheRefreshInfo = crList;

foreach (List<CacheRefreshInfo> chunk in chunks)
{
clientTelemetryInfo.CacheRefreshInfo = chunk;
await this.SendAsync(clientTelemetryInfo, cancellationToken);
}
await this.SendAsync(clientTelemetryInfo, cancellationToken);
}
}
else
{
await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, json, cancellationToken);
}
}
}
else
Expand All @@ -109,22 +104,32 @@ internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationToke
}
}

private List<List<T>> BreakIntoChunks<T>(List<T> infoSnapshot, int numberOfChunks)
private List<Dictionary<T, V>> GetPages<T, V>(ConcurrentDictionary<T, V> operationInfoSnapshot, string property)
{
if (infoSnapshot == null || infoSnapshot.Count == 0)
{
return new List<List<T>>();
}
List<List<T>> listofChunks = new List<List<T>>();
int lengthOfEachChunk = infoSnapshot.Count / numberOfChunks;
for (int i = 0; i < numberOfChunks; i++)
var list = operationInfoSnapshot.ToList();
int totalItems = list.Count();

int pageSize = ClientTelemetryOptions.PropertiesWithPageSize[property];
int totalPages = (int)Math.Ceiling((double)totalItems / pageSize);

List<Dictionary<T, V>> pages = new List<Dictionary<T, V>>();
for (int i = 1; i <= totalPages; i++)
{
listofChunks.Add(infoSnapshot.GetRange(i * lengthOfEachChunk, lengthOfEachChunk));
pages.Add(list.GetRange((i - 1) * pageSize, Math.Min(pageSize, totalItems)) // get current page items
.ToDictionary(k => k.Key, v => v.Value)); // convert to dictionary

totalItems -= pageSize; // update remaining items
}
return listofChunks;

return pages;
}

/// <summary>
/// Task to send telemetry information to configured Juno endpoint.
/// If endpoint is not configured then it won't even try to send information. It will just trace an error message.
/// In any case it resets the telemetry information to collect the latest one.
/// </summary>
/// <returns>Async Task</returns>
private async Task SendAsync(ClientTelemetryProperties clientTelemetryInfo, CancellationToken cancellationToken)
{
string jsonPayload = JsonConvert.SerializeObject(clientTelemetryInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void TestInitialize()
HttpResponseMessage result = new HttpResponseMessage(HttpStatusCode.OK);
string jsonObject = request.Content.ReadAsStringAsync().GetAwaiter().GetResult();
Assert.IsTrue(jsonObject.Length < ClientTelemetryOptions.PayloadSizeThreshold, "json size should be less than 2MB");
lock (this.actualInfo)
{
this.actualInfo.Add(JsonConvert.DeserializeObject<ClientTelemetryProperties>(jsonObject));
Expand Down Expand Up @@ -101,7 +102,8 @@ public void TestInitialize()
HttpResponseMessage result = new HttpResponseMessage(HttpStatusCode.OK);
string jsonObject = request.Content.ReadAsStringAsync().GetAwaiter().GetResult();
Assert.IsTrue(jsonObject.Length < ClientTelemetryOptions.PayloadSizeThreshold, "json size should be less than 2MB");
lock (this.actualInfo)
{
this.actualInfo.Add(JsonConvert.DeserializeObject<ClientTelemetryProperties>(jsonObject));
Expand Down Expand Up @@ -676,7 +678,7 @@ public async Task CreateItemWithSubStatusCodeTest(ConnectionMode mode)
HttpResponseMessage result = new HttpResponseMessage(HttpStatusCode.OK);
string jsonObject = request.Content.ReadAsStringAsync().GetAwaiter().GetResult();
Assert.IsTrue(jsonObject.Length < ClientTelemetryOptions.PayloadSizeThreshold, "json size should be less than 2MB");
lock (this.actualInfo)
{
this.actualInfo.Add(JsonConvert.DeserializeObject<ClientTelemetryProperties>(jsonObject));
Expand Down Expand Up @@ -777,51 +779,7 @@ public async Task CheckMisconfiguredTelemetryEndpoint_should_stop_the_job()

Assert.AreEqual(3, retryCounter);
}

[TestMethod]
public async Task SplittingForLargeSizePayload()
{
// Overriding payload size value
ClientTelemetryOptions.PayloadSizeThreshold = 2000;

int splitcount = 0;
HttpClientHandlerHelper customHttpHandler = new HttpClientHandlerHelper
{
RequestCallBack = (request, cancellation) =>
{
if (request.RequestUri.AbsoluteUri.Equals(ClientTelemetryOptions.GetClientTelemetryEndpoint().AbsoluteUri))
{
splitcount++;
string jsonObject = request.Content.ReadAsStringAsync().GetAwaiter().GetResult();
ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject<ClientTelemetryProperties>(jsonObject);
Assert.IsTrue(
clientTelemetryProperties.OperationInfo == null ||
clientTelemetryProperties.CacheRefreshInfo == null ||
(clientTelemetryProperties.OperationInfo.Count == 0 && clientTelemetryProperties.CacheRefreshInfo.Count == 0));
}
return null;
}
};

Container container = await this.CreateClientAndContainer(
mode: ConnectionMode.Direct,
isAzureInstance: true,
customHttpHandler: customHttpHandler);

ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity("MyTestPkValue");
ItemResponse<ToDoActivity> createResponse = await container.CreateItemAsync<ToDoActivity>(testItem);
ToDoActivity testItemCreated = createResponse.Resource;
// Read an Item
await container.ReadItemAsync<ToDoActivity>(testItem.id, new Cosmos.PartitionKey(testItem.id));

await Task.Delay(2000);

Assert.IsTrue(splitcount >= 2);
}


private static void ResetSystemUsageMonitor(bool isTelemetryEnabled)
{
ClientTelemetryTests.systemUsageMonitor?.Stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ namespace Microsoft.Azure.Cosmos.Tests.Telemetry
using System.Text;
using System.IO;
using Microsoft.Azure.Cosmos.Telemetry.Resolver;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using System.Net.Http;
using Moq;
using System.Threading.Tasks;
using System.Threading;
using System.Net;
using Microsoft.Azure.Cosmos.Handler;
using System.Collections.Concurrent;
using System.Security.AccessControl;
using Microsoft.Azure.Documents;

/// <summary>
/// Tests for <see cref="ClientTelemetry"/>.
Expand Down Expand Up @@ -158,6 +158,11 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync()
{
Environment.SetEnvironmentVariable(ClientTelemetryOptions.EnvPropsClientTelemetryEndpoint, "http://dummy.telemetry.endpoint/");
ClientTelemetryOptions.PayloadSizeThreshold = 1024 * 5; //5 Kb
ClientTelemetryOptions.PropertiesWithPageSize = new Dictionary<string, int>
{
{ "OperationInfo", 50 },
{ "CacheRefreshInfo", 10 }
};

string data = File.ReadAllText("Telemetry/ClientTelemetryPayload-large.json", Encoding.UTF8);
ClientTelemetryProperties clientTelemetryProperties = JsonConvert.DeserializeObject<ClientTelemetryProperties>(data);
Expand Down Expand Up @@ -186,9 +191,9 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync()
ClientTelemetryProperties propertiesToSend = JsonConvert.DeserializeObject<ClientTelemetryProperties>(payloadJson);
Console.WriteLine("SystemInfo Size => " + propertiesToSend.SystemInfo.Count);
Console.WriteLine("OperationInfo Size => " + propertiesToSend.OperationInfo.Count);
Console.WriteLine("CacheRefreshInfo Size => " + propertiesToSend.CacheRefreshInfo.Count);
Console.WriteLine("SystemInfo Count => " + propertiesToSend.SystemInfo.Count ?? "null");
Console.WriteLine("OperationInfo Count => " + propertiesToSend.OperationInfo?.Count ?? "null");
Console.WriteLine("CacheRefreshInfo Count => " + propertiesToSend.CacheRefreshInfo?.Count ?? "null");
Console.WriteLine();
})
Expand All @@ -198,10 +203,61 @@ public async Task CheckIfPayloadIsDividedCorrectlyAsync()
MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object))),
Mock.Of<AuthorizationTokenProvider>());

ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
= new ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>();

for (int i = 0; i < 101; i++)
{
OperationInfo opeInfo = new OperationInfo(Regions.WestUS,
0,
Documents.ConsistencyLevel.Session.ToString(),
"databaseName" + i,
"containerName",
Documents.OperationType.Read,
Documents.ResourceType.Document,
200,
0);

LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
ClientTelemetryOptions.RequestLatencyMax,
ClientTelemetryOptions.RequestLatencyPrecision);
latency.RecordValue(10l);

LongConcurrentHistogram requestcharge = new LongConcurrentHistogram(ClientTelemetryOptions.RequestChargeMin,
ClientTelemetryOptions.RequestChargeMax,
ClientTelemetryOptions.RequestChargePrecision);
requestcharge.RecordValue(11l);

operationInfoSnapshot.TryAdd(opeInfo, (latency, requestcharge));
}

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();
for (int i = 0; i < 105; i++)
{
CacheRefreshInfo opeInfo = new CacheRefreshInfo(Regions.WestUS,
10,
Documents.ConsistencyLevel.Session.ToString(),
"databaseName" + i,
"containerName",
Documents.OperationType.Read,
Documents.ResourceType.Document,
200,
1002,
"dummycache") ;

LongConcurrentHistogram latency = new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
ClientTelemetryOptions.RequestLatencyMax,
ClientTelemetryOptions.RequestLatencyPrecision);
latency.RecordValue(10l);

cacheRefreshInfoSnapshot.TryAdd(opeInfo, latency);
}

await processor.GenerateOptimalSizeOfPayloadAndSendAsync(
clientTelemetryProperties,
clientTelemetryProperties.OperationInfo,
clientTelemetryProperties.CacheRefreshInfo,
clientTelemetryProperties,
operationInfoSnapshot,
cacheRefreshInfoSnapshot,
new CancellationToken());
}

Expand Down

0 comments on commit 6bf1929

Please sign in to comment.