Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Update to sample with latest bulk import API signature
Browse files Browse the repository at this point in the history
  • Loading branch information
tknandu committed Mar 28, 2018
1 parent 81a8b04 commit 970a2d0
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -159,7 +159,7 @@ publish/
PublishScripts/

# NuGet Packages
*.nupkg
# *.nupkg
# The packages folder can be ignored because of Package Restore
**/packages/*
# except build/, which is used as an MSBuild target.
Expand Down
8 changes: 4 additions & 4 deletions BulkImportSample/BulkImportSample.sln
Expand Up @@ -11,10 +11,10 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{38689E63-036A-4600-B570-CCF82EC58545}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{38689E63-036A-4600-B570-CCF82EC58545}.Debug|Any CPU.Build.0 = Debug|Any CPU
{38689E63-036A-4600-B570-CCF82EC58545}.Release|Any CPU.ActiveCfg = Release|Any CPU
{38689E63-036A-4600-B570-CCF82EC58545}.Release|Any CPU.Build.0 = Release|Any CPU
{38689E63-036A-4600-B570-CCF82EC58545}.Debug|Any CPU.ActiveCfg = Release|x64
{38689E63-036A-4600-B570-CCF82EC58545}.Debug|Any CPU.Build.0 = Release|x64
{38689E63-036A-4600-B570-CCF82EC58545}.Release|Any CPU.ActiveCfg = Release|x64
{38689E63-036A-4600-B570-CCF82EC58545}.Release|Any CPU.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
10 changes: 5 additions & 5 deletions BulkImportSample/BulkImportSample/App.config
Expand Up @@ -8,13 +8,12 @@
<add key="AuthorizationKey" value="C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==" />
<add key="DatabaseName" value="TestDb" />
<add key="CollectionName" value="TestColl" />
<add key="CollectionThroughput" value="50000" />
<add key="CollectionThroughput" value="1000000" />
<add key="ShouldCleanupOnStart" value="true" />
<add key="ShouldCleanupOnFinish" value="true" />
<add key="NumberOfDocumentsToImport" value="200000" />
<add key="NumberOfBatches" value="2" />
<add key="ShouldCleanupOnFinish" value="false" />
<add key="NumberOfDocumentsToImport" value="10000000" />
<add key="NumberOfBatches" value="10" />
<add key="CollectionPartitionKey" value="/profileid" />
<add key="SampleDocumentTemplateFile" value="SampleDocumentTemplate.json" />
</appSettings>
<system.diagnostics>
<trace autoflush="false" indentsize="4">
Expand All @@ -26,6 +25,7 @@
</system.diagnostics>
<runtime>
<gcServer enabled="true" />
<gcAllowVeryLargeObjects enabled="true" />
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
Expand Down
27 changes: 24 additions & 3 deletions BulkImportSample/BulkImportSample/BulkImportSample.csproj
Expand Up @@ -12,6 +12,8 @@
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<NuGetPackageImportStamp>
</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
Expand All @@ -32,13 +34,33 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x64'">
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<DebugType>full</DebugType>
<PlatformTarget>x64</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x64'">
<OutputPath>bin\x64\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<PlatformTarget>x64</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.CosmosDB.BulkImport, Version=1.21.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.CosmosDB.BulkExecutor.0.0.3-preview\lib\net45\Microsoft.Azure.CosmosDB.BulkImport.dll</HintPath>
<HintPath>..\packages\Microsoft.Azure.CosmosDB.BulkExecutor.0.0.9-preview\lib\net45\Microsoft.Azure.CosmosDB.BulkImport.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.21.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.CosmosDB.BulkExecutor.0.0.3-preview\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
<HintPath>..\packages\Microsoft.Azure.CosmosDB.BulkExecutor.0.0.9-preview\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
Expand All @@ -63,7 +85,6 @@
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
<None Include="SampleDocumentTemplate.json" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Expand Down
Binary file not shown.
26 changes: 11 additions & 15 deletions BulkImportSample/BulkImportSample/Program.cs
Expand Up @@ -8,12 +8,9 @@ namespace BulkImportSample
using System.Configuration;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Newtonsoft.Json;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.CosmosDB.BulkExecutor;
Expand Down Expand Up @@ -117,9 +114,6 @@ private async Task RunBulkImportAsync()
// Creating documents with simple partition key here.
string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

string sampleDocument = File.ReadAllText("../../" + ConfigurationManager.AppSettings["SampleDocumentTemplateFile"]);
Dictionary<string, object> sampleDictionary = JsonConvert.DeserializeObject<Dictionary<string, object>>(sampleDocument);

long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);
Expand All @@ -137,22 +131,21 @@ private async Task RunBulkImportAsync()

for (int i = 0; i < numberOfBatches; i++)
{
// Generate documents to import.
// Generate JSON-serialized documents to import.

List<object> documentsToImportInBatch = new List<object>();
List<string> documentsToImportInBatch = new List<string>();
long prefix = i * numberOfDocumentsPerBatch;

Trace.TraceInformation(String.Format("Generating {0} documents to import for batch {1}", numberOfDocumentsPerBatch, i));
for (int j = 0; j < numberOfDocumentsPerBatch; j++)
{
string partitionKeyValue = (prefix + j).ToString();
sampleDictionary["id"] = partitionKeyValue + Guid.NewGuid().ToString();
sampleDictionary[partitionKeyProperty] = partitionKeyValue;
string id = partitionKeyValue + Guid.NewGuid().ToString();

documentsToImportInBatch.Add(sampleDictionary.ToDictionary(entry => entry.Key, entry => entry.Value));
documentsToImportInBatch.Add(Utils.GenerateRandomDocumentString(id, partitionKeyProperty, partitionKeyValue));
}

// Invoke bulk import (deserialized docs) API.
// Invoke bulk import API.

var tasks = new List<Task>();

Expand All @@ -165,9 +158,10 @@ private async Task RunBulkImportAsync()
{
bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: documentsToImportInBatch,
enableUpsert: false,
enableUpsert: true,
disableAutomaticIdGeneration: true,
maxConcurrencyPerPartitionKeyRange: null,
maxInMemorySortingBatchSize: null,
cancellationToken: token);
}
catch (DocumentClientException de)
Expand All @@ -184,7 +178,7 @@ private async Task RunBulkImportAsync()
Trace.WriteLine(String.Format("\nSummary for batch {0}:", i));
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec)",
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
bulkImportResponse.NumberOfDocumentsImported,
Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
Expand All @@ -199,6 +193,7 @@ private async Task RunBulkImportAsync()
},
token));

/*
tasks.Add(Task.Run(() =>
{
char ch = Console.ReadKey(true).KeyChar;
Expand All @@ -208,13 +203,14 @@ private async Task RunBulkImportAsync()
Trace.WriteLine("\nTask cancellation requested.");
}
}));
*/

