Skip to content
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@
<artifactId>httpcore</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand Down Expand Up @@ -275,6 +279,11 @@
<artifactId>httpcore</artifactId>
<version>4.4.11</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/arangodb/ArangoCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ <T> MultiDocumentEntity<DocumentCreateEntity<T>> insertDocuments(
*/
DocumentImportEntity importDocuments(Collection<?> values, DocumentImportOptions options) throws ArangoDBException;

/**
* Bulk imports the given values into the collection.
*
* @param values
* a list of Objects that will be stored as documents
* @param options
* Additional options, can be null
* @param batchSize
* Size for individual data batches of the original collection of values
* @param numThreads
* Number of parallel import threads
* @return list of information about the imported batches
* @throws ArangoDBException
*/
Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
int batchSize, int numThreads) throws ArangoDBException;

/**
* Bulk imports the given values into the collection.
*
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@

package com.arangodb.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,6 +112,32 @@ public DocumentImportEntity importDocuments(final Collection<?> values, final Do
return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class);
}

@Override
public Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
int batchSize, int numThreads) throws ArangoDBException {
List<? extends List<?>> batches = ListUtils.partition(new ArrayList<>(values), batchSize);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
List<CompletableFuture<DocumentImportEntity>> completableFutureList = new ArrayList<>();
for (List<?> batch : batches) {
CompletableFuture<DocumentImportEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
DocumentImportEntity documentImportEntity = importDocuments(batch, options);
return documentImportEntity;
}, executorService);
completableFutureList.add(completableFuture);
}
List<DocumentImportEntity> documentImportEntityList = new ArrayList<>();
for (CompletableFuture<DocumentImportEntity> completableFuture : completableFutureList) {
DocumentImportEntity documentImportEntity = null;
try {
documentImportEntity = completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new ArangoDBException(e);
}
documentImportEntityList.add(documentImportEntity);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hkernbach I noticed a small problem.
RIght before the return statement on this line
we need to shutdown the executorService.

        executorService.shutdown();

If another pull request is needed, please let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix that, thanks for spotting :)

return documentImportEntityList;
}

@Override
public DocumentImportEntity importDocuments(final String values) throws ArangoDBException {
return importDocuments(values, new DocumentImportOptions());
Expand Down
58 changes: 58 additions & 0 deletions src/test/java/com/arangodb/ArangoCollectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,64 @@ public void importDocumentsJsonFromToPrefix() {
}
}

@Test
public void importDocumentsBatchSizeNumThreads() {
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
for (int i = 1; i <= 100; i++) {
values.add(new BaseDocument(String.valueOf(i)));
}
int batchSize = 5;
int numThreads = 8;
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
new DocumentImportOptions(), batchSize, numThreads);
assertThat(docsList.size(), is(values.size() / batchSize));
for (final DocumentImportEntity docs : docsList) {
assertThat(docs, is(notNullValue()));
assertThat(docs.getCreated(), is(batchSize));
assertThat(docs.getEmpty(), is(0));
assertThat(docs.getErrors(), is(0));
assertThat(docs.getIgnored(), is(0));
assertThat(docs.getUpdated(), is(0));
assertThat(docs.getDetails(), is(empty()));
}
}

@Test
public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() {
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
for (int i = 1; i <= 10; i++) {
values.add(new BaseDocument(String.valueOf(i)));
}

int batchSize = 0;
int numThreads = 8;

try {
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
new DocumentImportOptions(), batchSize, numThreads);
fail();
} catch (IllegalArgumentException e) {
}
}

@Test
public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() {
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
for (int i = 1; i <= 10; i++) {
values.add(new BaseDocument(String.valueOf(i)));
}

int batchSize = 5;
int numThreads = 0;

try {
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
new DocumentImportOptions(), batchSize, numThreads);
fail();
} catch (IllegalArgumentException e) {
}
}

@Test
public void deleteDocumentsByKey() {
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
Expand Down