diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index be143ce950..c766a1b656 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -37,7 +37,6 @@ import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; -import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.ConnectedEntityRef; @@ -268,7 +267,7 @@ public boolean isConnectionMember( String connectionType, EntityRef entity ) thr GraphManager gm = managerCache.getGraphManager( applicationScope ); Observable edges = gm.loadEdgeVersions( new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId, - Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) ); + Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); return edges.toBlocking().firstOrDefault( null ) != null; } @@ -933,37 +932,28 @@ public Results searchTargetEntities( Query query ) throws Exception { cpHeadEntity.getId() ); - - - if(query.getResultsLevel() == Level.REFS){ + if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) { final IdBuilder traversedIds; - if(query.isGraphSearch()){ - - traversedIds = pipelineBuilder.traverseConnection( connection, entityType ); - + if ( query.isGraphSearch() ) { + traversedIds = pipelineBuilder.traverseConnection( connection, entityType ); } - else - { - traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds(); - + else { + traversedIds = + pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds(); } - final Observable> results = traversedIds.loadConnectionRefs( - cpHeadEntity.getId(), connection ).build(); + //create connection refs - return new ConnectionRefQueryExecutor( results ).next(); + final Observable> results = + traversedIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + return new ConnectionRefQueryExecutor( results ).next(); } - if(query.getResultsLevel() == Level.IDS){ - - throw new UnsupportedOperationException( "Not yet implemented" ); - } - //we want to load all entities diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 4291ea9cc6..0f784a6c75 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -24,8 +24,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; @@ -33,6 +36,8 @@ import com.google.common.base.Optional; +import rx.Observable; + /** * A builder to transition from emitting Ids in the pipeline into other operations diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java new file mode 100644 index 0000000000..e9fd8dee87 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if + * it matches the Id that was set + */ +public class IdResumeFilter extends AbstractPathFilter { + + + @Override + public Observable> call( final Observable> filterResultObservable ) { + + //filter only the first id, then map into our path for our next pass + + + //skip our first and emit if neccessary + return filterResultObservable.skipWhile( filterResult -> { + + final Optional startFromCursor = getSeekValue(); + + return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue() ); + } ); + } + + + @Override + protected CursorSerializer getCursorSerializer() { + return IdCursorSerializer.INSTANCE; + } +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java index 798c9c75d2..3dfd98a3c5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java @@ -36,7 +36,7 @@ /** - * Processes our results of entities and turns them into + * Processes our results of connection refs */ @Deprecated//Required for 1.0 compatibility public class ConnectionRefQueryExecutor extends ObservableQueryExecutor { diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java index bc9001e144..0e18e31365 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java @@ -35,7 +35,7 @@ /** - * Processes our results of entities and turns them into + * Processes our results of entities */ @Deprecated//Required for 1.0 compatibility public class EntityQueryExecutor extends ObservableQueryExecutor { diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index ff44416242..fce1fb26dc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -60,7 +60,7 @@ public ObservableQueryExecutor( final Observable> resultsObservab * @param resultsPage * @return */ - protected abstract Results createResults( final ResultsPage resultsPage ); + protected abstract Results createResults( final ResultsPage resultsPage ); @@ -69,7 +69,7 @@ public ObservableQueryExecutor( final Observable> resultsObservab * @param resultsPage * @return */ - private Results createResultsInternal( final ResultsPage resultsPage ) { + private Results createResultsInternal( final ResultsPage resultsPage ) { final Results results = createResults( resultsPage );