await Task.WhenAll(tasks);
}

Trace.WriteLine("Overall summary:");
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec)",
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
totalNumberOfDocumentsInserted,
Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
Expand Down
11 changes: 0 additions & 11 deletions BulkImportSample/BulkImportSample/SampleDocumentTemplate.json

This file was deleted.

16 changes: 16 additions & 0 deletions BulkImportSample/BulkImportSample/Utils.cs
Expand Up @@ -66,5 +66,21 @@ static internal Database GetDatabaseIfExists(DocumentClient client, string datab

return collection;
}

static internal String GenerateRandomDocumentString(String id, String partitionKeyProperty, object parititonKeyValue)
{
return "{\n" +
" \"id\": \"" + id + "\",\n" +
" \"" + partitionKeyProperty + "\": \"" + parititonKeyValue + "\",\n" +
" \"Name\": \"TestDoc\",\n" +
" \"description\": \"1.99\",\n" +
" \"segments\": [375,668,6],\n" +
" \"f1\": \"3hrkjh3h4h4h3jk4h\",\n" +
" \"f2\": \"dhfkjdhfhj4434434\",\n" +
" \"f3\": \"nklfjeoirje434344\",\n" +
" \"f4\": \"pjfgdgfhdgfgdhbd6\",\n" +
" \"f5\": \"3434343ghghghgghj\"" +
"}";
}
}
}
2 changes: 1 addition & 1 deletion BulkImportSample/BulkImportSample/packages.config
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.CosmosDB.BulkExecutor" version="0.0.3-preview" targetFramework="net452" />
<package id="Microsoft.Azure.CosmosDB.BulkExecutor" version="0.0.9-preview" targetFramework="net452" />
<package id="Newtonsoft.Json" version="10.0.2" targetFramework="net452" />
</packages>
7 changes: 5 additions & 2 deletions README.md
Expand Up @@ -21,6 +21,7 @@ We provide two overloads of the bulk import API - one which accepts a list of JS
bool enableUpsert = false,
bool disableAutomaticIdGeneration = true,
int? maxConcurrencyPerPartitionKeyRange = null,
int? maxInMemorySortingBatchSize = null,
CancellationToken cancellationToken = default(CancellationToken));
```

Expand All @@ -31,13 +32,15 @@ We provide two overloads of the bulk import API - one which accepts a list of JS
bool enableUpsert = false,
bool disableAutomaticIdGeneration = true,
int? maxConcurrencyPerPartitionKeyRange = null,
int? maxInMemorySortingBatchSize = null,
CancellationToken cancellationToken = default(CancellationToken));
```

