Skip to content

Commit

Permalink
Merge pull request #68 from AzureCosmosDB/develop
Browse files Browse the repository at this point in the history
Minor fixes
  • Loading branch information
codingbandit committed Jun 17, 2023
2 parents 681c9a4 + 5c73141 commit c8ad5a8
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
namespace Cosmos.DataTransfer.CosmosExtension.UnitTests
using Cosmos.DataTransfer.Interfaces;

namespace Cosmos.DataTransfer.CosmosExtension.UnitTests
{
[TestClass]
public class CosmosDataSinkExtensionTests
{
[TestMethod]
public void BuildObject_WithNestedArrays_WorksCorrectly()
public void BuildDynamicObjectTree_WithNestedArrays_WorksCorrectly()
{
var item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
Expand Down Expand Up @@ -34,7 +36,7 @@ public void BuildObject_WithNestedArrays_WorksCorrectly()
}
});

dynamic obj = CosmosDataSinkExtension.BuildObject(item)!;
dynamic obj = item.BuildDynamicObjectTree()!;

Assert.AreEqual(typeof(object[]), obj.array.GetType());
Assert.AreEqual(2, obj.array.Length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<InternalsVisibleTo Include="Cosmos.DataTransfer.CosmosExtension.UnitTests" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.34.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Dynamic;
using System.Globalization;
using System.Reflection;
using System.Text;
using System.Text.RegularExpressions;
using Azure.Identity;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -95,7 +92,7 @@ void ReportCount(int i)
}
}

var convertedObjects = dataItems.Select(di => BuildObject(di, true)).Where(o => o != null).OfType<ExpandoObject>();
var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(true)).Where(o => o != null).OfType<ExpandoObject>();
var batches = convertedObjects.Buffer(settings.BatchSize);
var retry = GetRetryPolicy(settings.MaxRetryCount, settings.InitialRetryDurationMs);
await foreach (var batch in batches.WithCancellation(cancellationToken))
Expand Down Expand Up @@ -176,57 +173,6 @@ private static MemoryStream CreateItemStream(ExpandoObject item)
return ((IDictionary<string, object?>)item)[propertyName]?.ToString();
}

internal static ExpandoObject? BuildObject(IDataItem? source, bool requireStringId = false)
{
if (source == null)
return null;

var fields = source.GetFieldNames().ToList();
var item = new ExpandoObject();
if (requireStringId && !fields.Contains("id", StringComparer.CurrentCultureIgnoreCase))
{
item.TryAdd("id", Guid.NewGuid().ToString());
}
foreach (string field in fields)
{
object? value = source.GetValue(field);
var fieldName = field;
if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId)
{
value = value?.ToString();
fieldName = "id";
}
else if (value is IDataItem child)
{
value = BuildObject(child);
}
else if (value is IEnumerable<object?> array)
{
value = BuildArray(array);
}

item.TryAdd(fieldName, value);
}

return item;

static object BuildArray(IEnumerable<object?> array)
{
return array.Select(dataItem =>
{
if (dataItem is IDataItem childObject)
{
return BuildObject(childObject);
}
else if (dataItem is IEnumerable<object?> array)
{
return BuildArray(array);
}
return dataItem;
}).ToArray();
}
}

