Skip to content

Commit

Permalink
Merge pull request #47 from AzureCosmosDB/develop
Browse files Browse the repository at this point in the history
Adding prototype implementation of split storage+formatter extension
  • Loading branch information
bowencode committed Apr 27, 2023
2 parents b5239a7 + a4d20bc commit ed66475
Show file tree
Hide file tree
Showing 24 changed files with 507 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Core/Cosmos.DataTransfer.Core/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
},
"JSON->SqlServer": {
"commandName": "Project",
"commandLineArgs": "run --source json --sink sqlServer --SourceSettings:FilePath=c:\\temp\\test-json-sql-in.json --SettingsPath=c:\\temp\\Json-SqlSettings.json"
"commandLineArgs": "run --source json --sink sqlServer --SourceSettings:FilePath=c:\\temp\\test-json-sql-in.json --settings=c:\\temp\\Json-SqlSettings.json"
},
"JSON URI->Cosmos": {
"commandName": "Project",
"commandLineArgs": "--source json --sink cosmos-nosql --SourceSettings:FilePath=https://raw.githubusercontent.com/AzureCosmosDB/data-migration-desktop-tool/feature/cosmos-configuration/Extensions/Json/.JsonExtension.UnitTests/Data/ArraysTypesNesting.json --SettingsPath=c:\\temp\\CosmosSinkSettings.json"
"commandLineArgs": "--source json-file --sink cosmos-nosql --SourceSettings:FilePath=https://raw.githubusercontent.com/AzureCosmosDB/data-migration-desktop-tool/main/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/Data/ArraysTypesNesting.json --settings=c:\\temp\\CosmosSinkSettings.json"
}
}
}
20 changes: 20 additions & 0 deletions Cosmos.DataTransfer.Common/CompositeSinkExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Cosmos.DataTransfer.Common;

public abstract class CompositeSinkExtension<TSink, TFormatter> : IDataSinkExtension
where TSink : class, IComposableDataSink, new()
where TFormatter : class, IFormattedDataWriter, new()
{
public abstract string DisplayName { get; }

public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
{
var sink = new TSink();
var formatter = new TFormatter();

await sink.WriteToTargetAsync(formatter, dataItems, config, dataSource, logger, cancellationToken);
}
}
21 changes: 21 additions & 0 deletions Cosmos.DataTransfer.Common/CompositeSourceExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Cosmos.DataTransfer.Common
{
public abstract class CompositeSourceExtension<TSource, TFormatter> : IDataSourceExtension
where TSource : class, IComposableDataSource, new()
where TFormatter : class, IFormattedDataReader, new()
{
public abstract string DisplayName { get; }

public IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, CancellationToken cancellationToken = default)
{
var source = new TSource();
var formatter = new TFormatter();

return formatter.ParseDataAsync(source, config, logger, cancellationToken);
}
}
}
17 changes: 17 additions & 0 deletions Cosmos.DataTransfer.Common/Cosmos.DataTransfer.Common.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions Cosmos.DataTransfer.Common/FileDataSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Cosmos.DataTransfer.Common;

