Skip to content

Commit

Permalink
Merge pull request #44 from AzureCosmosDB/develop
Browse files Browse the repository at this point in the history
JSON Source async Streams
  • Loading branch information
bowencode committed Apr 26, 2023
2 parents f8184a7 + 9068e5a commit b5239a7
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,88 @@ public async Task WriteAsync_WithFlatObjects_WritesToValidFile()
Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Name == "Two"));
Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Name == "Three"));
}

[TestMethod]
public async Task WriteAsync_WithSourceDates_PreservesDateFormats()
{
var sink = new JsonDataSinkExtension();

var now = DateTime.UtcNow;
var randomTime = DateTime.UtcNow.AddMinutes(Random.Shared.NextDouble() * 10000);
var data = new List<DictionaryDataItem>
{
new(new Dictionary<string, object?>
{
{ "Id", 1 },
{ "Created", now },
}),
new(new Dictionary<string, object?>
{
{ "Id", 2 },
{ "Created", DateTime.UnixEpoch },
}),
new(new Dictionary<string, object?>
{
{ "Id", 3 },
{ "Created", randomTime },
}),
};
string outputFile = $"{now:yy-MM-dd}_DateOutput.json";
var config = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", outputFile }
});

await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);

string json = await File.ReadAllTextAsync(outputFile);
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(json);

Assert.IsTrue(outputData.Any(o => o.Id == 1 && o.Created == now));
Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Created == DateTime.UnixEpoch));
Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Created == randomTime));
}


[TestMethod]
public async Task WriteAsync_WithDateArray_PreservesDateFormats()
{
var sink = new JsonDataSinkExtension();

var now = DateTime.UtcNow;
var randomTime = DateTime.UtcNow.AddMinutes(Random.Shared.NextDouble() * 10000);
var data = new List<DictionaryDataItem>
{
new(new Dictionary<string, object?>
{
{ "Id", 1 },
{ "Dates", new[] { now, randomTime, DateTime.UnixEpoch } },
})
};

string outputFile = $"{now:yy-MM-dd}_DateArrayOutput.json";
var config = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", outputFile }
});

await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);

string json = await File.ReadAllTextAsync(outputFile);
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(json);

Assert.AreEqual(now, outputData.Single().Dates.ElementAt(0));

Check warning on line 117 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.

Check warning on line 117 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.
Assert.AreEqual(randomTime, outputData.Single().Dates.ElementAt(1));

Check warning on line 118 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.

Check warning on line 118 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.
Assert.AreEqual(DateTime.UnixEpoch, outputData.Single().Dates.ElementAt(2));

Check warning on line 119 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.

Check warning on line 119 in Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'source' in 'DateTime Enumerable.ElementAt<DateTime>(IEnumerable<DateTime> source, int index)'.
}

}

public class TestDataObject
{
public int Id { get; set; }
public string? Name { get; set; }
public DateTime? Created { get; set; }
public List<DateTime>? Dates { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
<PackageReference Include="System.ComponentModel.Composition" Version="6.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.ComponentModel.Composition;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using Cosmos.DataTransfer.Interfaces;
using Cosmos.DataTransfer.JsonExtension.Settings;
Expand All @@ -23,13 +22,16 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
if (File.Exists(settings.FilePath))
{
logger.LogInformation("Reading file '{FilePath}'", settings.FilePath);
var list = await ReadFileAsync(settings.FilePath, logger, cancellationToken);
var list = ReadFileAsync(settings.FilePath, logger, cancellationToken);

if (list != null)
{
foreach (var listItem in list)
await foreach (var listItem in list.WithCancellation(cancellationToken))
{
yield return new JsonDictionaryDataItem(listItem);
if (listItem != null)
{
yield return new JsonDictionaryDataItem(listItem);
}
}
}
}
Expand All @@ -40,13 +42,16 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
foreach (string filePath in files.OrderBy(f => f))
{
logger.LogInformation("Reading file '{FilePath}'", filePath);
var list = await ReadFileAsync(filePath, logger, cancellationToken);
var list = ReadFileAsync(filePath, logger, cancellationToken);

if (list != null)
{
foreach (var listItem in list)
await foreach (var listItem in list.WithCancellation(cancellationToken))
{
yield return new JsonDictionaryDataItem(listItem);
if (listItem != null)
{
yield return new JsonDictionaryDataItem(listItem);
}
}
}
}
Expand All @@ -63,15 +68,18 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
yield break;
}

var json = await response.Content.ReadAsStringAsync(cancellationToken);
var json = await response.Content.ReadAsStreamAsync(cancellationToken);

var list = await ReadJsonItemsAsync(json, logger, cancellationToken);
var list = ReadJsonItemsAsync(json, logger, cancellationToken);

if (list != null)
{
foreach (var listItem in list)
await foreach (var listItem in list.WithCancellation(cancellationToken))
{
yield return new JsonDictionaryDataItem(listItem);
if (listItem != null)
{
yield return new JsonDictionaryDataItem(listItem);
}
}
}
}
Expand All @@ -85,45 +93,70 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
}
}

