Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Fixes resume logic by loading then filtering first id
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed May 4, 2015
1 parent 294a7d9 commit 413f023
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 5 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.corepersistence.pipeline.read.Collector;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;

import com.google.common.base.Optional;
Expand Down Expand Up @@ -77,7 +78,7 @@ public Pipeline( final ApplicationScope applicationScope, final List<PipelineOpe
public Observable<R> execute(){


Observable traverseObservable = Observable.just( applicationScope.getApplication() );
Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() ));

//build our traversal commands
for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {
Expand Down
Expand Up @@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read;


import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
Expand Down Expand Up @@ -131,4 +133,11 @@ ElasticSearchConnectionFilter elasticSearchConnectionFilter( @Assisted( "query"
* @param entityId The entity id to emit
*/
EntityIdFilter getEntityIdFilter( final Id entityId );


/**
* Create a new instance of our entity filter
* @return
*/
EntityFilter entityFilter();
}
Expand Up @@ -26,9 +26,9 @@
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;

import com.google.common.base.Optional;
Expand Down Expand Up @@ -211,9 +211,14 @@ public Observable<ResultsPage> execute() {


//add our last filter that will generate entities
final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter();

filters.add( finalFilter );
filters.add( entityLoadFilter );

//add the filter that skips the first result on resume
final Filter<Entity, Entity> cursorEntityFilter = filterFactory.entityFilter();

filters.add( cursorEntityFilter );


//execute our collector
Expand Down
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.pipeline.read.collect;


import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;

import com.google.common.base.Optional;

import rx.Observable;


/**
* A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if
* it matches the Id that was set
*/
public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {


@Override
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {

//filter only the first id, then map into our path for our next pass


return filterResultObservable.skipWhile( filterResult -> {

final Optional<Id> startFromCursor = getSeekValue();

return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() );
} ).map( filterResult -> {


final Entity entity = filterResult.getValue();
final Id entityId = entity.getId();

return createFilterResult( entity, entityId, filterResult.getPath() );
} );
}


@Override
protected CursorSerializer<Id> getCursorSerializer() {
return IdCursorSerializer.INSTANCE;
}
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.pipeline.read.collect;


import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
import org.apache.usergrid.persistence.model.entity.Id;


/**
* cursor serializer for Ids
*/
public class IdCursorSerializer extends AbstractCursorSerializer<Id> {


public static final IdCursorSerializer INSTANCE = new IdCursorSerializer();

@Override
protected Class<Id> getType() {
return Id.class;
}


}
Expand Up @@ -73,7 +73,6 @@
},
"string": {
"type": "string",
"doc_values": true,
"norms": {
"enabled": false
},
Expand Down

0 comments on commit 413f023

Please sign in to comment.