public class FileDataSink : IComposableDataSink
{
public async Task WriteToTargetAsync(IFormattedDataWriter dataWriter, IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
{
var settings = config.Get<FileSinkSettings>();
settings.Validate();
if (settings.FilePath != null)
{
await using var writer = File.Create(settings.FilePath);
await dataWriter.FormatDataAsync(dataItems, writer, config, logger, cancellationToken);
}
}
}
56 changes: 56 additions & 0 deletions Cosmos.DataTransfer.Common/FileDataSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Cosmos.DataTransfer.Common;

public class FileDataSource : IComposableDataSource
{
public async IAsyncEnumerable<Stream?> ReadSourceAsync(IConfiguration config, ILogger logger, CancellationToken cancellationToken = default)

Check warning on line 9 in Cosmos.DataTransfer.Common/FileDataSource.cs

View workflow job for this annotation

GitHub Actions / build

Async-iterator 'FileDataSource.ReadSourceAsync(IConfiguration, ILogger, CancellationToken)' has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed

Check warning on line 9 in Cosmos.DataTransfer.Common/FileDataSource.cs

View workflow job for this annotation

GitHub Actions / build

Async-iterator 'FileDataSource.ReadSourceAsync(IConfiguration, ILogger, CancellationToken)' has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
{
var settings = config.Get<FileSourceSettings>();
settings.Validate();
if (settings.FilePath != null)
{

if (File.Exists(settings.FilePath))
{
logger.LogInformation("Reading file '{FilePath}'", settings.FilePath);
yield return File.OpenRead(settings.FilePath);
}
else if (Directory.Exists(settings.FilePath))
{
string[] files = Directory.GetFiles(settings.FilePath, "*.json", SearchOption.AllDirectories);
logger.LogInformation("Reading {FileCount} files from '{Folder}'", files.Length, settings.FilePath);
foreach (string filePath in files.OrderBy(f => f))
{
logger.LogInformation("Reading file '{FilePath}'", filePath);
yield return File.OpenRead(filePath);
}
}
else if (Uri.IsWellFormedUriString(settings.FilePath, UriKind.RelativeOrAbsolute))
{
logger.LogInformation("Reading from URI '{FilePath}'", settings.FilePath);

HttpClient client = new HttpClient();
var response = await client.GetAsync(settings.FilePath, cancellationToken);
if (!response.IsSuccessStatusCode)
{
logger.LogError("Failed to read {FilePath}. Response was: {ResponseCode} {ResponseMessage}", settings.FilePath, response.StatusCode, response.ReasonPhrase);
yield break;
}

var json = await response.Content.ReadAsStreamAsync(cancellationToken);

yield return json;
}
else
{
logger.LogWarning("No content was found at configured path '{FilePath}'", settings.FilePath);
yield break;
}

logger.LogInformation("Completed reading '{FilePath}'", settings.FilePath);
}
}
}
10 changes: 10 additions & 0 deletions Cosmos.DataTransfer.Common/FileSinkSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.ComponentModel.DataAnnotations;
using Cosmos.DataTransfer.Interfaces;

namespace Cosmos.DataTransfer.Common;

public class FileSinkSettings : IDataExtensionSettings
{
[Required]
public string? FilePath { get; set; }
}
10 changes: 10 additions & 0 deletions Cosmos.DataTransfer.Common/FileSourceSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.ComponentModel.DataAnnotations;
using Cosmos.DataTransfer.Interfaces;

namespace Cosmos.DataTransfer.Common;

public class FileSourceSettings : IDataExtensionSettings
{
[Required]
public string? FilePath { get; set; }
}
6 changes: 6 additions & 0 deletions CosmosDbDataMigrationTool.sln
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.SqlServ
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.SqlServerExtension.UnitTests", "Extensions\SqlServer\Cosmos.DataTransfer.SqlServerExtension.UnitTests\Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj", "{3E4C4ABF-D8C2-4997-A719-E756483C8D63}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.Common", "Cosmos.DataTransfer.Common\Cosmos.DataTransfer.Common.csproj", "{0FAD9D89-2E41-4D65-8440-5C01885D9292}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -108,6 +110,10 @@ Global
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Release|Any CPU.Build.0 = Release|Any CPU
{0FAD9D89-2E41-4D65-8440-5C01885D9292}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0FAD9D89-2E41-4D65-8440-5C01885D9292}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0FAD9D89-2E41-4D65-8440-5C01885D9292}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0FAD9D89-2E41-4D65-8440-5C01885D9292}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using Microsoft.Extensions.Logging.Abstractions;
using Newtonsoft.Json.Linq;

namespace Cosmos.DataTransfer.JsonExtension.UnitTests
{
[TestClass]
public class JsonFileRoundTripTests
{
[TestMethod]
public async Task WriteAsync_fromReadAsync_ProducesIdenticalFile()
{
var input = new JsonFileSource();
var output = new JsonFileSink();

const string fileIn = "Data/ArraysTypesNesting.json";
const string fileOut = $"{nameof(WriteAsync_fromReadAsync_ProducesIdenticalFile)}_out.json";

var sourceConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", fileIn }
});
var sinkConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", fileOut },
{ "Indented", "true" },
});

await output.WriteAsync(input.ReadAsync(sourceConfig, NullLogger.Instance), sinkConfig, input, NullLogger.Instance);

bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileIn)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
Assert.IsTrue(areEqual);
}