##### Configurable parameters:
* *enableUpsert* : A flag to enable upsert of the documents, default value is false.
* *disableAutomaticIdGeneration* : A flag to disable automatic generation of ids if absent in the docuement.
* *maxConcurrencyPerPartitionKeyRange* : The maximum degree of concurrency per partition key range, setting to null will cause library to use default value of 20.
* *maxInMemorySortingBatchSize* : The maximum number of documents pulled from the document enumerator passed to the API call in each stage for in-memory pre-processing sorting phase prior to bulk importing, setting to null will cause library to use default value of min(documents.count, 1000000).
* *cancellationToken* : The cancellation token to gracefully exit bulk import.

##### Bulk import response object definition
Expand All @@ -46,13 +49,13 @@ The result of the bulk import API call contains the following attributes:
* *NumberOfDocumentsImported* (long) : The total number of documents which were successfully imported out of the documents supplied to the bulk import API call.
* *TotalRequestUnitsConsumed* (double) : The total request units (RU) consumed by the bulk import API call.
* *TotalTimeTaken* (TimeSpan) : The total time taken by the bulk import API call to complete.
* *BadInputDocuments* (List\<object\>) : The list of bad-format documents which were not successfully imported in the bulk import API call. User needs to fix the documents - potential reasons could be null document id, invalid JSON, etc.
* *BadInputDocuments* (List\<object\>) : The list of bad-format documents which were not successfully imported in the bulk import API call. User needs to fix the documents - potential reasons could be invalid document id, invalid JSON format, etc.

#### Getting started

You can find a sample application program consuming the bulk import API [here](BulkImportSample\BulkImportSample\Program.cs) - which generates random documents to be then bulk imported into a Cosmos DB collecition. You can configure the application settings in *appSettings* [here](BulkImportSample\BulkImportSample\App.config).

### Couple of other pointers
### Additional pointers

* For best performance, run your application from an Azure VM in the same region as your Cosmos DB account write region.
* It is advised to instantiate a single *BulkExecutor* object for the entirety of the application corresponding to a specific Cosmos DB collection.
Expand Down

0 comments on commit 970a2d0

Please sign in to comment.