From 7aeb8714b01975a860d01967cac65eebef074386 Mon Sep 17 00:00:00 2001 From: Razi Khaja Date: Tue, 16 Jul 2019 13:56:35 -0400 Subject: [PATCH 1/6] Defined new interface method and Javadoc for importDocuments that accepts two additional parameters batchSize and numThreads. --- .../java/com/arangodb/ArangoCollection.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/main/java/com/arangodb/ArangoCollection.java b/src/main/java/com/arangodb/ArangoCollection.java index 0f0dd8c10..b00f8b6dc 100644 --- a/src/main/java/com/arangodb/ArangoCollection.java +++ b/src/main/java/com/arangodb/ArangoCollection.java @@ -150,6 +150,23 @@ MultiDocumentEntity> insertDocuments( */ DocumentImportEntity importDocuments(Collection values, DocumentImportOptions options) throws ArangoDBException; + /** + * Bulk imports the given values into the collection. + * + * @param values + * a list of Objects that will be stored as documents + * @param options + * Additional options, can be null + * @param batchSize + * Size for individual data batches of the original collection of values + * @param numThreads + * Number of parallel import threads + * @return list of information about the imported batches + * @throws ArangoDBException + */ + Collection importDocuments(Collection values, DocumentImportOptions options, + int batchSize, int numThreads) throws ArangoDBException; + /** * Bulk imports the given values into the collection. * From e4d06b23bbd89b6d1468eca8f0f5b45d603e7924 Mon Sep 17 00:00:00 2001 From: Razi Khaja Date: Tue, 16 Jul 2019 14:52:26 -0400 Subject: [PATCH 2/6] Added dependency commons-collections4 so that we can use ListUtils.partition. 
--- pom.xml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pom.xml b/pom.xml index 3d6066f59..92cc48a5a 100644 --- a/pom.xml +++ b/pom.xml @@ -230,6 +230,10 @@ httpcore provided + + org.apache.commons + commons-collections4 + commons-logging commons-logging @@ -275,6 +279,11 @@ httpcore 4.4.11 + + org.apache.commons + commons-collections4 + 4.4 + commons-codec commons-codec From 724381a8f9a8b54f627b2a1ff87a573947d8060c Mon Sep 17 00:00:00 2001 From: Razi Khaja Date: Tue, 16 Jul 2019 14:56:36 -0400 Subject: [PATCH 3/6] Implemented new importDocuments method. --- .../internal/ArangoCollectionImpl.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java index fb3f78154..4fda706c6 100644 --- a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java +++ b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java @@ -20,8 +20,15 @@ package com.arangodb.internal; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.collections4.ListUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +112,32 @@ public DocumentImportEntity importDocuments(final Collection values, final Do return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class); } + @Override + public Collection importDocuments(Collection values, DocumentImportOptions options, + int batchSize, int numThreads) throws ArangoDBException { + List> batches = ListUtils.partition(new ArrayList<>(values), batchSize); + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List> completableFutureList = new ArrayList<>(); + for (List batch : batches) { + 
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + DocumentImportEntity documentImportEntity = importDocuments(batch, options); + return documentImportEntity; + }, executorService); + completableFutureList.add(completableFuture); + } + List documentImportEntityList = new ArrayList<>(); + for (CompletableFuture completableFuture : completableFutureList) { + DocumentImportEntity documentImportEntity = null; + try { + documentImportEntity = completableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new ArangoDBException(e); + } + documentImportEntityList.add(documentImportEntity); + } + return documentImportEntityList; + } + @Override public DocumentImportEntity importDocuments(final String values) throws ArangoDBException { return importDocuments(values, new DocumentImportOptions()); From 66eb903a01c96c5babd1b8eee0a342763778b1cc Mon Sep 17 00:00:00 2001 From: Razi Khaja Date: Tue, 23 Jul 2019 12:52:54 -0400 Subject: [PATCH 4/6] Added test for importDocuments method with batchSize and numThreads. 
--- .../com/arangodb/ArangoCollectionTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/test/java/com/arangodb/ArangoCollectionTest.java b/src/test/java/com/arangodb/ArangoCollectionTest.java index c19c63473..ca7788bd9 100644 --- a/src/test/java/com/arangodb/ArangoCollectionTest.java +++ b/src/test/java/com/arangodb/ArangoCollectionTest.java @@ -1726,6 +1726,25 @@ public void importDocumentsJsonFromToPrefix() { } } + @Test + public void importDocumentsBatchSizeNumThreads() { + final Collection<BaseDocument> values = new ArrayList<BaseDocument>(); + for( int i = 1; i <= 100; i++) { + values.add(new BaseDocument(String.valueOf(i))); + } + final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values, + new DocumentImportOptions(),10, 8); + for (final DocumentImportEntity docs : docsList) { + assertThat(docs, is(notNullValue())); + assertThat(docs.getCreated(), is(10)); + assertThat(docs.getEmpty(), is(0)); + assertThat(docs.getErrors(), is(0)); + assertThat(docs.getIgnored(), is(0)); + assertThat(docs.getUpdated(), is(0)); + assertThat(docs.getDetails(), is(empty())); + } + } + @Test public void deleteDocumentsByKey() { final Collection values = new ArrayList(); From 73582c6c940ff6a8343e0e77ff3876d3ef4a041a Mon Sep 17 00:00:00 2001 From: Razi Khaja Date: Tue, 23 Jul 2019 13:13:21 -0400 Subject: [PATCH 5/6] Minor refactoring of test for importDocuments method with batchSize and numThreads.
--- src/test/java/com/arangodb/ArangoCollectionTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/arangodb/ArangoCollectionTest.java b/src/test/java/com/arangodb/ArangoCollectionTest.java index ca7788bd9..f52654b9d 100644 --- a/src/test/java/com/arangodb/ArangoCollectionTest.java +++ b/src/test/java/com/arangodb/ArangoCollectionTest.java @@ -1729,14 +1729,17 @@ public void importDocumentsJsonFromToPrefix() { @Test public void importDocumentsBatchSizeNumThreads() { final Collection<BaseDocument> values = new ArrayList<BaseDocument>(); - for( int i = 1; i <= 100; i++) { + for (int i = 1; i <= 100; i++) { values.add(new BaseDocument(String.valueOf(i))); } + int batchSize = 5; + int numThreads = 8; final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values, - new DocumentImportOptions(),10, 8); + new DocumentImportOptions(), batchSize, numThreads); + assertThat(docsList.size(), is(values.size() / batchSize)); for (final DocumentImportEntity docs : docsList) { assertThat(docs, is(notNullValue())); - assertThat(docs.getCreated(), is(10)); + assertThat(docs.getCreated(), is(batchSize)); assertThat(docs.getEmpty(), is(0)); assertThat(docs.getErrors(), is(0)); assertThat(docs.getIgnored(), is(0)); @@ -1745,6 +1748,7 @@ public void importDocumentsBatchSizeNumThreads() { } } + @Test public void deleteDocumentsByKey() { final Collection values = new ArrayList(); From c45b7e2cf534b96869703097d90924f724d48e4b Mon Sep 17 00:00:00 2001 From: hkernbach Date: Wed, 24 Jul 2019 16:39:33 +0200 Subject: [PATCH 6/6] added two false positive tests --- .../com/arangodb/ArangoCollectionTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/test/java/com/arangodb/ArangoCollectionTest.java b/src/test/java/com/arangodb/ArangoCollectionTest.java index f52654b9d..502a9b7c6 100644 --- a/src/test/java/com/arangodb/ArangoCollectionTest.java +++ b/src/test/java/com/arangodb/ArangoCollectionTest.java @@ -1748,6 +1748,41 @@
public void importDocumentsBatchSizeNumThreads() { } } + @Test + public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() { + final Collection<BaseDocument> values = new ArrayList<BaseDocument>(); + for (int i = 1; i <= 10; i++) { + values.add(new BaseDocument(String.valueOf(i))); + } + + int batchSize = 0; + int numThreads = 8; + + try { + final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values, + new DocumentImportOptions(), batchSize, numThreads); + fail(); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() { + final Collection<BaseDocument> values = new ArrayList<BaseDocument>(); + for (int i = 1; i <= 10; i++) { + values.add(new BaseDocument(String.valueOf(i))); + } + + int batchSize = 5; + int numThreads = 0; + + try { + final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values, + new DocumentImportOptions(), batchSize, numThreads); + fail(); + } catch (IllegalArgumentException e) { + } + } @Test public void deleteDocumentsByKey() {