Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Merge branch 'two-dot-o-dev' into USERGRID-593
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed Apr 24, 2015
2 parents 83174e8 + 11e40ce commit 60617d2
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public Entity writeEntity( Map<String, Object> entity ) throws Exception {

@Override
public Results getResults( Query query ) throws Exception {
app.refreshIndex();
return app.getEntityManager().searchCollection( app.getEntityManager().getApplicationRef(), "tests", query );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void singleOrderByMaxLimit( IoHelper io ) throws Exception {

io.doSetup();

int size = 200;
int size = 20;
int queryLimit = Query.MAX_LIMIT;

long start = System.currentTimeMillis();
Expand Down Expand Up @@ -336,7 +336,7 @@ protected void singleOrderByIntersection( IoHelper io ) throws Exception {

io.doSetup();

int size = 700;
int size = 70;
int queryLimit = Query.MAX_LIMIT;

// the number of entities that should be written including an intersection
Expand Down Expand Up @@ -366,14 +366,12 @@ protected void singleOrderByIntersection( IoHelper io ) throws Exception {
}
}
app.refreshIndex();
Thread.sleep(500);


long stop = System.currentTimeMillis();

LOG.info( "Writes took {} ms", stop - start );

Query query = Query.fromQL( "select * where intersect = true sort by created" );
Query query = Query.fromQL( "select * where intersect = true sort by created asc" );
query.setLimit( queryLimit );

int count = 0;
Expand All @@ -387,7 +385,7 @@ protected void singleOrderByIntersection( IoHelper io ) throws Exception {
// now do simple ordering, should be returned in order
results = io.getResults( query );

for ( int i = 0; i < results.size(); i++ ) {
for ( int i = 0 ; i< results.size(); i++) {
assertEquals( expected.get( count ), results.getEntities().get( i ).getName() );
count++;
}
Expand All @@ -406,7 +404,7 @@ protected void singleOrderByIntersection( IoHelper io ) throws Exception {

protected void singleOrderByComplexIntersection( IoHelper io ) throws Exception {

int size = 200;
int size = 20;
int queryLimit = Query.MAX_LIMIT;

// the number of entities that should be written including an intersection
Expand Down Expand Up @@ -477,7 +475,7 @@ protected void singleOrderByComplexIntersection( IoHelper io ) throws Exception
protected void singleOrderByNoIntersection( IoHelper io ) throws Exception {
io.doSetup();

int size = 200;
int size = 20;
int queryLimit = Query.MAX_LIMIT;

// the number of entities that should be written including an intersection
Expand Down Expand Up @@ -521,7 +519,7 @@ protected void singleOrderByComplexUnion( IoHelper io ) throws Exception {

io.doSetup();

int size = 200;
int size = 20;
int queryLimit = Query.MAX_LIMIT;

// the number of entities that should be written including an intersection
Expand Down Expand Up @@ -591,7 +589,7 @@ protected void singleOrderByNot( IoHelper io ) throws Exception {

io.doSetup();

int size = 200;
int size = 20;
int queryLimit = Query.MAX_LIMIT;

// the number of entities that should be written including an intersection
Expand Down Expand Up @@ -691,7 +689,7 @@ protected void singleOrderByLessThanLimit( IoHelper io ) throws Exception {

LOG.info( "Writes took {} ms", stop - start );

Query query = Query.fromQL( "select * where searched = true sort by created" );
Query query = Query.fromQL( "select * where searched = true order by created" );
query.setLimit( queryLimit );

int count = 0;
Expand Down Expand Up @@ -1106,7 +1104,6 @@ protected void allIn( IoHelper io ) throws Exception {
LOG.info( "Writes took {} ms", stop - start );

app.refreshIndex();
Thread.sleep(500);

Query query = new Query();
query.setLimit( 10 );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,15 +351,15 @@ private void doInRetry( final RetryOperation operation ) {
}
catch ( Exception e ) {
logger.error( "Unable to execute operation, retrying", e );
try {
Thread.sleep( WAIT_TIME );
} catch ( InterruptedException ie ) {
//swallow it
}
}


try {
Thread.sleep( WAIT_TIME );
}
catch ( InterruptedException e ) {
//swallow it
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public BetterFuture put( IndexOperationMessage message ) {
private void startSubscription() {


final Observable<IndexOperationMessage> observable = Observable.create( bufferProducer );
final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);

//buffer on our new thread with a timeout
observable.buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
Expand Down Expand Up @@ -189,11 +189,23 @@ private Observable<IndexOperationMessage> processBatch( final List<IndexOperatio

//now that we've processed them all, ack the futures after our last batch comes through
final Observable<IndexOperationMessage> processedIndexOperations =
requests.last().flatMap( lastRequest -> Observable.from( batches ) );
requests.lastOrDefault(null).flatMap( lastRequest ->{
if(lastRequest!=null){
return Observable.from( batches ) ;
}else{
return Observable.empty();
}
});

//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done() ).doOnError( t -> log.error( "Unable to ack futures", t ) );
return processedIndexOperations.doOnNext( processedIndexOp ->
{
processedIndexOp.done();
}
).doOnError(t -> {
log.error("Unable to ack futures", t);
});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esPr

@Override
public Observable<IndexRefreshCommandInfo> execute() {
final long start = System.currentTimeMillis();

Timer.Context refreshTimer = timer.time();
//id to hunt for
Expand Down Expand Up @@ -124,7 +125,6 @@ public Observable<IndexRefreshCommandInfo> execute() {

//start our processing immediately
final Observable<IndexRefreshCommandInfo> future = Async.toAsync( () -> {
long start = System.currentTimeMillis();
IndexRefreshCommandInfo info;
try {
for ( int i = 0; i < indexFig.maxRefreshSearches(); i++ ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ public class EntityIndexTest extends BaseIT {
@Inject
public EntityIndex ei;

@Inject
@Rule
public MigrationManagerRule migrationManagerRule;

@Inject
@Rule
public ElasticSearchRule elasticSearchRule;
Expand All @@ -111,7 +107,6 @@ public void testIndex() throws IOException, InterruptedException {

insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 101, 0 );

ei.refreshAsync().toBlocking().last();

testQueries( searchEdge, searchTypes, entityIndex );
}
Expand Down Expand Up @@ -257,6 +252,7 @@ public void testMultipleIndexInitializations() {

@Test
public void testAddMultipleIndexes() throws IOException {

Id appId = new SimpleId( "application" );

ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
Expand All @@ -266,24 +262,23 @@ public void testAddMultipleIndexes() throws IOException {

final String entityType = "thing";
IndexEdge searchEdge = new IndexEdgeImpl( appId, "things", SearchEdge.NodeType.SOURCE, 10 );
final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
final SearchTypes searchTypes = SearchTypes.fromTypes(entityType);

insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 101, 0 );
insertJsonBlob(entityIndex, entityType, searchEdge, "/sample-large.json", 101, 0);

ei.refreshAsync().toBlocking().last();

testQueries( searchEdge, searchTypes, entityIndex );

ei.addIndex( "v2", 1, 0, "one" );
testQueries(searchEdge, searchTypes, entityIndex);

insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 101, 100 );
ei.addIndex("v2", 1, 0, "one");

ei.refreshAsync().toBlocking().last();
insertJsonBlob(entityIndex, entityType, searchEdge, "/sample-large.json", 101, 100);

//Hilda Youn
testQuery( searchEdge, searchTypes, entityIndex, "name = 'Hilda Young'", 1 );

testQuery( searchEdge, searchTypes, entityIndex, "name = 'Lowe Kelley'", 1 );

log.info("hi");
}


Expand All @@ -302,13 +297,11 @@ public void testDeleteWithAlias() throws IOException {

insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 0 );

ei.refreshAsync().toBlocking().last();

ei.addIndex( "v2", 1, 0, "one" );

insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 0 );

ei.refreshAsync().toBlocking().last();
CandidateResults crs = testQuery( searchEdge, searchTypes, entityIndex, "name = 'Bowers Oneil'", 2 );

EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
Expand All @@ -330,7 +323,9 @@ private void insertJsonBlob( ApplicationEntityIndex entityIndex, String entityTy
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex );
batch.execute().get();
ei.refreshAsync().toBlocking().last();
IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().last();
long time = info.getExecutionTime();
log.info("refresh took ms:"+time);
}


Expand All @@ -356,8 +351,6 @@ private void insertJsonBlob( List<Object> sampleJson, EntityIndexBatch batch, St
EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
batch.index( indexEdge, entity );
batch.execute().get();


if ( ++count > max ) {
break;
Expand Down Expand Up @@ -418,8 +411,7 @@ private CandidateResults testQuery( final SearchEdge scope, final SearchTypes se

StopWatch timer = new StopWatch();
timer.start();
CandidateResults candidateResults = null;
candidateResults = entityIndex.search( scope, searchTypes, queryString, num + 1 );
CandidateResults candidateResults = entityIndex.search( scope, searchTypes, queryString, num + 1 );

timer.stop();

Expand Down Expand Up @@ -626,14 +618,9 @@ public void multiValuedTypes() {
@Test
public void healthTest() {
Id appId = new SimpleId( "entityindextest" );
Id ownerId = new SimpleId( "multivaluedtype" );
ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
assertNotEquals( "cluster should be ok", Health.RED, ei.getClusterHealth() );
assertEquals( "index should be ready", Health.GREEN, ei.getIndexHealth() );
ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex( applicationScope );

ei.refreshAsync().toBlocking().last();

assertNotEquals( "cluster should be fine", Health.RED, ei.getIndexHealth() );
assertNotEquals( "cluster should be ready now", Health.RED, ei.getClusterHealth() );
}
Expand Down

0 comments on commit 60617d2

Please sign in to comment.