private static async Task<List<Dictionary<string, object?>>?> ReadFileAsync(string filePath, ILogger logger, CancellationToken cancellationToken)
private static IAsyncEnumerable<Dictionary<string, object?>?>? ReadFileAsync(string filePath, ILogger logger, CancellationToken cancellationToken)
{
var jsonText = await File.ReadAllTextAsync(filePath, cancellationToken);
return await ReadJsonItemsAsync(jsonText, logger, cancellationToken);
var jsonFile = File.OpenRead(filePath);
return ReadJsonItemsAsync(jsonFile, logger, cancellationToken);
}

private static async Task<List<Dictionary<string, object?>>?> ReadJsonItemsAsync(string jsonText, ILogger logger, CancellationToken cancellationToken)
private static IAsyncEnumerable<Dictionary<string, object?>?>? ReadJsonItemsAsync(Stream jsonStream, ILogger logger, CancellationToken cancellationToken)
{
if (jsonStream is { CanSeek: true, Length: < 10485760L })
{
// test for single item in JSON
var singleItemList = ReadSingleItemAsync(jsonStream, logger);
if (singleItemList != null)
{
return singleItemList.ToAsyncEnumerable();
}
}

try
{
using MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(jsonText));
return await JsonSerializer.DeserializeAsync<List<Dictionary<string, object?>>>(stream, cancellationToken: cancellationToken);
jsonStream.Seek(0, SeekOrigin.Begin);
return JsonSerializer.DeserializeAsyncEnumerable<Dictionary<string, object?>>(jsonStream, cancellationToken: cancellationToken);
}
catch (Exception ex)
catch (Exception)
{
// list failed
}

var list = new List<Dictionary<string, object?>>();
return null;
}

private static IEnumerable<Dictionary<string, object?>?>? ReadSingleItemAsync(Stream stream, ILogger logger)
{
Dictionary<string, object?>? item;
try
{
using MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(jsonText));
var item = await JsonSerializer.DeserializeAsync<Dictionary<string, object?>>(stream, cancellationToken: cancellationToken);
if (item != null)
{
list.Add(item);
}
item = JsonSerializer.Deserialize<Dictionary<string, object?>>(stream);
}
catch (Exception ex)
catch (Exception)
{
// single item failed
return null;
}

if (!list.Any())
if (item != null)
{
return new[] { item };
}

string textContent;
try
{
var chars = new char[50];
new StreamReader(stream).ReadBlock(chars, 0, chars.Length);
textContent = new string(chars);
}
catch (Exception ex)
{
logger.LogWarning("No records read from '{Content}'", jsonText);
logger.LogWarning(ex, "Failed to read stream");
textContent = "<error>";
}
logger.LogWarning("No records read from '{Content}'", textContent);

return list;
return null;
}
}
}
13 changes: 11 additions & 2 deletions Interfaces/Cosmos.DataTransfer.Interfaces/DataItemJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Collections;
using System.Text;
using System.Text.Json;

namespace Cosmos.DataTransfer.Interfaces;
Expand Down Expand Up @@ -52,7 +53,7 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
{
WriteDataItem(writer, child, includeNullFields, fieldName);
}
else if (fieldValue is IEnumerable<object> children)
else if (fieldValue is not string && fieldValue is IEnumerable children)
{
writer.WriteStartArray(fieldName);
foreach (object arrayItem in children)
Expand All @@ -69,6 +70,10 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
{
writer.WriteBooleanValue(boolean);
}
else if (arrayItem is DateTime date)
{
writer.WriteStringValue(date.ToString("O"));
}
else
{
writer.WriteStringValue(arrayItem.ToString());
Expand All @@ -84,6 +89,10 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
{
writer.WriteBoolean(fieldName, boolean);
}
else if (fieldValue is DateTime date)
{
writer.WriteString(fieldName, date.ToString("O"));
}
else
{
writer.WriteString(fieldName, fieldValue.ToString());
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The Azure Cosmos DB Desktop Data Migration Tool is an open-source project contai

## Quick Installation

To use the tool, download the latest zip file for your platform (win-x64, mac-x64, or linux-x64) from [Releases](releases) and extract all files to your desired install location. To begin a data transfer operation, first populate the `migrationsettings.json` file with appropriate settings for your data source and sink (see [detailed instructions](#using-the-command-line) below), and then run the application from a command line: `dmt.exe` on Windows or `dmt` on other platforms.
To use the tool, download the latest zip file for your platform (win-x64, mac-x64, or linux-x64) from [Releases](https://github.com/AzureCosmosDB/data-migration-desktop-tool/releases) and extract all files to your desired install location. To begin a data transfer operation, first populate the `migrationsettings.json` file with appropriate settings for your data source and sink (see [detailed instructions](#using-the-command-line) below), and then run the application from a command line: `dmt.exe` on Windows or `dmt` on other platforms.

## Extension documentation

Expand Down

0 comments on commit b5239a7

Please sign in to comment.