Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Change ImportServicesImpl to use searchConnectedEntities() instead of…
Browse files Browse the repository at this point in the history
… getConnectedEntities() as a work around to make ImportCollectionIT#testImportWithMultipleFiles pass.
  • Loading branch information
Dave Johnson committed Feb 10, 2015
1 parent 73be355 commit b8da17c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 77 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.entities.JobData;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.Level;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
Expand Down Expand Up @@ -63,42 +64,24 @@ public class ImportServiceImpl implements ImportService {
public static final String FILE_IMPORT_JOB_NAME = "fileImportJob";
public static final int HEARTBEAT_COUNT = 50;


private static final Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);

//injected the Entity Manager Factory
int MAX_FILE_IMPORTS = 1000; // max number of file import jobs / import job

protected EntityManagerFactory emf;

//dependency injection
private SchedulerService sch;

private ServiceManagerFactory smf;

//Dependency injection through spring
private QueueManager qm;

private QueueManagerFactory queueManagerFactory;

//inject Management Service to access Organization Data
private ManagementService managementService;

private JsonFactory jsonFactory = new JsonFactory();


@PostConstruct
public void init(){

//TODO: move this to a before or initialization method.

//TODO: made queueName clearly defined.
//smf = getApplicationContext().getBean(ServiceManagerFactory.class);

String name = ImportQueueListener.QUEUE_NAME;
QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
QueueScope queueScope = queueScopeFactory.getScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID, name);
queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
qm = queueManagerFactory.getQueueManager(queueScope);
}


