title | description | author | ms.service | ms.subservice | ms.devlang | ms.topic | ms.date | ms.author | ms.reviewer | ms.custom |
---|---|---|---|---|---|---|---|---|---|---|
Use bulk executor Java library in Azure Cosmos DB to perform bulk import and update operations | Bulk import and update Azure Cosmos DB documents using bulk executor Java library | tknandu | cosmos-db | cosmosdb-sql | java | how-to | 08/26/2020 | ramkris | sngun | devx-track-java |
This tutorial shows how to use the Azure Cosmos DB bulk executor Java library to import and update Azure Cosmos DB documents. To learn about the bulk executor library and how it helps you use massive throughput and storage, see the bulk executor library overview article. In this tutorial, you build a Java application that generates random documents and bulk imports them into an Azure Cosmos container. After the import, you bulk update some properties of the documents.
Currently, the bulk executor library is supported only by Azure Cosmos DB SQL API and Gremlin API accounts. This article describes how to use the bulk executor Java library with SQL API accounts. To learn about using the bulk executor .NET library with the Gremlin API, see perform bulk operations in Azure Cosmos DB Gremlin API. The bulk executor library described here is available only for the Azure Cosmos DB Java sync SDK v2, where it is the currently recommended solution for Java bulk support. It is not currently available for the 3.x, 4.x, or other higher SDK versions.
- If you don't have an Azure subscription, create a free account before you begin.
- You can try Azure Cosmos DB for free without an Azure subscription and without any commitment. Alternatively, you can use the Azure Cosmos DB Emulator with the https://localhost:8081 endpoint. The primary key is provided in Authenticating requests.
- Java Development Kit (JDK) 1.7+
  - On Ubuntu, run apt-get install default-jdk to install the JDK.
  - Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
- Download and install a Maven binary archive.
  - On Ubuntu, you can run apt-get install maven to install Maven.
- Create an Azure Cosmos DB SQL API account by using the steps described in the create database account section of the Java quickstart article.
Now let's switch to working with code by downloading a sample Java application from GitHub. This application performs bulk operations on Azure Cosmos DB data. To clone the application, open a command prompt, navigate to the directory where you want to copy the application and run the following command:
git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-java-getting-started.git
The cloned repository contains two samples, "bulkimport" and "bulkupdate", under the "\azure-cosmosdb-bulkexecutor-java-getting-started\samples\bulkexecutor-sample\src\main\java\com\microsoft\azure\cosmosdb\bulkexecutor" folder. The "bulkimport" application generates random documents and imports them into Azure Cosmos DB. The "bulkupdate" application updates some documents in Azure Cosmos DB. The next sections review the code in each of these sample apps.
- The Azure Cosmos DB connection strings are read as arguments and assigned to variables defined in the CmdLineConfiguration.java file.
- Next, the DocumentClient object is initialized by using the following statements:
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setMaxPoolSize(1000);
DocumentClient client = new DocumentClient(
    HOST,
    MASTER_KEY,
    connectionPolicy,
    ConsistencyLevel.Session);
- The DocumentBulkExecutor object is initialized with high retry values for wait time and throttled requests. These values are then set to 0 to pass congestion control to DocumentBulkExecutor for its lifetime.
// Set the client's retry options high for initialization
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);
// Builder pattern
Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
    client,
    DATABASE_NAME,
    COLLECTION_NAME,
    collection.getPartitionKey(),
    offerThroughput); // throughput you want to allocate for bulk import out of the container's total throughput
// Instantiate DocumentBulkExecutor
DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();
// Set retries to 0 to pass complete control to the bulk executor
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
- Call the importAll API to bulk import the randomly generated documents into an Azure Cosmos container. You can configure the command-line options in the CmdLineConfiguration.java file.
BulkImportResponse bulkImportResponse = bulkExecutor.importAll(documents, false, true, null);
The bulk import API accepts a collection of JSON-serialized documents and has the following syntax. For more details, see the API documentation:
public BulkImportResponse importAll(
    Collection<String> documents,
    boolean isUpsert,
    boolean disableAutomaticIdGeneration,
    Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;
The importAll method accepts the following parameters:
Parameter | Description |
---|---|
isUpsert | A flag to enable upsert of the documents. If a document with the given ID already exists, it's updated. |
disableAutomaticIdGeneration | A flag to disable automatic generation of ID. By default, it is set to true. |
maxConcurrencyPerPartitionRange | The maximum degree of concurrency per partition key range. The default value is 20. |

Bulk import response object definition

The result of the bulk import API call contains the following get methods:

Method | Description |
---|---|
int getNumberOfDocumentsImported() | The total number of documents that were successfully imported out of the documents supplied to the bulk import API call. |
double getTotalRequestUnitsConsumed() | The total request units (RU) consumed by the bulk import API call. |
Duration getTotalTimeTaken() | The total time taken by the bulk import API call to complete execution. |
List<Exception> getErrors() | Gets the list of errors if some documents out of the batch supplied to the bulk import API call failed to get inserted. |
List<Object> getBadInputDocuments() | The list of bad-format documents that were not successfully imported in the bulk import API call. Fix the returned documents and retry the import. Bad-format documents include documents whose ID value is not a string (null or any other datatype is considered invalid). |

- After you have the bulk import application ready, build the command-line tool from source by using the 'mvn clean package' command. This command generates a jar file in the target folder:
mvn clean package
- After the target dependencies are generated, you can invoke the bulk importer application by using the following command:
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *<Fill in your Azure Cosmos DB's endpoint>* -masterKey *<Fill in your Azure Cosmos DB's primary key>* -databaseId bulkImportDb -collectionId bulkImportColl -operation import -shouldCreateCollection -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
The bulk importer creates a new database and a collection with the database name, collection name, and throughput values specified in the command-line arguments.
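The importAll call takes JSON-serialized strings, and each document's ID must be a string value. The following is a minimal sketch of building such a batch with only the JDK. The class name RandomDocuments, the Name field, and the use of profileid as the partition key field (inferred from the -partitionKey /profileid argument in the command above) are assumptions for illustration; this is not the sample's actual document generator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper; not part of the bulk executor library or the sample app.
final class RandomDocuments {

    // Builds one JSON document. The id is written as a JSON string because
    // documents whose id is not a string are reported as bad input.
    static String newDocument(long sequence) {
        String id = UUID.randomUUID().toString();
        String profileId = Long.toString(sequence); // assumed partition key field
        return "{\"id\":\"" + id + "\","
                + "\"profileid\":\"" + profileId + "\","
                + "\"Name\":\"doc-" + sequence + "\"}";
    }

    // Builds a batch of JSON documents with sequence numbers 0..count-1.
    static List<String> newBatch(int count) {
        List<String> documents = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            documents.add(newDocument(i));
        }
        return documents;
    }

    public static void main(String[] args) {
        // Print a small batch; a real run would pass the list to importAll.
        newBatch(3).forEach(System.out::println);
    }
}
```

Because every document in this sketch carries its own ID, the resulting list fits a call that passes true for disableAutomaticIdGeneration, as in the importAll call shown earlier.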
You can update existing documents by using the updateAll API. In this example, you set the Name field to a new value and remove the Description field from the existing documents. For the full set of supported field update operations, see the API documentation.
- Define the update items along with the corresponding field update operations. In this example, you use SetUpdateOperation to update the Name field and UnsetUpdateOperation to remove the Description field from all the documents. You can also perform other operations, such as incrementing a document field by a specific value, pushing specific values into an array field, or removing a specific value from an array field. To learn about the different methods provided by the bulk update API, see the API documentation.
SetUpdateOperation<String> nameUpdate = new SetUpdateOperation<>("Name", "UpdatedDocValue");
UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
ArrayList<UpdateOperationBase> updateOperations = new ArrayList<>();
updateOperations.add(nameUpdate);
updateOperations.add(descriptionUpdate);
List<UpdateItem> updateItems = new ArrayList<>(cfg.getNumberOfDocumentsForEachCheckpoint());
IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(j -> {
    return new UpdateItem(Long.toString(prefix + j), Long.toString(prefix + j), updateOperations);
}).collect(Collectors.toCollection(() -> updateItems));
- Call the updateAll API to bulk update the documents in an Azure Cosmos container. You can configure the command-line options in the CmdLineConfiguration.java file.
BulkUpdateResponse bulkUpdateResponse = bulkExecutor.updateAll(updateItems, null);
The bulk update API accepts a collection of items to be updated. Each update item specifies the list of field update operations to be performed on a document identified by an ID and a partition key value. The API has the following syntax. For more details, see the API documentation:
public BulkUpdateResponse updateAll(
    Collection<UpdateItem> updateItems,
    Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;
The updateAll method accepts the following parameters:
Parameter | Description |
---|---|
maxConcurrencyPerPartitionRange | The maximum degree of concurrency per partition key range. The default value is 20. |

Bulk update response object definition

The result of the bulk update API call contains the following get methods:

Method | Description |
---|---|
int getNumberOfDocumentsUpdated() | The total number of documents that were successfully updated out of the documents supplied to the bulk update API call. |
double getTotalRequestUnitsConsumed() | The total request units (RU) consumed by the bulk update API call. |
Duration getTotalTimeTaken() | The total time taken by the bulk update API call to complete execution. |
List<Exception> getErrors() | Gets the list of operational or networking issues related to the update operation. |
List<BulkUpdateFailure> getFailedUpdates() | Gets the list of updates that could not be completed, along with the specific exceptions leading to the failures. |

- After you have the bulk update application ready, build the command-line tool from source by using the 'mvn clean package' command. This command generates a jar file in the target folder:
mvn clean package
- After the target dependencies are generated, you can invoke the bulk update application by using the following command:
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *<Fill in your Azure Cosmos DB's endpoint>* -masterKey *<Fill in your Azure Cosmos DB's primary key>* -databaseId bulkUpdateDb -collectionId bulkUpdateColl -operation update -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
Consider the following points for better performance when using bulk executor library:
- For best performance, run your application from an Azure VM in the same region as your Cosmos DB account's write region.
- To achieve higher throughput:
  - Set the JVM's heap size large enough to avoid memory issues when handling a large number of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
  - There is a preprocessing time, so you get higher throughput when performing bulk operations with a large number of documents. For example, to import 10,000,000 documents, running bulk import 10 times on batches of 1,000,000 documents each is preferable to running it 100 times on batches of 100,000 documents each.
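The heap-size rule and the batching advice above can be sketched with plain JDK code. This is an illustration under stated assumptions, not part of the bulk executor library: the class BulkBatchPlanner and its methods are hypothetical, and the UTF-8 byte length of the JSON strings is used as a rough stand-in for "sizeof(all documents)", which the guidance does not define precisely.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical helper; not part of the bulk executor library.
final class BulkBatchPlanner {

    static final long THREE_GB = 3L * 1024 * 1024 * 1024;

    // Applies max(3 GB, 3 * size of one batch). The batch size is approximated
    // by the UTF-8 length of the serialized documents (an assumption).
    static long suggestedHeapBytes(Collection<String> batch) {
        long batchBytes = 0;
        for (String document : batch) {
            batchBytes += document.getBytes(StandardCharsets.UTF_8).length;
        }
        return Math.max(THREE_GB, 3 * batchBytes);
    }

    // Splits the documents into consecutive batches of at most batchSize items.
    // Larger batches mean fewer bulk calls and less repeated preprocessing.
    static <T> List<List<T>> partition(List<T> documents, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < documents.size(); start += batchSize) {
            int end = Math.min(start + batchSize, documents.size());
            batches.add(documents.subList(start, end)); // a view, not a copy
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> documents = Arrays.asList(
                "{\"id\":\"1\"}", "{\"id\":\"2\"}", "{\"id\":\"3\"}");
        System.out.println("batches: " + partition(documents, 2).size());
        System.out.println("suggested heap (bytes): " + suggestedHeapBytes(documents));
    }
}
```

The value returned by suggestedHeapBytes could inform the -Xmx setting used when launching the application, and each list returned by partition would be passed to one bulk import call.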
- Instantiate a single DocumentBulkExecutor object for the entire application, within a single virtual machine, for each specific Azure Cosmos container.
- A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO by spawning multiple tasks internally. Therefore, avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine is unable to consume your entire container's throughput (if your container's throughput > 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute bulk operation API calls.
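One way to keep bulk calls from overlapping inside a process is to route them through a single worker thread. The sketch below shows that idea with the JDK's ExecutorService. The class SerializedBulkCalls is hypothetical, and the tasks are placeholders that stand in for real bulk executor calls; it is one possible approach, not something the library prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper; not part of the bulk executor library.
final class SerializedBulkCalls {

    // Runs the given calls one at a time on a single worker thread and
    // returns their results in submission order.
    static <T> List<T> runSequentially(List<Callable<T>> bulkCalls)
            throws InterruptedException, ExecutionException {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> call : bulkCalls) {
                futures.add(worker.submit(call));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // blocks until that call has finished
            }
            return results;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> calls = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int batch = i;
            // Placeholder task: a real task would invoke a bulk import or
            // bulk update call on the shared DocumentBulkExecutor here.
            calls.add(() -> "batch " + batch + " done");
        }
        System.out.println(runSequentially(calls));
    }
}
```

Because the executor has exactly one thread, a second bulk call cannot start until the first one returns, which leaves the concurrency decisions to the bulk executor itself.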
- To learn about the Maven package details and release notes of the bulk executor Java library, see bulk executor SDK details.