Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Finishes testing of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed May 21, 2015
1 parent aa9153a commit b59abac
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 25 deletions.
Expand Up @@ -37,7 +37,6 @@
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.ConnectedEntityRef;
Expand Down Expand Up @@ -268,7 +267,7 @@ public boolean isConnectionMember( String connectionType, EntityRef entity ) thr
GraphManager gm = managerCache.getGraphManager( applicationScope );
Observable<Edge> edges = gm.loadEdgeVersions(
new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );

return edges.toBlocking().firstOrDefault( null ) != null;
}
Expand Down Expand Up @@ -933,37 +932,28 @@ public Results searchTargetEntities( Query query ) throws Exception {
cpHeadEntity.getId() );




if(query.getResultsLevel() == Level.REFS){
if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) {

final IdBuilder traversedIds;
if(query.isGraphSearch()){

traversedIds = pipelineBuilder.traverseConnection( connection, entityType );


if ( query.isGraphSearch() ) {
traversedIds = pipelineBuilder.traverseConnection( connection, entityType );
}
else
{
traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();

else {
traversedIds =
pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
}

final Observable<ResultsPage<ConnectionRef>> results = traversedIds.loadConnectionRefs(
cpHeadEntity.getId(), connection ).build();
//create connection refs

return new ConnectionRefQueryExecutor( results ).next();
final Observable<ResultsPage<ConnectionRef>> results =
traversedIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build();

return new ConnectionRefQueryExecutor( results ).next();
}



if(query.getResultsLevel() == Level.IDS){

throw new UnsupportedOperationException( "Not yet implemented" );
}


//we want to load all entities

Expand Down
Expand Up @@ -24,15 +24,20 @@
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;

import com.google.common.base.Optional;

import rx.Observable;


/**
* A builder to transition from emitting Ids in the pipeline into other operations
Expand Down
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.pipeline.read.collect;


import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;

import com.google.common.base.Optional;

import rx.Observable;


/**
* A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if
* it matches the Id that was set
*/
public class IdResumeFilter extends AbstractPathFilter<Id, Id, Id> {


@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {

//filter only the first id, then map into our path for our next pass


//skip our first and emit if neccessary
return filterResultObservable.skipWhile( filterResult -> {

final Optional<Id> startFromCursor = getSeekValue();

return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue() );
} );
}


@Override
protected CursorSerializer<Id> getCursorSerializer() {
return IdCursorSerializer.INSTANCE;
}
}
Expand Up @@ -36,7 +36,7 @@


/**
* Processes our results of entities and turns them into
* Processes our results of connection refs
*/
@Deprecated//Required for 1.0 compatibility
public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
Expand Down
Expand Up @@ -35,7 +35,7 @@


/**
* Processes our results of entities and turns them into
* Processes our results of entities
*/
@Deprecated//Required for 1.0 compatibility
public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
Expand Down
Expand Up @@ -60,7 +60,7 @@ public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservab
* @param resultsPage
* @return
*/
protected abstract Results createResults( final ResultsPage resultsPage );
protected abstract Results createResults( final ResultsPage<T> resultsPage );



Expand All @@ -69,7 +69,7 @@ public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservab
* @param resultsPage
* @return
*/
private Results createResultsInternal( final ResultsPage resultsPage ) {
private Results createResultsInternal( final ResultsPage<T> resultsPage ) {


final Results results = createResults( resultsPage );
Expand Down

0 comments on commit b59abac

Please sign in to comment.