Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Fixes wiring and updates code to be compatible with 2.0-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed Apr 24, 2015
1 parent ec3a710 commit 83174e8
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 127 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.Setup; import org.apache.usergrid.persistence.cassandra.Setup;
import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
Expand All @@ -61,6 +62,7 @@
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
Expand Down Expand Up @@ -116,6 +118,8 @@ public EntityManager load(UUID appId) { // no checked exception
private final ApplicationIdCache applicationIdCache; private final ApplicationIdCache applicationIdCache;


private ManagerCache managerCache; private ManagerCache managerCache;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final GraphManagerFactory graphManagerFactory;


private CassandraService cassandraService; private CassandraService cassandraService;
private CounterUtils counterUtils; private CounterUtils counterUtils;
Expand All @@ -125,7 +129,7 @@ public EntityManager load(UUID appId) { // no checked exception
private final AsyncIndexService indexService; private final AsyncIndexService indexService;


public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
final Injector injector) { final Injector injector ) {


this.cassandraService = cassandraService; this.cassandraService = cassandraService;
this.counterUtils = counterUtils; this.counterUtils = counterUtils;
Expand All @@ -137,6 +141,9 @@ public CpEntityManagerFactory( final CassandraService cassandraService, final Co
this.indexService = injector.getInstance( AsyncIndexService.class ); this.indexService = injector.getInstance( AsyncIndexService.class );
this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
getManagementEntityManager() ); getManagementEntityManager() );
this.entityCollectionManagerFactory = injector.getInstance( EntityCollectionManagerFactory.class );
this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class );



} }


Expand Down Expand Up @@ -192,8 +199,8 @@ public EntityManager getEntityManager(UUID applicationId) {




private EntityManager _getEntityManager( UUID applicationId ) { private EntityManager _getEntityManager( UUID applicationId ) {
EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, applicationId, EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory,
entityCollectionManagerFactory, graphManagerFactory ); entityCollectionManagerFactory, graphManagerFactory, applicationId );
return em; return em;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.springframework.util.Assert; import org.springframework.util.Assert;


import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.AsyncIndexService;
import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor;
import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor;
import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
import org.apache.usergrid.corepersistence.results.QueryExecutor; import org.apache.usergrid.corepersistence.results.QueryExecutor;
Expand All @@ -51,7 +53,6 @@
import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef; import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope;
Expand Down Expand Up @@ -635,19 +636,28 @@ public Results searchCollection( String collName, Query query ) throws Exception
} }




final SearchEdge searchEdge = createCollectionSearchEdge( cpHeadEntity.getId(), collName );


final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); query.setEntityType( collection.getType() );
query = adjustQuery( query );


final SearchTypes types = SearchTypes.fromTypes( collection.getType() );


logger.debug( "Searching scope {}", searchEdge ); if ( query.isGraphSearch() ) {
QueryExecutor executor =
new CollectionGraphQueryExecutor( entityCollectionManagerFactory, graphManagerFactory, applicationScope,
headEntity, query.getOffsetCursor(), collName, query.getLimit() );


query.setEntityType( collection.getType() ); return executor.next();
query = adjustQuery( query ); }






final SearchEdge searchEdge = createCollectionSearchEdge( cpHeadEntity.getId(), collName );

final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );

final SearchTypes types = SearchTypes.fromTypes( collection.getType() );

logger.debug( "Searching scope {}", searchEdge );


final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = final CollectionResultsLoaderFactoryImpl resultsLoaderFactory =
new CollectionResultsLoaderFactoryImpl( managerCache ); new CollectionResultsLoaderFactoryImpl( managerCache );
Expand Down Expand Up @@ -909,6 +919,16 @@ public Results searchConnectedEntities( Query query ) throws Exception {


query = adjustQuery( query ); query = adjustQuery( query );




if ( query.isGraphSearch() ) {
QueryExecutor executor =
new ConnectionGraphQueryExecutor( entityCollectionManagerFactory, graphManagerFactory, applicationScope,
headEntity, query.getOffsetCursor(), connection, query.getLimit() );

return executor.next();
}

final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory =
new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connection ); new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connection );


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@