public IEnumerable<IDataExtensionSettings> GetSettings()
{
yield return new CosmosSinkSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Globalization;
using System.Reflection;
using Azure.Core;
using System.Text.RegularExpressions;

namespace Cosmos.DataTransfer.CosmosExtension
{
Expand Down Expand Up @@ -44,16 +45,22 @@ private static string CreateUserAgentString(string displayName, string? sourceDi
// Assembly.GetExecutingAssembly().GetName().Version,
// context.SourceName, context.SinkName,
// isShardedImport ? Resources.ShardedImportDesignator : String.Empty)
string sourceName = StripSpecialChars(sourceDisplayName ?? "");
string sinkName = StripSpecialChars(displayName);

var entryAssembly = Assembly.GetEntryAssembly();
bool isShardedImport = false;
string userAgentString = string.Format(CultureInfo.InvariantCulture, "{0}-{1}-{2}-{3}{4}",
entryAssembly == null ? "dtr" : entryAssembly.GetName().Name,
Assembly.GetExecutingAssembly().GetName().Version,
sourceDisplayName, displayName,
sourceName, sinkName,
isShardedImport ? "-Sharded" : string.Empty);
return userAgentString;
}
private static string StripSpecialChars(string displayName)
{
return Regex.Replace(displayName, "[^\\w]", "", RegexOptions.Compiled);
}

public static async Task VerifyContainerAccess(Container? container, string? name, ILogger logger, CancellationToken cancellationToken)
{
Expand Down
4 changes: 2 additions & 2 deletions Extensions/Cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The Cosmos data transfer extension provides source and sink capabilities for rea

Source and sink require settings used to locate and access the Cosmos DB account. This can be done in one of two ways:
- Using a `ConnectionString` that includes an AccountEndpoint and AccountKey
- Using RBAC (Role Based Access Control) by setting `UseRbac` to true and specifying `AccountEndpoint` and optionally `EnableInteractiveCredentials` to prompt the user to log in to Azure if default credentials are not available.
- Using RBAC (Role Based Access Control) by setting `UseRbacAuth` to true and specifying `AccountEndpoint` and optionally `EnableInteractiveCredentials` to prompt the user to log in to Azure if default credentials are not available.

Source and sink settings also both require parameters to specify the data location within a Cosmos DB account:
- `Database`
Expand All @@ -33,7 +33,7 @@ Or with RBAC:

```json
{
"UseRbac": true,
"UseRbacAuth": true,
"AccountEndpoint": "https://...",
"EnableInteractiveCredentials": true,
"Database":"myDb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public IEnumerable<string> GetFieldNames()
JsonValueKind kind = element.ValueKind;
switch (kind)
{
case JsonValueKind.Null:
return null;
case JsonValueKind.String:
return element.GetString();
case JsonValueKind.Number:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
var batchSize = settings.BatchSize ?? 1000;

var objects = new List<BsonDocument>();
await foreach (var item in dataItems)
int itemCount = 0;
await foreach (var item in dataItems.WithCancellation(cancellationToken))
{
var dict = item.GetFieldNames().ToDictionary(key => key, key => item.GetValue(key));
var dict = item.BuildDynamicObjectTree();
objects.Add(new BsonDocument(dict));
itemCount++;

if (objects.Count == batchSize)
{
await repo.AddRange(objects);
logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection);
objects.Clear();
}
}
Expand All @@ -40,6 +43,11 @@ await foreach (var item in dataItems)
{
await repo.AddRange(objects);
}

if (itemCount > 0)
logger.LogInformation("Added {ItemCount} total items to collection '{Collection}'", itemCount, settings.Collection);
else
logger.LogWarning("No items added to collection '{Collection}'", settings.Collection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,28 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge

foreach (var collection in collectionNames)
{
await foreach (var item in EnumerateCollectionAsync(context, collection))
await foreach (var item in EnumerateCollectionAsync(context, collection, logger).WithCancellation(cancellationToken))
{
yield return item;
}
}
}
}

public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName)
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, ILogger logger)

Check warning on line 38 in Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs

View workflow job for this annotation

GitHub Actions / Build and test .NET projects

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
logger.LogInformation("Reading collection '{Collection}'", collectionName);
var collection = context.GetRepository<BsonDocument>(collectionName);
int itemCount = 0;
foreach (var record in collection.AsQueryable())
{
yield return new MongoDataItem(record);
itemCount++;
}
if (itemCount > 0)
logger.LogInformation("Read {ItemCount} items from collection '{Collection}'", itemCount, collectionName);
else
logger.LogWarning("No items read from collection '{Collection}'", collectionName);
}

public IEnumerable<IDataExtensionSettings> GetSettings()
Expand Down
64 changes: 64 additions & 0 deletions Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Dynamic;

namespace Cosmos.DataTransfer.Interfaces;

public static class DataItemExtensions
{
/// <summary>
/// Given a source IDataItem, builds a dynamic object tree including child objects and arrays
/// </summary>
/// <param name="source"></param>
/// <param name="requireStringId">If true, adds a new GUID "id" field to any top level items where one is not already present.</param>
/// <returns>A dynamic object containing the entire data structure.</returns>
/// <remarks>The returned ExpandoObject can be used directly as an IDictionary.</remarks>
public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false)
{
if (source == null)
return null;

var fields = source.GetFieldNames().ToList();
var item = new ExpandoObject();
if (requireStringId && !fields.Contains("id", StringComparer.CurrentCultureIgnoreCase))
{
item.TryAdd("id", Guid.NewGuid().ToString());
}
foreach (string field in fields)
{
object? value = source.GetValue(field);
var fieldName = field;
if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId)
{
value = value?.ToString();
fieldName = "id";
}
else if (value is IDataItem child)
{
value = BuildDynamicObjectTree(child);
}
else if (value is IEnumerable<object?> array)
{
value = BuildArray(array);
}

item.TryAdd(fieldName, value);
}

return item;

static object BuildArray(IEnumerable<object?> array)
{
return array.Select(dataItem =>
{
switch (dataItem)
{
case IDataItem childObject:
return BuildDynamicObjectTree(childObject);
case IEnumerable<object?> array:
return BuildArray(array);
default:
return dataItem;
}
}).ToArray();
}
}
}

0 comments on commit c8ad5a8

Please sign in to comment.