[TestMethod]
public async Task WriteAsync_fromFolderReadAsync_ProducesExpectedCombinedFile()
{
var input = new JsonFileSource();
var output = new JsonFileSink();

const string fileIn = "Data/SingleObjects";
const string fileCompare = "Data/SimpleIdName.json";
const string fileOut = $"{nameof(WriteAsync_fromFolderReadAsync_ProducesExpectedCombinedFile)}_out.json";

var sourceConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", fileIn }
});
var sinkConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", fileOut },
{ "Indented", "true" },
});

await output.WriteAsync(input.ReadAsync(sourceConfig, NullLogger.Instance), sinkConfig, input, NullLogger.Instance);

bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileCompare)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
Assert.IsTrue(areEqual);
}

[TestMethod]
public async Task WriteAsync_fromReadUriAsync_ProducesIdenticalFile()
{
var input = new JsonFileSource();
var output = new JsonFileSink();

const string urlIn = "https://raw.githubusercontent.com/AzureCosmosDB/data-migration-desktop-tool/main/Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/Data/ArraysTypesNesting.json";
const string compareFile = "Data/ArraysTypesNesting.json";
const string fileOut = $"{nameof(WriteAsync_fromReadAsync_ProducesIdenticalFile)}_out.json";

var sourceConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", urlIn }
});
var sinkConfig = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", fileOut },
{ "Indented", "true" },
});

await output.WriteAsync(input.ReadAsync(sourceConfig, NullLogger.Instance), sinkConfig, input, NullLogger.Instance);

bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(compareFile)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
Assert.IsTrue(areEqual);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Logging.Abstractions;
using Newtonsoft.Json;

namespace Cosmos.DataTransfer.JsonExtension.UnitTests
{
[TestClass]
public class JsonFileSinkTests
{
[TestMethod]
public async Task WriteAsync_WithFlatObjects_WritesToValidFile()
{
var sink = new JsonFileSink();

var data = new List<DictionaryDataItem>
{
new(new Dictionary<string, object?>
{
{ "Id", 1 },
{ "Name", "One" },
}),
new(new Dictionary<string, object?>
{
{ "Id", 2 },
{ "Name", "Two" },
}),
new(new Dictionary<string, object?>
{
{ "Id", 3 },
{ "Name", "Three" },
}),
};
string outputFile = $"{DateTime.Now:yy-MM-dd}_FS_Output.json";
var config = TestHelpers.CreateConfig(new Dictionary<string, string>
{
{ "FilePath", outputFile }
});

await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);

var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(await File.ReadAllTextAsync(outputFile));

Assert.IsTrue(outputData.Any(o => o.Id == 1 && o.Name == "One"));
Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Name == "Two"));
Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Name == "Three"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ public async Task WriteAsync_fromReadUriAsync_ProducesIdenticalFile()
bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(compareFile)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
Assert.IsTrue(areEqual);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,10 @@ public async Task WriteAsync_WithDateArray_PreservesDateFormats()
string json = await File.ReadAllTextAsync(outputFile);
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(json);

Assert.AreEqual(now, outputData.Single().Dates.ElementAt(0));
Assert.AreEqual(randomTime, outputData.Single().Dates.ElementAt(1));
Assert.AreEqual(DateTime.UnixEpoch, outputData.Single().Dates.ElementAt(2));
Assert.AreEqual(now, outputData?.Single().Dates?.ElementAt(0));
Assert.AreEqual(randomTime, outputData?.Single().Dates?.ElementAt(1));
Assert.AreEqual(DateTime.UnixEpoch, outputData?.Single().Dates?.ElementAt(2));
}

}

public class TestDataObject
{
public int Id { get; set; }
public string? Name { get; set; }
public DateTime? Created { get; set; }
public List<DateTime>? Dates { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Cosmos.DataTransfer.JsonExtension.UnitTests
{
public class TestDataObject
{
public int Id { get; set; }
public string? Name { get; set; }
public DateTime? Created { get; set; }
public List<DateTime>? Dates { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Cosmos.DataTransfer.Common\Cosmos.DataTransfer.Common.csproj" />
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
</ItemGroup>

Expand Down
Loading

0 comments on commit ed66475

Please sign in to comment.