import java.io.Serializable; import java.io.Serializable;
import java.util.List;
import java.util.Map; import java.util.Map;


import org.apache.usergrid.corepersistence.command.read.AbstractCommand; import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
import org.apache.usergrid.corepersistence.command.read.CollectCommand; import org.apache.usergrid.corepersistence.command.read.CollectCommand;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.persistence.EntityFactory; import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.EntitySet;
Expand All @@ -39,10 +37,6 @@
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;


import rx.Observable; import rx.Observable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;




/** /**
Expand Down Expand Up @@ -73,98 +67,47 @@ protected Class<Serializable> getCursorClass() {




@Override @Override
public Observable<Results> call( final Observable<? extends Id> observable ) { public Observable<Results> call( final Observable<Id> observable ) {




/** /**
* A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
* objects * objects
*/ */


final EntityCollectionManager entityCollectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );

final Observable<EntitySet> entitySetObservable = observable.buffer( resultSize ).flatMap(
bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );


return entitySetObservable

.flatMap( entitySet -> {

//get our entites and filter missing ones, then collect them into a results object
final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );

//convert them to our old entity model, then filter nulls, meaning they weren't found
return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter(
entity -> entity == null )

//convert them to a list, then map them into results
.toList().map( entities -> {
final Results results = Results.fromEntities( entities );
results.setCursor( generateCursor() );


return observable.buffer( resultSize ).flatMap( new Func1<List<? extends Id>, Observable<Results>>() { return results;
@Override
public Observable<Results> call( final List<? extends Id> ids ) {

return Observable.from( ids )
//group them by type so we can load them, in 2.0 dev this step will be removed
.groupBy( new Func1<Id, String>() {
@Override
public String call( final Id id ) {
return id.getType();
}
} )

//take all our groups and load them as id sets

.flatMap( new Func1<GroupedObservable<String, Id>, Observable<EntitySet>>() {
@Override
public Observable<EntitySet> call(
final GroupedObservable<String, Id> stringIdGroupedObservable ) {


final String entityType = stringIdGroupedObservable.getKey();

final CollectionScope collectionScope =
getCollectionScopeNameFromEntityType( applicationScope.getApplication(), entityType );


return stringIdGroupedObservable.toList()
.flatMap( new Func1<List<Id>, Observable<EntitySet>>() {
@Override
public Observable<EntitySet> call(
final List<Id> ids ) {

final EntityCollectionManager ecm =
entityCollectionManagerFactory
.createCollectionManager( collectionScope );
return ecm.load( ids );
}
} );
}
} )
//emit our groups of entities as a stream of entities
.flatMap( new Func1<EntitySet, Observable<org.apache.usergrid.persistence.Entity>>() {
@Override
public Observable<org.apache.usergrid.persistence.Entity> call( final EntitySet entitySet ) {
//emit our entities, and filter out deleted entites
return Observable.from( entitySet.getEntities() ).map(
new Func1<MvccEntity, org.apache.usergrid.persistence.Entity>() {

@Override
public org.apache.usergrid.persistence.Entity call( final MvccEntity mvccEntity ) {
return mapEntity( mvccEntity );
}
} )
//filter null entities
.filter( new Func1<org.apache.usergrid.persistence.Entity, Boolean>() {
@Override
public Boolean call( final org.apache.usergrid.persistence.Entity entity ) {
return entity == null;
}
} );
}
} )

//convert them to a list, then map them into results
.toList().map( new Func1<List<org.apache.usergrid.persistence.Entity>, Results>() {
@Override
public Results call( final List<org.apache.usergrid.persistence.Entity> entities ) {
final Results results = Results.fromEntities( entities );
results.setCursor( generateCursor() );

return results;
}
} ); } );
} } );
} );
} }


/**
* Map a new cp entity to an old entity. May be null if not present
*/




