Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
adding generic to prevent casting at low levels
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Feb 13, 2015
1 parent 35b95fd commit f1b0c8d
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 47 deletions.
Expand Up @@ -601,7 +601,7 @@ public void setApplicationContext( ApplicationContext applicationContext ) throw
public long performEntityCount() {
//TODO, this really needs to be a task that writes this data somewhere since this will get
//progressively slower as the system expands
return getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
return (Long) getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
}


Expand Down
Expand Up @@ -46,7 +46,7 @@
* Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
* source node
*/
public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable {
public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable<CollectionScope> {

private final ApplicationObservable applicationObservable;
private final GraphManagerFactory graphManagerFactory;
Expand All @@ -61,13 +61,13 @@ public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObserv
}


public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize) {
public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
//traverse all nodes in the graph, load all source edges from them, then re-save the meta data
return applicationObservable.getAllApplicationIds( )

.flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
.flatMap( new Func1<Id, Observable<ApplicationEntityGroup<CollectionScope>>>() {
@Override
public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
public Observable<ApplicationEntityGroup<CollectionScope>> call( final Id applicationId ) {

//set up our application scope and graph manager
final ApplicationScope applicationScope = new ApplicationScopeImpl(
Expand All @@ -94,22 +94,22 @@ public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
return Observable
.merge(applicationNode, entityNodes)
.buffer(bufferSize)
.map(new Func1<List<Id>, List<EntityIdScope>>() {
.map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
@Override
public List<EntityIdScope> call(List<Id> ids) {
List<EntityIdScope> scopes = new ArrayList<>(ids.size());
public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
for (Id id : ids) {
CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
EntityIdScope idScope = new EntityIdScope(id, scope);
EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
scopes.add(idScope);
}
return scopes;
}
})
.map(new Func1<List<EntityIdScope>, ApplicationEntityGroup>() {
.map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
@Override
public ApplicationEntityGroup call(final List<EntityIdScope> scopes) {
return new ApplicationEntityGroup(applicationScope, scopes);
public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
return new ApplicationEntityGroup<>(applicationScope, scopes);
}
});
}
Expand Down
Expand Up @@ -142,10 +142,10 @@ public void testDataMigration() throws Throwable {
//using a test system, and it's not a huge amount of data, otherwise we'll overflow.

allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
.doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity ) {
final ApplicationEntityGroup<CollectionScope> entity ) {

//add all versions from history to our comparison
for ( final EntityIdScope id : entity.entityIds ) {
Expand Down Expand Up @@ -205,12 +205,12 @@ public void call(ApplicationEntityGroup applicationEntityGroup) {

//now visit all entities in the system again, load them from v2, and ensure they're the same
allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
.doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity ) {
final ApplicationEntityGroup<CollectionScope> entity ) {
//add all versions from history to our comparison
for ( final EntityIdScope id : entity.entityIds ) {
for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {

CollectionScope scope = CpNamingUtils
.getCollectionScopeNameFromEntityType(
Expand Down Expand Up @@ -241,16 +241,16 @@ public void call(
//now visit all entities in the system again, and load them from the EM,
// ensure we see everything we did in the v1 traversal
allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
.doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity ) {
final ApplicationEntityGroup<CollectionScope> entity ) {

final EntityManager em = emf.getEntityManager(
entity.applicationScope.getApplication().getUuid() );

//add all versions from history to our comparison
for ( final EntityIdScope id : entity.entityIds ) {
for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {


try {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.EntityIdScope;
Expand Down Expand Up @@ -142,16 +143,16 @@ public void call(final ApplicationEntityGroup entity) {
});

allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
.doOnNext(new Action1<ApplicationEntityGroup>() {
.doOnNext(new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity) {
final ApplicationEntityGroup<CollectionScope> entity) {
//ensure that each one has a type

final EntityManager em = emf.getEntityManager(
entity.applicationScope.getApplication().getUuid());

for (final EntityIdScope idScope : entity.entityIds) {
for (final EntityIdScope<CollectionScope> idScope : entity.entityIds) {
final Id id = idScope.getId();
try {
final Entity returned = em.get(id.getUuid());
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
Expand Down Expand Up @@ -127,15 +128,15 @@ public void testIdMapping() throws Throwable {
//read everything in previous version format and put it into our types.

allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
.doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity ) {
final ApplicationEntityGroup<CollectionScope> entity ) {

final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );

for ( final EntityIdScope idScope : entity.entityIds ) {
for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
/**
* Get our edge types from the source
*/
Expand Down Expand Up @@ -195,15 +196,15 @@ public void call(

//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
.doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call(
final ApplicationEntityGroup entity ) {
final ApplicationEntityGroup<CollectionScope> entity ) {

final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );

for ( final EntityIdScope idScope : entity.entityIds ) {
for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
/**
* Get our edge types from the source
*/
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
Expand Down Expand Up @@ -103,9 +104,9 @@ public void testEntities() throws Exception {

final GraphManager gm = managerCache.getGraphManager( scope );

allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
@Override
public void call( final ApplicationEntityGroup entity ) {
public void call( final ApplicationEntityGroup<CollectionScope> entity ) {

assertNotNull(entity);
assertNotNull(entity.applicationScope);
Expand All @@ -116,7 +117,7 @@ public void call( final ApplicationEntityGroup entity ) {
return;
}

for(EntityIdScope idScope: entity.entityIds) {
for(EntityIdScope<CollectionScope> idScope : entity.entityIds) {

//we should only emit each node once
if ( idScope.getId().getType().equals( type1 ) ) {
Expand Down
Expand Up @@ -65,23 +65,23 @@ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, f
final AtomicLong atomicLong = new AtomicLong();
final MutationBatch totalBatch = keyspace.prepareMutationBatch();

final List<EntityIdScope> entityIds = applicationEntityGroup.entityIds;
final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;

final UUID now = UUIDGenerator.newTimeUUID();

//go through each entity in the system, and load it's entire
// history
return Observable.from(entityIds)
.subscribeOn(Schedulers.io())
.map(new Func1<EntityIdScope, Id>() {
.map(new Func1<EntityIdScope<CollectionScope>, Id>() {
@Override
public Id call(EntityIdScope idScope) {
public Id call(EntityIdScope<CollectionScope> idScope) {

ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();

if (applicationScope instanceof CollectionScope) {
CollectionScope currentScope = (CollectionScope) applicationScope;
if (idScope.getCollectionScope() instanceof CollectionScope) {
CollectionScope currentScope = idScope.getCollectionScope();
//for each element in the history in the previous version,
// copy it to the CF in v2
Iterator<MvccEntity> allVersions = migration.from()
Expand All @@ -102,7 +102,7 @@ public Id call(EntityIdScope idScope) {
}
executeBatch(totalBatch, observer, atomicLong);
}

return idScope.getId();
}
})
Expand Down
Expand Up @@ -30,14 +30,14 @@
* Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
* source node
*/
public interface AllEntitiesInSystemObservable {
public interface AllEntitiesInSystemObservable<T extends ApplicationScope> {
/**
* Return an observable that emits all entities in the system.
*
* @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
* you may be more than 1 ApplicationEntityGroup with the same application and different ids
*/
public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize);
public Observable<ApplicationEntityGroup<T>> getAllEntitiesInSystem(final int bufferSize);


}
Expand Down
Expand Up @@ -27,11 +27,11 @@
/**
* Get the entity data. Immutable bean for fast access
*/
public final class ApplicationEntityGroup {
public final class ApplicationEntityGroup<T extends ApplicationScope> {
public final ApplicationScope applicationScope;
public final List<EntityIdScope> entityIds;
public final List<EntityIdScope<T>> entityIds;

public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<EntityIdScope> entityIds) {
public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<EntityIdScope<T>> entityIds) {
this.applicationScope = applicationScope;
this.entityIds = entityIds;
}
Expand Down
Expand Up @@ -24,11 +24,11 @@
/**
* Tuple containing collectionscope and entityid
*/
public class EntityIdScope{
public class EntityIdScope<T extends ApplicationScope>{
private final Id id;
private final ApplicationScope collectionScope;
private final T collectionScope;

public EntityIdScope(Id id, ApplicationScope collectionScope){
public EntityIdScope(Id id, T collectionScope){
this.id = id;
this.collectionScope = collectionScope;
}
Expand All @@ -38,7 +38,7 @@ public Id getId() {
return id;
}

public ApplicationScope getCollectionScope() {
public T getCollectionScope() {
return collectionScope;
}
}

0 comments on commit f1b0c8d

Please sign in to comment.