diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 4ac42ccf0c..713a2da9ac 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -28,6 +28,7 @@ import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -348,7 +349,7 @@ public Observable migrateAppInfo(UUID applicationUUID, String collectionFromName Entity oldAppEntity = managementEm.get(new SimpleEntityRef(collectionFromName, applicationUUID)); Observable copyConnections = Observable.empty(); - if(oldAppEntity!=null) { + if (oldAppEntity != null) { // ensure that there is not already a deleted app with the same name final EntityRef alias = managementEm.getAlias(collectionToName, oldAppEntity.getName()); @@ -379,25 +380,32 @@ public Observable migrateAppInfo(UUID applicationUUID, String collectionFromName throw new RuntimeException(e); } }); + } + final Id managementAppId = CpNamingUtils.getManagementApplicationId(); final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope); final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope); - Edge deleteEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionFromName,applicationId); - Edge createEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionToName,applicationId); + final Edge createEdge = CpNamingUtils.createCollectionEdge(managementAppId, collectionToName, applicationId); + + final Observable compactObservable = managementGraphManager.compactNode(applicationId); + + final Observable deleteNodeGraph = managementGraphManager + .markNode(applicationId, CpNamingUtils.createGraphOperationTimestamp()) + .flatMap(id -> compactObservable); - final Observable deleteNodeGraph = managementGraphManager.deleteEdge(deleteEdge); final Observable createNodeGraph = managementGraphManager.writeEdge(createEdge); final Observable deleteAppFromIndex = aei.deleteApplication(); - return Observable.concat(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex) + return Observable + .merge(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex) .doOnCompleted(() -> { try { if (oldAppEntity != null) { managementEm.delete(oldAppEntity); applicationIdCache.evictAppId(oldAppEntity.getName()); - entityIndex.refreshAsync().toBlocking().first(); } + entityIndex.refreshAsync().toBlocking().last(); } catch (Exception e) { throw new RuntimeException(e); } @@ -439,13 +447,13 @@ public Map getApplications(boolean deleted) throws Exception { CpNamingUtils.getApplicationScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID); GraphManager gm = managerCache.getGraphManager(appScope); - EntityManager em = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID); - Application app = em.getApplication(); - if( app == null ) { + EntityManager managementEM = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID); + Application managementApp = managementEM.getApplication(); + if( managementApp == null ) { throw new RuntimeException("Management App " + CpNamingUtils.MANAGEMENT_APPLICATION_ID + " should never be null"); } - Id fromEntityId = new SimpleId( app.getUuid(), app.getType() ); + Id managementId = new SimpleId( managementApp.getUuid(), managementApp.getType() ); final String edgeType; @@ -456,10 +464,10 @@ public Map getApplications(boolean deleted) throws Exception { } logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}", - new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } ); + new Object[]{edgeType, managementId.getType(), managementId.getUuid()}); Observable edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - fromEntityId, edgeType, Long.MAX_VALUE, + managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() )); // TODO This is wrong, and will result in OOM if there are too many applications. diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 31dabeadef..77dd9f1d50 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -167,10 +167,18 @@ public static Edge createConnectionEdge( final Id sourceEntityId, final String c final String edgeType = getEdgeTypeFromConnectionType( connectionType ); // create graph edge connection from head entity to member entity - return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, System.currentTimeMillis() ); + return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, UUIDUtils.newTimeUUID().timestamp() ); } + /** + * When marking nodes for deletion we must use the same unit of measure as the edge timestamps + * @return + */ + public static long createGraphOperationTimestamp(){ + return UUIDUtils.newTimeUUID().timestamp(); + } + /** * Create a connection searchEdge *