Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
add batching
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Feb 25, 2015
1 parent afd22eb commit 6276726
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Expand Up @@ -122,7 +122,7 @@ private void execute(final List<RequestBuilderContainer> containers) {
}

final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
//clear the queue or proceed to buffersize
//clear the queue or proceed to buffer size
Observable.from(containers)
.subscribeOn(Schedulers.io())
.flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
Expand Down Expand Up @@ -95,18 +96,23 @@ public void testIndexThreads() throws IOException {

long now = System.currentTimeMillis();
final int threads = 1000;
final int size = 50;
final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
final IndexScope indexScope = new IndexScopeImpl(appId, "things");
final String entityType = "thing";
entityIndex.initializeIndex();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicLong failTime=new AtomicLong(0);
InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" );
ObjectMapper mapper = new ObjectMapper();
final List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
for(int i=0;i<threads;i++) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
insertJsonBlob(entityIndex, entityType, indexScope, "/sample-small.json", 1, 0);
entityIndex.refresh();
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob(sampleJson,batch, entityType, indexScope, size, 0);
batch.execute().get();
} catch (Exception e) {
synchronized (failTime) {
if (failTime.get() == 0) {
Expand All @@ -127,7 +133,7 @@ public void run() {
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
assertTrue("system must have failed at "+(failTime.get() - now) ,failTime.get()==0);
assertTrue("system must have failed at " + (failTime.get() - now), failTime.get() == 0);
}

@Test
Expand Down Expand Up @@ -210,17 +216,20 @@ public void testDeleteByQueryWithAlias() throws IOException {
testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);

}

private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
InputStream is = this.getClass().getResourceAsStream( filePath );
ObjectMapper mapper = new ObjectMapper();
List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex);
batch.executeAndRefresh().get();
}

private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException {
int count = 0;
StopWatch timer = new StopWatch();
timer.start();

final EntityIndexBatch batch = entityIndex.createBatch();

if(startIndex > 0){
for(int i =0; i<startIndex;i++){
Expand All @@ -246,10 +255,9 @@ private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexSco
}
}

batch.executeAndRefresh().get();
timer.stop();
log.info( "Total time to index {} entries {}ms, average {}ms/entry",
new Object[] { count, timer.getTime(), timer.getTime() / count } );
log.info("Total time to index {} entries {}ms, average {}ms/entry",
new Object[]{count, timer.getTime(), timer.getTime() / count } );
}


Expand Down Expand Up @@ -428,7 +436,7 @@ public void getEntityVersions() throws Exception {
user.setField( new StringField( "address2", "apt 508" ) );
batch.index( indexScope, user );
user.setField( new StringField( "address3", "apt 508" ) );
batch.index( indexScope, user );
batch.index( indexScope, user);
batch.executeAndRefresh().get();

CandidateResults results = entityIndex.getEntityVersions(indexScope, user.getId() );
Expand Down Expand Up @@ -467,7 +475,7 @@ public void deleteVerification() throws Throwable {

EntityIndexBatch batch = ei.createBatch();

batch.index( appScope, user );
batch.index( appScope, user);
batch.executeAndRefresh().get();

Query query = new Query();
Expand Down Expand Up @@ -534,7 +542,7 @@ public void multiValuedTypes() {
Entity fred = EntityIndexMapUtils.fromMap( fredMap );
EntityUtils.setId( fred, new SimpleId( UUIDGenerator.newTimeUUID(), "user" ) );
EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
batch.index( appScope, fred );
batch.index( appScope, fred);

batch.executeAndRefresh().get();

Expand Down

0 comments on commit 6276726

Please sign in to comment.