/**
* Map a new cp entity to an old entity. May be null if not present
*/
private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) {
if ( !mvccEntity.getEntity().isPresent() ) { if ( !mvccEntity.getEntity().isPresent() ) {
return null; return null;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public AbstractReadGraphCommand( final GraphManagerFactory graphManagerFactory )




@Override @Override
public Observable<Id> call( final Observable<? extends Id> observable ) { public Observable<Id> call( final Observable<Id> observable ) {


//get the graph manager //get the graph manager
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/ */
package org.apache.usergrid.corepersistence.migration; package org.apache.usergrid.corepersistence.migration;


import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils;
Expand Down Expand Up @@ -261,7 +262,7 @@ public Observable<org.apache.usergrid.persistence.model.entity.Entity> getOldApp
Id rootAppId = systemAppScope.getApplication(); Id rootAppId = systemAppScope.getApplication();


final SimpleSearchByEdgeType simpleSearchByEdgeType = new SimpleSearchByEdgeType( final SimpleSearchByEdgeType simpleSearchByEdgeType = new SimpleSearchByEdgeType(
rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null); rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent());


Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs =
gm.loadEdgesFromSource( simpleSearchByEdgeType ) gm.loadEdgesFromSource( simpleSearchByEdgeType )
Expand All @@ -284,7 +285,8 @@ public Observable<Edge> getApplicationInfoEdges() {




final SimpleSearchByEdgeType simpleSearchByEdgeType = new SimpleSearchByEdgeType( final SimpleSearchByEdgeType simpleSearchByEdgeType = new SimpleSearchByEdgeType(
CpNamingUtils.generateApplicationId(CpNamingUtils.MANAGEMENT_APPLICATION_ID), edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null); CpNamingUtils.generateApplicationId(CpNamingUtils.MANAGEMENT_APPLICATION_ID), edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.absent());
edgesObservable = edgesObservable !=null ? edgesObservable : gm.loadEdgesFromSource( simpleSearchByEdgeType ); edgesObservable = edgesObservable !=null ? edgesObservable : gm.loadEdgesFromSource( simpleSearchByEdgeType );
return edgesObservable; return edgesObservable;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor {
public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory,
final GraphManagerFactory graphManagerFactory, final GraphManagerFactory graphManagerFactory,
final ApplicationScope applicationScope, final EntityRef source, final ApplicationScope applicationScope, final EntityRef source,
final String cursor, final String connectionName, final int limit ) { final String cursor, final String collectionName, final int limit ) {


super( entityCollectionManagerFactory, applicationScope, source, cursor, limit ); super( entityCollectionManagerFactory, applicationScope, source, cursor, limit );
this.graphManagerFactory = graphManagerFactory; this.graphManagerFactory = graphManagerFactory;


Preconditions.checkNotNull( connectionName, "connectionName is required on the query" ); Preconditions.checkNotNull( collectionName, "collectionName is required on the query" );
this.collectionName = connectionName; this.collectionName = collectionName;
} }




Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.utils.UUIDUtils; import org.apache.usergrid.utils.UUIDUtils;


import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;


import rx.Observable; import rx.Observable;
Expand Down Expand Up @@ -91,8 +92,8 @@ public Observable<ApplicationScope> getData() {


//we have app infos. For each of these app infos, we have to load the application itself //we have app infos. For each of these app infos, we have to load the application itself
Observable<ApplicationScope> appIds = gm.loadEdgesFromSource( Observable<ApplicationScope> appIds = gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
null ) ).flatMap( new Func1<Edge, Observable<ApplicationScope>>() { Optional.absent() ) ).flatMap( new Func1<Edge, Observable<ApplicationScope>>() {
@Override @Override
public Observable<ApplicationScope> call( final Edge edge ) { public Observable<ApplicationScope> call( final Edge edge ) {


Expand All @@ -118,7 +119,7 @@ public Boolean call( final Entity entity ) {
@Override @Override
public ApplicationScope call( final Entity entity ) { public ApplicationScope call( final Entity entity ) {
final UUID uuid = UUIDUtils.tryExtractUUID( final UUID uuid = UUIDUtils.tryExtractUUID(
entity.getField( Schema.PROPERTY_APPLICATION_ID ).getValue().toString()); entity.getField( Schema.PROPERTY_APPLICATION_ID ).getValue().toString() );
return getApplicationScope( uuid ); return getApplicationScope( uuid );
} }
} ); } );
Expand Down
Loading

0 comments on commit 83174e8

Please sign in to comment.