/**
* This schedules the main import Job.
*
Expand Down Expand Up @@ -217,19 +200,25 @@ private JobData createFileTask( Map<String, Object> config, String file, EntityR
private int getConnectionCount( final Import importRoot ) {

try {
EntityManager rootEm = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );

Results entities = rootEm.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
PagingResultsIterator itr = new PagingResultsIterator( entities );

int count = 0;

while ( itr.hasNext() ) {
itr.next();
count++;
}

return count;
EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
Query query = Query.fromQL("select *");
query.setEntityType("file_import");
query.setConnectionType("includes");
query.setLimit(MAX_FILE_IMPORTS);

Results entities = emMgmtApp.searchConnectedEntities( importRoot, query );
return entities.size();

// see ImportConnectsTest()
// Results entities = emMgmtApp.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
// PagingResultsIterator itr = new PagingResultsIterator( entities );
// int count = 0;
// while ( itr.hasNext() ) {
// itr.next();
// count++;
// }
// return count;
}
catch ( Exception e ) {
logger.error( "application doesn't exist within the current context" );
Expand Down Expand Up @@ -485,8 +474,10 @@ public void doImport(JobExecution jobExecution) throws Exception {

final int count = getConnectionCount(importEntity);
if ( count == fileJobs.size() ) {
logger.debug("Got ALL {} of {} expected connections", count, fileJobs.size());
done = true;
} else {
logger.debug("Got {} of {} expected connections. Waiting...", count, fileJobs.size());
Thread.sleep(1000);
}
}
Expand Down Expand Up @@ -591,36 +582,12 @@ public void downloadAndImportFile(JobExecution jobExecution) throws Exception {
String randTag = RandomStringUtils.randomAlphanumeric(4);
logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );

Results entities = emManagementApp.getConnectedEntities(
importEntity, "includes", "file_import", Level.ALL_PROPERTIES);


// int retries = 0;
// int maxRetries = 60;
// Results entities = null;
// boolean done = false;
// while ( !done && retries++ < maxRetries ) {
//
// // get all file import job siblings of the current job we're working now
// entities = emManagementApp.getConnectedEntities(
// importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
//
// if ( entities.size() == importEntity.getFileCount() ) {
// logger.debug("{} got {} file_import entities, expected {} DONE!",
// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
// done = true;
//
// } else {
// logger.debug("{} got {} file_import entities, expected {} waiting... ",
// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
// Thread.sleep(1000);
// }
// }
//
// if ( retries >= maxRetries ) {
// throw new RuntimeException("Max retries was reached");
// }

EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
Query query = Query.fromQL("select *");
query.setEntityType("file_import");
query.setConnectionType("includes");
query.setLimit(MAX_FILE_IMPORTS);
Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);

PagingResultsIterator itr = new PagingResultsIterator( entities );

Expand Down
Expand Up @@ -55,10 +55,41 @@ public class ImportConnectionsTest {
@Rule
public NewOrgAppAdminRule newOrgAppAdminRule = new NewOrgAppAdminRule( setup );


@Test
@Ignore("Because getConnectedEntities() is broken")
public void testCreateAndCountConnectionsViaGet() throws Exception {

doTestCreateAndCountConnections(new ConnectionCounter() {
@Override
public int count(Import importEntity) {
return getConnectionCountViaGet(importEntity);
}
});
}


@Test
public void testCreateAndSearchConnections() throws Exception {
public void testCreateAndCountConnectionsViaSearch() throws Exception {

doTestCreateAndCountConnections(new ConnectionCounter() {
@Override
public int count(Import importEntity) {
return getConnectionCountViaSearch(importEntity);
}
});
}


interface ConnectionCounter {
int count( Import importEntity );
}


final int connectionCount = 10;
public void doTestCreateAndCountConnections(
ConnectionCounter counter) throws Exception {

final int connectionCount = 15;

EntityManager emMgmtApp = setup.getEmf()
.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
Expand All @@ -75,50 +106,78 @@ public void testCreateAndSearchConnections() throws Exception {
}

int retries = 0;
int maxRetries = 60;
int maxRetries = 20;
boolean done = false;
int count = 0;
while ( !done && retries++ < maxRetries ) {

final int count = getConnectionCount(importEntity);
count = counter.count( importEntity );
if ( count == connectionCount ) {
logger.debug("Count good!");
done = true;
} else {
logger.debug("Waiting...");
logger.debug("Got {} of {} Waiting...", count, connectionCount );
Thread.sleep(1000);
}
}
if ( retries >= maxRetries ) {
throw new RuntimeException("Max retries was reached");
}

assertEquals("did not get all connections",
connectionCount, getConnectionCount( importEntity ));
assertEquals("did not get all connections", connectionCount, count);
}

private int getConnectionCount( final Import importRoot ) {

private int getConnectionCountViaGet( final Import importRoot ) {

try {
EntityManager rootEm = setup.getEmf()
EntityManager emMgmtApp = setup.getEmf()
.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID );

Results entities = rootEm.getConnectedEntities(
Results entities = emMgmtApp.getConnectedEntities(
importRoot, "includes", null, Query.Level.ALL_PROPERTIES );
PagingResultsIterator itr = new PagingResultsIterator( entities );

PagingResultsIterator itr = new PagingResultsIterator( entities );
int count = 0;

while ( itr.hasNext() ) {
itr.next();
count++;
}

return count;
}
catch ( Exception e ) {
logger.error( "application doesn't exist within the current context" );
throw new RuntimeException( e );
}
}


private int getConnectionCountViaSearch( final Import importRoot ) {

try {
EntityManager emMgmtApp = setup.getEmf()
.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID );

Query query = Query.fromQL("select *");
query.setEntityType("file_import");
query.setConnectionType("includes");
query.setLimit(10000);

Results entities = emMgmtApp.searchConnectedEntities( importRoot, query );
return entities.size();

// PagingResultsIterator itr = new PagingResultsIterator( entities );
// int count = 0;
// while ( itr.hasNext() ) {
// itr.next();
// count++;
// }
// return count;
}
catch ( Exception e ) {
logger.error( "application doesn't exist within the current context" );
throw new RuntimeException( e );
}
}
}

0 comments on commit b8da17c

Please sign in to comment.