From 6276726cb20c8dd581b49a453b244dd38bc6b68c Mon Sep 17 00:00:00 2001 From: Shawn Feldman Date: Wed, 25 Feb 2015 10:03:28 -0700 Subject: [PATCH] add batching --- .../index/impl/IndexBatchBufferImpl.java | 2 +- .../index/impl/EntityIndexTest.java | 30 ++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java index b5d952865b..73af70fdf8 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java @@ -122,7 +122,7 @@ private void execute(final List containers) { } final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh()); - //clear the queue or proceed to buffersize + //clear the queue or proceed to buffer size Observable.from(containers) .subscribeOn(Schedulers.io()) .flatMap(new Func1>() { diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index e820ce1885..918ad350a0 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.query.CandidateResult; import org.apache.usergrid.persistence.index.utils.IndexValidationUtils; @@ -95,18 +96,23 @@ public void testIndexThreads() throws IOException { long now = System.currentTimeMillis(); final int threads = 1000; + final int size = 50; final EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); final IndexScope indexScope = new IndexScopeImpl(appId, "things"); final String entityType = "thing"; entityIndex.initializeIndex(); final CountDownLatch latch = new CountDownLatch(threads); final AtomicLong failTime=new AtomicLong(0); + InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" ); + ObjectMapper mapper = new ObjectMapper(); + final List sampleJson = mapper.readValue( is, new TypeReference>() {} ); for(int i=0;i sampleJson = mapper.readValue( is, new TypeReference>() {} ); + EntityIndexBatch batch = entityIndex.createBatch(); + insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex); + batch.executeAndRefresh().get(); + } + private void insertJsonBlob(List sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException { int count = 0; StopWatch timer = new StopWatch(); timer.start(); - final EntityIndexBatch batch = entityIndex.createBatch(); if(startIndex > 0){ for(int i =0; i