Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Refactor into commands WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed Apr 24, 2015
1 parent 60617d2 commit d5978cc
Show file tree
Hide file tree
Showing 24 changed files with 444 additions and 50 deletions.
Expand Up @@ -32,10 +32,10 @@


import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.AsyncIndexService;
import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor; import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor;
import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor; import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor;
import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor;
import org.apache.usergrid.corepersistence.results.QueryExecutor; import org.apache.usergrid.corepersistence.results.QueryExecutor;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils;
Expand Down
Expand Up @@ -22,7 +22,7 @@


import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
import org.apache.usergrid.corepersistence.command.read.CollectCommand; import org.apache.usergrid.corepersistence.command.read.Collector;
import org.apache.usergrid.corepersistence.command.read.Command; import org.apache.usergrid.corepersistence.command.read.Command;
import org.apache.usergrid.corepersistence.command.read.TraverseCommand; import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope;
Expand Down Expand Up @@ -87,12 +87,12 @@ public CommandBuilder withTraverseCommand( final TraverseCommand traverseCommand
/** /**
* Build the final collection step, and * Build the final collection step, and
*/ */
public <T> Observable<T> build( final CollectCommand<T> collectCommand ) { public <T> Observable<T> build( final Collector<T> collector ) {
setState( collectCommand ); setState( collector );


collectCommand.setLimit( limit ); collector.setLimit( limit );


return currentObservable.compose( collectCommand ); return currentObservable.compose( collector );
} }




Expand Down
Expand Up @@ -24,13 +24,13 @@
* A command that is used to reduce our stream of results into a final output * A command that is used to reduce our stream of results into a final output
* @param <T> * @param <T>
*/ */
public interface CollectCommand<T> extends Command<T>{ public interface Collector<T> extends Command<T>{


/** /**
* Set the prefered result size for the command * Set the prefered result size for the command
* @param resultSize * @param limit
*/ */
void setLimit( final int resultSize ); void setLimit( final int limit );




} }
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.command.read;


import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryCollectionElasticSearchCollector;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryConnectionElasticSearchCollector;
import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector;
import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand;
import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;


/**
* A factory for generating read commands
*/
public interface ReadCommandFactory {


/**
* Generate a new instance of the command with the specified parameters
* @param applicationScope
* @param sourceId
* @param collectionName
* @return
*/
ReadGraphCollectionCommand readGraphCollectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String collectionName);

/**
* Generate a new instance of the command with the specified parameters
* @param applicationScope
* @param sourceId
* @param connectionName
* @return
*/
ReadGraphConnectionCommand readGraphConnectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String connectionName);

/**
* Generate a new instance of the command with the specified parameters
* @param applicationScope
* @return
*/
EntityLoadCollector entityLoadCollector(final ApplicationScope applicationScope);

/**
* Generate a new instance of the command with the specified parameters
* @param applicationScope
* @param sourceId
* @param collectionName
* @return
*/
QueryCollectionElasticSearchCollector queryCollectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String collectionName);


/**
* Generate a new instance of the command with the specified parameters
* @param applicationScope
* @param sourceId
* @param connectionName
* @return
*/
QueryConnectionElasticSearchCollector queryConnectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String connectionName);
}
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.command.read.elasticsearch;


import java.util.Iterator;

import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
import org.apache.usergrid.corepersistence.command.read.Collector;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor;
import org.apache.usergrid.corepersistence.results.QueryExecutor;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.model.entity.Id;

import com.google.inject.Inject;

import rx.Observable;


/**
* A command that will query and load elasticsearch
*
* On future iteration, this needs to be split into 2 commands 1 that loads the candidate results and validates the
* versions then another that will search and load
*
* TODO, split this into 3 seperate observables
*
* 1) An observable that emits candidate results
* 2) An observable that validates versions by uuid (for Traverse commands)
* 3) An observbale that emits Results as a collector (for final commands)
*/
public abstract class AbstractQueryElasticSearchCollector extends AbstractCommand<Results, Integer> implements
Collector<Results> {


private final ResultsLoaderFactory resultsLoaderFactory;
private final ApplicationEntityIndex entityIndex;
private final ApplicationScope applicationScope;
private final SearchEdge indexScope;
private final SearchTypes searchTypes;
private final Query query;
private int limit;



protected AbstractQueryElasticSearchCollector( final ApplicationEntityIndex entityIndex,
final ApplicationScope applicationScope, final SearchEdge indexScope,
final SearchTypes searchTypes, final Query query ) {
this.entityIndex = entityIndex;
this.applicationScope = applicationScope;
this.indexScope = indexScope;
this.searchTypes = searchTypes;
this.query = query;
this.resultsLoaderFactory = getResultsLoaderFactory();
}


@Override
public Observable<Results> call( final Observable<Id> idObservable ) {
final Iterable<Results> executor =
new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, applicationScope, indexScope, searchTypes,
query.withLimit( limit ));

return Observable.from(executor);
}


/**
* Get the results loader factor
* @return
*/
protected abstract ResultsLoaderFactory getResultsLoaderFactory();

@Override
protected Class<Integer> getCursorClass() {
return Integer.class;
}


@Override
public void setLimit( final int limit ) {
this.limit = limit;
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.command.read.elasticsearch;


import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.SearchTypes;

import com.google.inject.Inject;


/**
* Command for querying connections
*/
public class QueryCollectionElasticSearchCollector extends AbstractQueryElasticSearchCollector {

private final ManagerCache managerCache;
private final EntityRef headEntity;
private final String connectionName;

@Inject
protected QueryCollectionElasticSearchCollector( final ApplicationEntityIndex entityIndex,
final ApplicationScope applicationScope,
final SearchEdge indexScope, final SearchTypes searchTypes,
final Query query, final ManagerCache managerCache,
final EntityRef headEntity, final String connectionName ) {
super( entityIndex, applicationScope, indexScope, searchTypes, query );
this.managerCache = managerCache;
this.headEntity = headEntity;
this.connectionName = connectionName;
}


@Override
protected ResultsLoaderFactory getResultsLoaderFactory() {
return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName );
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.command.read.elasticsearch;


import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.SearchTypes;

import com.google.inject.Inject;


/**
* Command for querying connections
*/
public class QueryConnectionElasticSearchCollector extends AbstractQueryElasticSearchCollector {

private final ManagerCache managerCache;
private final EntityRef headEntity;
private final String connectionName;

@Inject
protected QueryConnectionElasticSearchCollector( final ApplicationEntityIndex entityIndex,
final ApplicationScope applicationScope,
final SearchEdge indexScope, final SearchTypes searchTypes,
final Query query, final ManagerCache managerCache,
final EntityRef headEntity, final String connectionName ) {
super( entityIndex, applicationScope, indexScope, searchTypes, query );
this.managerCache = managerCache;
this.headEntity = headEntity;
this.connectionName = connectionName;
}


@Override
protected ResultsLoaderFactory getResultsLoaderFactory() {
return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName );
}
}
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.apache.usergrid.corepersistence.results; package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;




import java.util.ArrayList; import java.util.ArrayList;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */


package org.apache.usergrid.corepersistence.results; package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;




import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.corepersistence.ManagerCache;
Expand Down

0 comments on commit d5978cc

Please sign in to comment.