Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
add compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed May 28, 2015
1 parent b85ff25 commit 8bb7796
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
Expand Up @@ -28,6 +28,7 @@

import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -348,7 +349,7 @@ public Observable migrateAppInfo(UUID applicationUUID, String collectionFromName

Entity oldAppEntity = managementEm.get(new SimpleEntityRef(collectionFromName, applicationUUID));
Observable copyConnections = Observable.empty();
if(oldAppEntity!=null) {
if (oldAppEntity != null) {
// ensure that there is not already a deleted app with the same name

final EntityRef alias = managementEm.getAlias(collectionToName, oldAppEntity.getName());
Expand Down Expand Up @@ -379,25 +380,32 @@ public Observable migrateAppInfo(UUID applicationUUID, String collectionFromName
throw new RuntimeException(e);
}
});

}
final Id managementAppId = CpNamingUtils.getManagementApplicationId();
final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
Edge deleteEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionFromName,applicationId);
Edge createEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionToName,applicationId);
final Edge createEdge = CpNamingUtils.createCollectionEdge(managementAppId, collectionToName, applicationId);

final Observable compactObservable = managementGraphManager.compactNode(applicationId);

final Observable deleteNodeGraph = managementGraphManager
.markNode(applicationId, CpNamingUtils.createGraphOperationTimestamp())
.flatMap(id -> compactObservable);

final Observable deleteNodeGraph = managementGraphManager.deleteEdge(deleteEdge);
final Observable createNodeGraph = managementGraphManager.writeEdge(createEdge);

final Observable deleteAppFromIndex = aei.deleteApplication();

return Observable.concat(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
return Observable
.merge(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
.doOnCompleted(() -> {
try {
if (oldAppEntity != null) {
managementEm.delete(oldAppEntity);
applicationIdCache.evictAppId(oldAppEntity.getName());
entityIndex.refreshAsync().toBlocking().first();
}
entityIndex.refreshAsync().toBlocking().last();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -439,13 +447,13 @@ public Map<String, UUID> getApplications(boolean deleted) throws Exception {
CpNamingUtils.getApplicationScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
GraphManager gm = managerCache.getGraphManager(appScope);

EntityManager em = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
Application app = em.getApplication();
if( app == null ) {
EntityManager managementEM = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
Application managementApp = managementEM.getApplication();
if( managementApp == null ) {
throw new RuntimeException("Management App "
+ CpNamingUtils.MANAGEMENT_APPLICATION_ID + " should never be null");
}
Id fromEntityId = new SimpleId( app.getUuid(), app.getType() );
Id managementId = new SimpleId( managementApp.getUuid(), managementApp.getType() );

final String edgeType;

Expand All @@ -456,10 +464,10 @@ public Map<String, UUID> getApplications(boolean deleted) throws Exception {
}

logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } );
new Object[]{edgeType, managementId.getType(), managementId.getUuid()});

Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
fromEntityId, edgeType, Long.MAX_VALUE,
managementId, edgeType, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));

// TODO This is wrong, and will result in OOM if there are too many applications.
Expand Down
Expand Up @@ -167,10 +167,18 @@ public static Edge createConnectionEdge( final Id sourceEntityId, final String c
final String edgeType = getEdgeTypeFromConnectionType( connectionType );

// create graph edge connection from head entity to member entity
return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, System.currentTimeMillis() );
return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, UUIDUtils.newTimeUUID().timestamp() );
}


/**
* When marking nodes for deletion we must use the same unit of measure as the edge timestamps
* @return
*/
public static long createGraphOperationTimestamp(){
return UUIDUtils.newTimeUUID().timestamp();
}

/**
* Create a connection searchEdge
*
Expand Down

0 comments on commit 8bb7796

Please sign in to comment.