From d5978cca07366c43314687081973f48a80b61d8c Mon Sep 17 00:00:00 2001 From: Todd Nine Date: Fri, 24 Apr 2015 14:45:50 -0600 Subject: [PATCH] Refactor into commands WIP --- .../corepersistence/CpRelationManager.java | 6 +- .../command/CommandBuilder.java | 10 +- .../{CollectCommand.java => Collector.java} | 6 +- .../command/read/ReadCommandFactory.java | 81 +++++++++++++ .../AbstractQueryElasticSearchCollector.java | 107 ++++++++++++++++++ ...QueryCollectionElasticSearchCollector.java | 62 ++++++++++ ...QueryConnectionElasticSearchCollector.java | 62 ++++++++++ .../impl}/CollectionRefsVerifier.java | 2 +- .../CollectionResultsLoaderFactoryImpl.java | 2 +- .../impl}/ConnectionRefsVerifier.java | 4 +- .../ConnectionResultsLoaderFactoryImpl.java | 2 +- .../impl}/ElasticSearchQueryExecutor.java | 8 +- .../elasticsearch/impl}/EntityVerifier.java | 2 +- .../elasticsearch/impl}/FilteringLoader.java | 2 +- .../read/elasticsearch/impl}/IdsVerifier.java | 2 +- .../elasticsearch/impl}/ResultsLoader.java | 2 +- .../impl}/ResultsLoaderFactory.java | 2 +- .../elasticsearch/impl}/ResultsVerifier.java | 2 +- .../elasticsearch/impl}/VersionVerifier.java | 2 +- ...dCommand.java => EntityLoadCollector.java} | 42 +++---- .../graph/ReadGraphCollectionCommand.java | 3 + .../graph/ReadGraphConnectionCommand.java | 3 + .../results/AbstractGraphQueryExecutor.java | 4 +- .../results/ObservableQueryExecutor.java | 76 +++++++++++++ 24 files changed, 444 insertions(+), 50 deletions(-) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/{CollectCommand.java => Collector.java} (89%) create mode 100644 stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java create mode 100644 stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java create mode 100644 stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java create mode 100644 stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/CollectionRefsVerifier.java (95%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/CollectionResultsLoaderFactoryImpl.java (96%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ConnectionRefsVerifier.java (91%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ConnectionResultsLoaderFactoryImpl.java (96%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ElasticSearchQueryExecutor.java (96%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/EntityVerifier.java (98%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/FilteringLoader.java (99%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/IdsVerifier.java (94%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ResultsLoader.java (94%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ResultsLoaderFactory.java (94%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/ResultsVerifier.java (95%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/{results => command/read/elasticsearch/impl}/VersionVerifier.java (97%) rename stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/{EntityLoadCommand.java => EntityLoadCollector.java} (72%) create mode 100644 stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 86ba300585..de7fef17f0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -32,10 +32,10 @@ import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor; -import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor; -import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor; import org.apache.usergrid.corepersistence.results.QueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java index de76f5081c..d9d6a12481 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java @@ -22,7 +22,7 @@ import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; -import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; import org.apache.usergrid.corepersistence.command.read.Command; import org.apache.usergrid.corepersistence.command.read.TraverseCommand; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -87,12 +87,12 @@ public CommandBuilder withTraverseCommand( final TraverseCommand traverseCommand /** * Build the final collection step, and */ - public Observable build( final CollectCommand collectCommand ) { - setState( collectCommand ); + public Observable build( final Collector collector ) { + setState( collector ); - collectCommand.setLimit( limit ); + collector.setLimit( limit ); - return currentObservable.compose( collectCommand ); + return currentObservable.compose( collector ); } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java similarity index 89% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java index c2ad931a63..2804db40ce 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java @@ -24,13 +24,13 @@ * A command that is used to reduce our stream of results into a final output * @param */ -public interface CollectCommand extends Command{ +public interface Collector extends Command{ /** * Set the prefered result size for the command - * @param resultSize + * @param limit */ - void setLimit( final int resultSize ); + void setLimit( final int limit ); } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java new file mode 100644 index 0000000000..07f300a4aa --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryCollectionElasticSearchCollector; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryConnectionElasticSearchCollector; +import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * A factory for generating read commands + */ +public interface ReadCommandFactory { + + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param collectionName + * @return + */ + ReadGraphCollectionCommand readGraphCollectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String collectionName); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param connectionName + * @return + */ + ReadGraphConnectionCommand readGraphConnectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String connectionName); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @return + */ + EntityLoadCollector entityLoadCollector(final ApplicationScope applicationScope); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param collectionName + * @return + */ + QueryCollectionElasticSearchCollector queryCollectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String collectionName); + + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param connectionName + * @return + */ + QueryConnectionElasticSearchCollector queryConnectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String connectionName); +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java new file mode 100644 index 0000000000..18cdeb0d29 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import java.util.Iterator; + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor; +import org.apache.usergrid.corepersistence.results.QueryExecutor; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * A command that will query and load elasticsearch + * + * On future iteration, this needs to be split into 2 commands 1 that loads the candidate results and validates the + * versions then another that will search and load + * + * TODO, split this into 3 seperate observables + * + * 1) An observable that emits candidate results + * 2) An observable that validates versions by uuid (for Traverse commands) + * 3) An observbale that emits Results as a collector (for final commands) + */ +public abstract class AbstractQueryElasticSearchCollector extends AbstractCommand implements + Collector { + + + private final ResultsLoaderFactory resultsLoaderFactory; + private final ApplicationEntityIndex entityIndex; + private final ApplicationScope applicationScope; + private final SearchEdge indexScope; + private final SearchTypes searchTypes; + private final Query query; + private int limit; + + + + protected AbstractQueryElasticSearchCollector( final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, final SearchEdge indexScope, + final SearchTypes searchTypes, final Query query ) { + this.entityIndex = entityIndex; + this.applicationScope = applicationScope; + this.indexScope = indexScope; + this.searchTypes = searchTypes; + this.query = query; + this.resultsLoaderFactory = getResultsLoaderFactory(); + } + + + @Override + public Observable call( final Observable idObservable ) { + final Iterable executor = + new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, applicationScope, indexScope, searchTypes, + query.withLimit( limit )); + + return Observable.from(executor); + } + + + /** + * Get the results loader factor + * @return + */ + protected abstract ResultsLoaderFactory getResultsLoaderFactory(); + + @Override + protected Class getCursorClass() { + return Integer.class; + } + + + @Override + public void setLimit( final int limit ) { + this.limit = limit; + } +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java new file mode 100644 index 0000000000..2bbcc48eea --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; + +import com.google.inject.Inject; + + +/** + * Command for querying connections + */ +public class QueryCollectionElasticSearchCollector extends AbstractQueryElasticSearchCollector { + + private final ManagerCache managerCache; + private final EntityRef headEntity; + private final String connectionName; + + @Inject + protected QueryCollectionElasticSearchCollector( final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, + final SearchEdge indexScope, final SearchTypes searchTypes, + final Query query, final ManagerCache managerCache, + final EntityRef headEntity, final String connectionName ) { + super( entityIndex, applicationScope, indexScope, searchTypes, query ); + this.managerCache = managerCache; + this.headEntity = headEntity; + this.connectionName = connectionName; + } + + + @Override + protected ResultsLoaderFactory getResultsLoaderFactory() { + return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName ); + } +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java new file mode 100644 index 0000000000..0e2f221f14 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; + +import com.google.inject.Inject; + + +/** + * Command for querying connections + */ +public class QueryConnectionElasticSearchCollector extends AbstractQueryElasticSearchCollector { + + private final ManagerCache managerCache; + private final EntityRef headEntity; + private final String connectionName; + + @Inject + protected QueryConnectionElasticSearchCollector( final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, + final SearchEdge indexScope, final SearchTypes searchTypes, + final Query query, final ManagerCache managerCache, + final EntityRef headEntity, final String connectionName ) { + super( entityIndex, applicationScope, indexScope, searchTypes, query ); + this.managerCache = managerCache; + this.headEntity = headEntity; + this.connectionName = connectionName; + } + + + @Override + protected ResultsLoaderFactory getResultsLoaderFactory() { + return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName ); + } +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java similarity index 95% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java index b74f4333dd..4b67ae883c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.ArrayList; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java similarity index 96% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java index 08458c334b..f9dd4e19b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import org.apache.usergrid.corepersistence.ManagerCache; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java similarity index 91% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java index 408edd3fce..bffb53a67c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java @@ -16,18 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.usergrid.persistence.ConnectedEntityRef; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.cassandra.ConnectedEntityRefImpl; import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; import org.apache.usergrid.persistence.model.entity.Id; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java similarity index 96% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java index e9633c9a5d..707e93327c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import org.apache.usergrid.corepersistence.ManagerCache; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java similarity index 96% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java index 4a1e43a385..c7e8f56207 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java @@ -17,12 +17,13 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.Iterator; import java.util.NoSuchElementException; +import org.apache.usergrid.corepersistence.results.QueryExecutor; import org.apache.usergrid.persistence.index.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,7 @@ import com.google.common.base.Optional; -public class ElasticSearchQueryExecutor implements QueryExecutor { +public class ElasticSearchQueryExecutor implements Iterable, Iterator { private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class ); @@ -190,9 +191,6 @@ private Results buildResults( final SearchEdge indexScope, final Query query, fi } results.setCursorFromOffset( query.getOffset() ); - //ugly and tight coupling, but we don't have a choice until we finish some refactoring - results.setQueryExecutor( this ); - logger.debug( "Returning results size {}", results.size() ); return results; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java similarity index 98% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java index a2dd5e06ae..7be8aa4552 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java @@ -18,7 +18,7 @@ * */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.ArrayList; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java similarity index 99% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java index a48e1b8df0..c0afe92eb8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.Collection; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java similarity index 94% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java index fae6eb1c10..4d2bd55169 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.ArrayList; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java similarity index 94% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java index e8a7bdf115..fa0e71f1c9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import org.apache.usergrid.persistence.Results; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java similarity index 94% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java index 3399a357b1..14db80e063 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import org.apache.usergrid.persistence.Query; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java similarity index 95% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java index 46e0983bd3..68515fad48 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.Collection; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java similarity index 97% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java index 574ba55efe..58e4296c6f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.usergrid.corepersistence.results; +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; import java.util.Collection; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java similarity index 72% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java index 08193ef2e6..1504f26c28 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.usergrid.corepersistence.command.read.AbstractCommand; -import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.persistence.EntityFactory; import org.apache.usergrid.persistence.Results; @@ -44,7 +44,7 @@ * * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification */ -public class EntityLoadCommand extends AbstractCommand implements CollectCommand { +public class EntityLoadCollector extends AbstractCommand implements Collector { private final EntityCollectionManagerFactory entityCollectionManagerFactory; @@ -53,8 +53,8 @@ public class EntityLoadCommand extends AbstractCommand im private int resultSize; - public EntityLoadCommand( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final ApplicationScope applicationScope ) { + public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final ApplicationScope applicationScope ) { this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.applicationScope = applicationScope; } @@ -82,25 +82,29 @@ public Observable call( final Observable observable ) { bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); - return entitySetObservable + final Observable resultsObservable = entitySetObservable .flatMap( entitySet -> { - //get our entites and filter missing ones, then collect them into a results object - final Observable mvccEntityObservable = Observable.from( entitySet.getEntities() ); + //get our entites and filter missing ones, then collect them into a results object + final Observable mvccEntityObservable = Observable.from( entitySet.getEntities() ); - //convert them to our old entity model, then filter nulls, meaning they weren't found - return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( - entity -> entity == null ) + //convert them to our old entity model, then filter nulls, meaning they weren't found + return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( entity -> entity == null ) - //convert them to a list, then map them into results - .toList().map( entities -> { - final Results results = Results.fromEntities( entities ); - results.setCursor( generateCursor() ); + //convert them to a list, then map them into results + .toList().map( entities -> { + final Results results = Results.fromEntities( entities ); + results.setCursor( generateCursor() ); - return results; - } ); - } ); + return results; + } ) + //if no results are present, return an empty results + .singleOrDefault( new Results( ) ); + } ); + + + return resultsObservable; } /** @@ -128,7 +132,7 @@ private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccE @Override - public void setLimit( final int resultSize ) { - this.resultSize = resultSize; + public void setLimit( final int limit ) { + this.resultSize = limit; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java index 9ccb969820..3b6633be34 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java @@ -22,6 +22,8 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import com.google.inject.Inject; + import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; @@ -36,6 +38,7 @@ public class ReadGraphCollectionCommand extends AbstractReadGraphCommand { /** * Create a new instance of our command */ + @Inject public ReadGraphCollectionCommand( final GraphManagerFactory graphManagerFactory, final String collectionName ) { super( graphManagerFactory ); this.collectionName = collectionName; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java index adebd45bd6..26b30e7ae0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java @@ -22,6 +22,8 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import com.google.inject.Inject; + import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; @@ -36,6 +38,7 @@ public class ReadGraphConnectionCommand extends AbstractReadGraphCommand { /** * Create a new instance of our command */ + @Inject public ReadGraphConnectionCommand( final GraphManagerFactory graphManagerFactory, final String connectionName ) { super( graphManagerFactory ); this.connectionName = connectionName; diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java index cf0764e97f..ec4271ab49 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java @@ -25,7 +25,7 @@ import java.util.NoSuchElementException; import org.apache.usergrid.corepersistence.command.CommandBuilder; -import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCommand; +import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -89,7 +89,7 @@ public boolean hasNext() { //construct our results to be observed later. This is a cold observable final Observable resultsObservable = - commandBuilder.build( new EntityLoadCommand( entityCollectionManagerFactory, applicationScope ) ); + commandBuilder.build( new EntityLoadCollector( entityCollectionManagerFactory, applicationScope ) ); this.observableIterator = resultsObservable.toBlocking().getIterator(); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java new file mode 100644 index 0000000000..382fb6429e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.usergrid.persistence.Results; + +import rx.Observable; + + +/** + * Our proxy to allow us to subscribe to observable results, then return them as an interator. A bridge for 2.0 -> 1.0 + * code + */ +public class ObservableQueryExecutor implements QueryExecutor { + + private final Observable resultsObservable; + + public Iterator iterator; + + + public ObservableQueryExecutor( final Observable resultsObservable ) { + this.resultsObservable = resultsObservable; + } + + + @Override + public Iterator iterator() { + return this; + } + + + @Override + public boolean hasNext() { + + if ( iterator == null ) { + iterator = resultsObservable.toBlocking().getIterator(); + } + + + return iterator.hasNext(); + } + + + @Override + public Results next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more results present" ); + } + final Results next = iterator.next(); + + next.setQueryExecutor( this ); + + return next; + } +}