From bed147761a006d9167565d558f4ba626daea33d3 Mon Sep 17 00:00:00 2001 From: Gezapeti Cseh Date: Tue, 11 Sep 2018 00:12:59 +0200 Subject: [PATCH] SOLR-12718 StreamContext ctor should always take a SolrClientCache Change-Id: Id8c980f2688861e06a7d127ac9fd985af1ccaacf --- solr/CHANGES.txt | 2 + .../org/apache/solr/handler/GraphHandler.java | 3 +- .../apache/solr/handler/StreamHandler.java | 3 +- .../apache/solr/handler/sql/SolrTable.java | 3 +- .../solr/client/solrj/io/ModelCache.java | 3 +- .../solrj/io/graph/GatherNodesStream.java | 3 +- .../client/solrj/io/sql/ResultSetImpl.java | 3 +- .../solrj/io/stream/ExecutorStream.java | 3 +- .../client/solrj/io/stream/FetchStream.java | 3 +- .../client/solrj/io/stream/StreamContext.java | 10 +- .../client/solrj/io/stream/TopicStream.java | 3 +- .../solrj/io/graph/GraphExpressionTest.java | 56 ++-- .../solr/client/solrj/io/graph/GraphTest.java | 3 +- .../solrj/io/stream/JDBCStreamTest.java | 13 +- .../solrj/io/stream/MathExpressionTest.java | 252 +++++++++--------- .../io/stream/SelectWithEvaluatorsTest.java | 3 +- .../solrj/io/stream/StreamDecoratorTest.java | 188 +++++-------- .../solrj/io/stream/StreamExpressionTest.java | 83 ++---- .../client/solrj/io/stream/StreamingTest.java | 88 +++--- .../eval/AbsoluteValueEvaluatorTest.java | 3 +- .../io/stream/eval/ArrayEvaluatorTest.java | 11 +- .../stream/eval/ConversionEvaluatorsTest.java | 7 +- .../stream/eval/TemporalEvaluatorsTest.java | 15 +- 23 files changed, 309 insertions(+), 452 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0d71204f5e5e..a57bcba5113a 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -374,6 +374,8 @@ Optimizations Other Changes ---------------------- +* SOLR-12718: StreamContext ctor should always take a SolrClientCache (gezapeti) + * SOLR-12361: Allow nested child documents to be in field values of a SolrInputDocument as an alternative to add/get ChildDocuments off to the side. The latter is now referred to as "anonymous" child documents as opposed to "labelled" (by the field name). Anonymous child docs might be deprecated in the future. diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java index ed5ae0aeec1f..60f6fc9a11eb 100644 --- a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java @@ -116,8 +116,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw return; } - StreamContext context = new StreamContext(); - context.setSolrClientCache(StreamHandler.clientCache); + StreamContext context = new StreamContext(StreamHandler.clientCache); context.put("core", this.coreName); Traversal traversal = new Traversal(); context.put("traversal", traversal); diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index a447093f7362..0aa00c447030 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -161,11 +161,10 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw int worker = params.getInt("workerID", 0); int numWorkers = params.getInt("numWorkers", 1); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(clientCache); context.put("shards", getCollectionShards(params)); context.workerID = worker; context.numWorkers = numWorkers; - context.setSolrClientCache(clientCache); context.setModelCache(modelCache); context.setObjectCache(objectCache); context.put("core", this.coreName); diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java index 46b09d219432..c652db91dc1a 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java @@ -159,8 +159,7 @@ private Enumerable query(final Properties properties, throw new RuntimeException(e); } - StreamContext streamContext = new StreamContext(); - streamContext.setSolrClientCache(StreamHandler.getClientCache()); + StreamContext streamContext = new StreamContext(StreamHandler.getClientCache()); tupleStream.setStreamContext(streamContext); final TupleStream finalStream = tupleStream; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ModelCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ModelCache.java index 1d7e46fa4ef2..3bf5c6276483 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ModelCache.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ModelCache.java @@ -76,8 +76,7 @@ public Tuple getModel(String zkHost, params.set("q","name_s:"+modelID); params.set("fl", "terms_ss, idfs_ds, weights_ds, iteration_i, _version_"); params.set(SORT, "iteration_i desc"); - StreamContext streamContext = new StreamContext(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); CloudSolrStream stream = new CloudSolrStream(zkHost, collection, params); stream.setStreamContext(streamContext); Tuple tuple = null; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java index c0fd054e2112..a41ed378807e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java @@ -376,11 +376,10 @@ public void setStreamContext(StreamContext context) { if (traversal == null) { //No traversal in the context. So create a new context and a new traversal. //This ensures that two separate traversals in the same expression don't pollute each others traversal. - StreamContext localContext = new StreamContext(); + StreamContext localContext = new StreamContext(context.getSolrClientCache()); localContext.numWorkers = context.numWorkers; localContext.workerID = context.workerID; - localContext.setSolrClientCache(context.getSolrClientCache()); localContext.setStreamFactory(context.getStreamFactory()); for(Object key :context.getEntries().keySet()) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/ResultSetImpl.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/ResultSetImpl.java index 7be0ebd96985..75385f7c3be9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/ResultSetImpl.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/ResultSetImpl.java @@ -64,8 +64,7 @@ class ResultSetImpl implements ResultSet { try { this.solrStream = new PushBackStream(solrStream); - StreamContext context = new StreamContext(); - context.setSolrClientCache(((ConnectionImpl)this.statement.getConnection()).getSolrClientCache()); + StreamContext context = new StreamContext(((ConnectionImpl)this.statement.getConnection()).getSolrClientCache()); this.solrStream.setStreamContext(context); this.solrStream.open(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java index bee208c75c3a..316880625d82 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java @@ -186,8 +186,7 @@ public static class StreamTask implements Runnable { public StreamTask(ArrayBlockingQueue queue, StreamFactory streamFactory, StreamContext streamContext) { this.queue = queue; this.streamFactory = streamFactory; - this.streamContext = new StreamContext(); - this.streamContext.setSolrClientCache(streamContext.getSolrClientCache()); + this.streamContext = new StreamContext(streamContext.getSolrClientCache()); this.streamContext.setModelCache(streamContext.getModelCache()); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java index fbdba168fa9c..8f993c0ea52f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java @@ -237,8 +237,7 @@ private void fetchBatch() throws IOException { params.add(SORT, "_version_ desc"); CloudSolrStream cloudSolrStream = new CloudSolrStream(zkHost, collection, params); - StreamContext newContext = new StreamContext(); - newContext.setSolrClientCache(streamContext.getSolrClientCache()); + StreamContext newContext = new StreamContext(streamContext.getSolrClientCache()); cloudSolrStream.setStreamContext(newContext); Map fetched = new HashMap<>(); try { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java index 778aacea1101..a812eff1a6f6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java @@ -42,9 +42,13 @@ public class StreamContext implements Serializable{ private ConcurrentMap objectCache; public int workerID; public int numWorkers; - private SolrClientCache clientCache; private ModelCache modelCache; private StreamFactory streamFactory; + final private SolrClientCache clientCache; + + public StreamContext(SolrClientCache clientCache){ + this.clientCache = clientCache; + } public ConcurrentMap getObjectCache() { return this.objectCache; @@ -74,10 +78,6 @@ public Map getEntries() { return this.entries; } - public void setSolrClientCache(SolrClientCache clientCache) { - this.clientCache = clientCache; - } - public void setModelCache(ModelCache modelCache) { this.modelCache = modelCache; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java index 9af4cbfeaf46..4ff5659972ba 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java @@ -428,8 +428,7 @@ private long getCheckpoint(Slice slice, Set liveNodes) throws IOExceptio SolrStream solrStream = new SolrStream(coreUrl, params); if(streamContext != null) { - StreamContext localContext = new StreamContext(); - localContext.setSolrClientCache(streamContext.getSolrClientCache()); + StreamContext localContext = new StreamContext(streamContext.getSolrClientCache()); solrStream.setStreamContext(localContext); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java index 2294d7188afa..8cbb667804d2 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java @@ -121,9 +121,8 @@ public void testShortestPathStream() throws Exception { List tuples = null; Set paths = null; ShortestPathStream stream = null; - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -264,9 +263,8 @@ public void testGatherNodesStream() throws Exception { List tuples = null; Set paths = null; GatherNodesStream stream = null; - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -319,8 +317,7 @@ public void testGatherNodesStream() throws Exception { stream = (GatherNodesStream)factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); @@ -357,8 +354,7 @@ public void testGatherNodesStream() throws Exception { stream = (GatherNodesStream)factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); @@ -375,8 +371,7 @@ public void testGatherNodesStream() throws Exception { stream = (GatherNodesStream)factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -415,9 +410,6 @@ public void testScoreNodesStream() throws Exception { List tuples = null; TupleStream stream = null; - StreamContext context = new StreamContext(); - SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -450,8 +442,8 @@ public void testScoreNodesStream() throws Exception { stream = factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + SolrClientCache cache = new SolrClientCache(); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); @@ -487,8 +479,7 @@ public void testScoreNodesStream() throws Exception { stream = factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); @@ -530,9 +521,6 @@ public void testScoreNodesFacetStream() throws Exception { List tuples = null; TupleStream stream = null; - StreamContext context = new StreamContext(); - SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -552,8 +540,8 @@ public void testScoreNodesFacetStream() throws Exception { stream = factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + SolrClientCache cache = new SolrClientCache(); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -600,9 +588,8 @@ public void testGatherNodesFriendsStream() throws Exception { List tuples = null; GatherNodesStream stream = null; - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -638,8 +625,7 @@ public void testGatherNodesFriendsStream() throws Exception { "scatter=\"branches, leaves\", trackTraversal=\"true\")"; stream = (GatherNodesStream)factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -676,8 +662,7 @@ public void testGatherNodesFriendsStream() throws Exception { "gather=\"to_s\")"; stream = (GatherNodesStream)factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -697,8 +682,7 @@ public void testGatherNodesFriendsStream() throws Exception { "gather=\"to_s\", scatter=\"branches, leaves\")"; stream = (GatherNodesStream)factory.constructStream(expr); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -725,8 +709,7 @@ public void testGatherNodesFriendsStream() throws Exception { "gather=\"to_s\")"; stream = (GatherNodesStream)factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -742,8 +725,7 @@ public void testGatherNodesFriendsStream() throws Exception { String expr3 = "hashJoin("+expr2+", hashed="+expr2+", on=\"node\")"; HashJoinStream hstream = (HashJoinStream)factory.constructStream(expr3); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); hstream.setStreamContext(context); tuples = getTuples(hstream); @@ -768,8 +750,7 @@ public void testGatherNodesFriendsStream() throws Exception { "gather=\"to_s\", scatter=\"branches, leaves\")"; stream = (GatherNodesStream)factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -809,8 +790,7 @@ public void testGatherNodesFriendsStream() throws Exception { "gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")"; stream = (GatherNodesStream)factory.constructStream(expr2); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java index fc24a63e3bba..ddb8853e20d7 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java @@ -97,9 +97,8 @@ public void testShortestPathStream() throws Exception { Set paths = null; ShortestPathStream stream = null; String zkHost = cluster.getZkServer().getZkAddress(); - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); SolrParams sParams = StreamingTest.mapParams("fq", "predicate_s:knows"); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java index 501c6ddaa71d..716c8a0ed2f5 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java @@ -233,9 +233,8 @@ public void testJDBCSolrMerge() throws Exception { statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')"); } - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); // Load Solr new UpdateRequest() @@ -315,9 +314,8 @@ public void testJDBCSolrInnerJoinExpression() throws Exception{ String expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { // Basic test @@ -398,9 +396,8 @@ public void testJDBCSolrInnerJoinExpressionWithProperties() throws Exception{ String expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { // Basic test for no alias @@ -513,10 +510,8 @@ public void testJDBCSolrInnerJoinRollupExpression() throws Exception{ String expression; TupleStream stream; List tuples; - - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { // Basic test diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java index 137add6a0390..7b94ff4115f5 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java @@ -93,7 +93,7 @@ public void testAnalyzeEvaluator() throws Exception { SolrStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 5); @@ -127,7 +127,7 @@ public void testAnalyzeEvaluator() throws Exception { paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertEquals(tuples.size(), 1); @@ -143,7 +143,7 @@ public void testAnalyzeEvaluator() throws Exception { solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 5); @@ -178,7 +178,7 @@ public void testAnalyzeEvaluator() throws Exception { solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -191,7 +191,7 @@ public void testAnalyzeEvaluator() throws Exception { solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -221,7 +221,7 @@ public void testMemset() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertEquals(tuples.size(), 1); @@ -248,7 +248,7 @@ public void testMemsetSize() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertEquals(tuples.size(), 1); @@ -299,7 +299,7 @@ public void testMemsetTimeSeries() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -337,7 +337,7 @@ public void testLatlonFunctions() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -367,7 +367,7 @@ public void testHist() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -411,7 +411,7 @@ public void testCumulativeProbability() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -462,7 +462,7 @@ public void testCorrelationStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -509,7 +509,7 @@ public void testCovariance() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -549,7 +549,7 @@ public void testDistance() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -693,7 +693,7 @@ public void testReverse() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -743,7 +743,7 @@ public void testCopyOf() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -805,7 +805,7 @@ public void testCopyOfRange() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -834,7 +834,7 @@ public void testPercentile() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -850,7 +850,7 @@ public void testPercentile() throws Exception { solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -865,7 +865,7 @@ public void testPercentile() throws Exception { solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -884,7 +884,7 @@ public void testPrimes() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -911,7 +911,7 @@ public void testBinomialCoefficient() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -930,7 +930,7 @@ public void testAscend() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -983,7 +983,7 @@ public void testRankTransform() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1010,7 +1010,7 @@ public void testArray() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1050,7 +1050,7 @@ public void testOnes() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1072,7 +1072,7 @@ public void testZeros() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1105,7 +1105,7 @@ public void testMatrix() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1174,7 +1174,7 @@ public void testMatrixMath() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1274,7 +1274,7 @@ public void testTranspose() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1304,7 +1304,7 @@ public void testUnitize() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1340,7 +1340,7 @@ public void testNormalizeSum() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1379,7 +1379,7 @@ public void testStandardize() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1423,7 +1423,7 @@ public void testMarkovChain() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1445,7 +1445,7 @@ public void testAddAll() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1471,7 +1471,7 @@ public void testProbabilityRange() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1498,7 +1498,7 @@ public void testDistributions() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; try { TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1523,7 +1523,7 @@ public void testDistributions() throws Exception { //If it fails twice in a row, we probably broke some code. TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1552,7 +1552,7 @@ public void testSumDifference() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1568,7 +1568,7 @@ public void testMeanDifference() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1593,7 +1593,7 @@ public void testTermVectors() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1671,7 +1671,7 @@ public void testTermVectors() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1744,7 +1744,7 @@ public void testTermVectors() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1815,7 +1815,7 @@ public void testTermVectors() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1869,7 +1869,7 @@ public void testTermVectors() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1894,7 +1894,7 @@ public void testEbeSubtract() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1945,7 +1945,7 @@ public void testMatrixMult() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2024,7 +2024,7 @@ public void testKmeans() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2103,7 +2103,7 @@ public void testMultiKmeans() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2183,7 +2183,7 @@ public void testFuzzyKmeans() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2274,7 +2274,7 @@ public void testEbeMultiply() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2304,7 +2304,7 @@ public void testEbeAdd() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2353,7 +2353,7 @@ public void testSetAndGetValue() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2377,7 +2377,7 @@ public void testEbeDivide() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2399,7 +2399,7 @@ public void testFreqTable() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2440,7 +2440,7 @@ public void testFFT() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertEquals(tuples.size(), 1); @@ -2497,7 +2497,7 @@ public void testCosineSimilarity() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2518,7 +2518,7 @@ public void testPoissonDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2551,7 +2551,7 @@ public void testGeometricDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2585,7 +2585,7 @@ public void testBinomialDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2608,7 +2608,7 @@ public void testUniformIntegerDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2629,7 +2629,7 @@ public void testZipFDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2662,7 +2662,7 @@ public void testValueAt() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2681,7 +2681,7 @@ public void testBetaDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2702,7 +2702,7 @@ public void testBetaDistribution() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2733,7 +2733,7 @@ public void testEnumeratedDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2755,7 +2755,7 @@ public void testEnumeratedDistribution() throws Exception { paramsLoc.set("qt", "/stream"); url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2774,7 +2774,7 @@ public void testDotProduct() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2790,7 +2790,7 @@ public void testCache() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2803,7 +2803,7 @@ public void testCache() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2815,7 +2815,7 @@ public void testCache() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2828,7 +2828,7 @@ public void testCache() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2841,7 +2841,7 @@ public void testCache() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2854,7 +2854,7 @@ public void testCache() throws Exception { paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2878,7 +2878,7 @@ public void testExponentialMovingAverage() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2915,7 +2915,7 @@ public void testTimeDifferencingDefaultLag() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2950,7 +2950,7 @@ public void testTimeDifferencingDefinedLag() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2974,7 +2974,7 @@ public void testNestedDoubleTimeDifference() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3000,7 +3000,7 @@ public void testPolyfit() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3042,7 +3042,7 @@ public void testTtest() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3077,7 +3077,7 @@ public void testChiSquareDataSet() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3100,7 +3100,7 @@ public void testGtestDataSet() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3131,7 +3131,7 @@ public void testMultiVariateNormalDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3180,7 +3180,7 @@ public void testKnn() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3238,7 +3238,7 @@ public void testIntegrate() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3262,7 +3262,7 @@ public void testLoess() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3303,7 +3303,7 @@ public void testSpline() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3353,7 +3353,7 @@ public void testBicubicSpline() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3382,7 +3382,7 @@ public void testAkima() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3424,7 +3424,7 @@ public void testOutliers() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3471,7 +3471,7 @@ public void testLerp() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3507,7 +3507,7 @@ public void testHarmonicFit() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3526,7 +3526,7 @@ public void testAnova() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3549,7 +3549,7 @@ public void testOlsRegress() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3594,7 +3594,7 @@ public void testKnnRegress() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3624,7 +3624,7 @@ public void testKnnRegress() throws Exception { paramsLoc.set("qt", "/stream"); url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3645,7 +3645,7 @@ public void testKnnRegress() throws Exception { paramsLoc.set("qt", "/stream"); url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3664,7 +3664,7 @@ public void testKnnRegress() throws Exception { paramsLoc.set("qt", "/stream"); url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3685,7 +3685,7 @@ public void testGaussfit() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3727,7 +3727,7 @@ public void testPlot() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3754,7 +3754,7 @@ public void testMovingAverage() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3775,7 +3775,7 @@ public void testMannWhitney() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3792,7 +3792,7 @@ public void testMovingMedian() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3811,7 +3811,7 @@ public void testSumSq() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3827,7 +3827,7 @@ public void testMonteCarlo() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3858,7 +3858,7 @@ public void testMonteCarloWithVariables() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3892,7 +3892,7 @@ public void testWeibullDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3940,7 +3940,7 @@ public void testGammaDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3984,7 +3984,7 @@ public void testLogNormalDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4018,7 +4018,7 @@ public void testTriangularDistribution() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4045,7 +4045,7 @@ public void testCovMatrix() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4086,7 +4086,7 @@ public void testCorrMatrix() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4179,7 +4179,7 @@ public void testPrecision() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4205,7 +4205,7 @@ public void testMinMaxScale() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4263,7 +4263,7 @@ public void testMean() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4285,7 +4285,7 @@ public void testNorms() throws Exception { paramsLoc.set("qt", "/stream"); String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4343,7 +4343,7 @@ public void testScale() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4400,7 +4400,7 @@ public void testConvolution() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4448,7 +4448,7 @@ public void testRegressAndPredict() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4512,7 +4512,7 @@ public void testFinddelay() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4583,7 +4583,7 @@ public void testDescribe() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4643,7 +4643,7 @@ public void testLength() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4678,7 +4678,7 @@ public void testConvertEvaluator() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -4692,7 +4692,7 @@ public void testConvertEvaluator() throws Exception { paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 2); @@ -4706,7 +4706,7 @@ public void testConvertEvaluator() throws Exception { paramsLoc.set("expr", expr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 2); @@ -4720,7 +4720,7 @@ public void testConvertEvaluator() throws Exception { paramsLoc.set("expr", expr); paramsLoc.set("qt", "/stream"); solrStream = new SolrStream(url, paramsLoc); - context = new StreamContext(); + context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java index 75bf92dd6278..b8eea9d00b85 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java @@ -93,9 +93,8 @@ public void testSelectWithEvaluatorsStream() throws Exception { String clause; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index ef5729d00cbb..d62b60eac237 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -120,9 +120,8 @@ public void testUniqueStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -184,9 +183,8 @@ public void testSortStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -233,9 +231,8 @@ public void testNullStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) .withFunctionName("search", CloudSolrStream.class) @@ -268,9 +265,8 @@ public void testParallelNullStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -324,9 +320,8 @@ public void testMergeStream() throws Exception { + "on=\"a_f asc\")"); stream = new MergeStream(expression, factory); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { stream.setStreamContext(streamContext); tuples = getTuples(stream); @@ -399,9 +394,8 @@ public void testRankStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -486,9 +480,8 @@ public void testReducerStream() throws Exception { List tuples; Tuple t0, t1, t2; List maps0, maps1, maps2; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -588,8 +581,7 @@ public void testHavingStream() throws Exception { .withFunctionName("gteq", GreaterThanEqualToEvaluator.class); stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), eq(a_i, 9))"); - StreamContext context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + StreamContext context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -598,8 +590,7 @@ public void testHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),lt(a_i, 10)))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -608,8 +599,7 @@ public void testHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), or(eq(a_i, 9),eq(a_i, 8)))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -622,16 +612,14 @@ public void testHavingStream() throws Exception { stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),not(eq(a_i, 9))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); assert(tuples.size() == 0); stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(lteq(a_i, 9), gteq(a_i, 8)))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -644,8 +632,7 @@ public void testHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\")), and(eq(sum(a_i), 9),eq(sum(a_i), 9)))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -696,8 +683,7 @@ public void testParallelHavingStream() throws Exception { .withFunctionName("parallel", ParallelStream.class); stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))"); - StreamContext context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + StreamContext context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -706,8 +692,7 @@ public void testParallelHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -716,8 +701,7 @@ public void testParallelHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -730,8 +714,7 @@ public void testParallelHavingStream() throws Exception { stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -739,8 +722,7 @@ public void testParallelHavingStream() throws Exception { stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -753,8 +735,7 @@ public void testParallelHavingStream() throws Exception { assertTrue(t.getString("id").equals("9")); stream = factory.constructStream("parallel("+COLLECTIONORALIAS+", workers=2, sort=\"a_f asc\", having(rollup(over=a_f, sum(a_i), search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=a_f)), and(eq(sum(a_i), 9),eq(sum(a_i),9))))"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -793,8 +774,7 @@ public void testFetchStream() throws Exception { .withFunctionName("fetch", FetchStream.class); stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\")"); - StreamContext context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + StreamContext context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -822,8 +802,7 @@ public void testFetchStream() throws Exception { //Change the batch size stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -855,8 +834,7 @@ public void testFetchStream() throws Exception { .commit(cluster.getSolrClient(), COLLECTIONORALIAS); stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=" + id + ":99, fl=\"id,a1_s\", sort=\"id asc\"), on=\"a1_s=a2_s\", fl=\"subject\")"); - context = new StreamContext(); - context.setSolrClientCache(solrClientCache); + context = new StreamContext(solrClientCache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -884,9 +862,8 @@ public void testParallelFetchStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); TupleStream stream; List tuples; @@ -992,9 +969,8 @@ public void testDaemonStream() throws Exception { + "sum(a_i)" + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")"); daemonStream = (DaemonStream)factory.constructStream(expression); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); daemonStream.setStreamContext(streamContext); try { //Test Long and Double Sums @@ -1102,8 +1078,7 @@ public void testTerminatingDaemonStream() throws Exception { DaemonStream daemonStream; SolrClientCache cache = new SolrClientCache(); - StreamContext context = new StreamContext(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); expression = StreamExpressionParser.parse("daemon(topic("+ COLLECTIONORALIAS +","+ COLLECTIONORALIAS +", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\"" + "), id=test, runInterval=1000, terminate=true, queueSize=50)"); daemonStream = (DaemonStream)factory.constructStream(expression); @@ -1143,9 +1118,8 @@ public void testRollupStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { expression = StreamExpressionParser.parse("rollup(" + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\")," @@ -1264,11 +1238,8 @@ public void testParallelUniqueStream() throws Exception { .withFunctionName("top", RankStream.class) .withFunctionName("group", ReducerStream.class) .withFunctionName("parallel", ParallelStream.class); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); - - + StreamContext streamContext = new StreamContext(solrClientCache); try { @@ -1351,9 +1322,8 @@ public void testParallelShuffleStream() throws Exception { .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost) @@ -1396,9 +1366,8 @@ public void testParallelReducerStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); @@ -1491,9 +1460,8 @@ public void testParallelRankStream() throws Exception { .withFunctionName("group", ReducerStream.class) .withFunctionName("parallel", ParallelStream.class); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", " @@ -1538,9 +1506,8 @@ public void testParallelMergeStream() throws Exception { .withFunctionName("merge", MergeStream.class) .withFunctionName("parallel", ParallelStream.class); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test ascending ParallelStream pstream = (ParallelStream) streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", merge(search(" + COLLECTIONORALIAS + ", q=\"id:(4 1 8 7 9)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), search(" + COLLECTIONORALIAS + ", q=\"id:(0 2 3 6)\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), on=\"a_i asc\"), workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")"); @@ -1592,9 +1559,8 @@ public void testParallelRollupStream() throws Exception { .withFunctionName("count", CountMetric.class); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamExpression expression; TupleStream stream; @@ -1725,9 +1691,8 @@ public void testInnerJoinStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -1808,9 +1773,8 @@ public void testLeftOuterJoinStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -1891,9 +1855,8 @@ public void testHashJoinStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -1960,10 +1923,9 @@ public void testHashJoinStreamWithKnownConflict() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); - + StreamContext streamContext = new StreamContext(solrClientCache); + StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) .withFunctionName("search", CloudSolrStream.class) @@ -1998,9 +1960,8 @@ public void testOuterHashJoinStreamWithKnownConflict() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -2050,9 +2011,8 @@ public void testOuterHashJoinStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2133,9 +2093,9 @@ public void testSelectStream() throws Exception { TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); + SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2279,8 +2239,7 @@ public void testPriorityStream() throws Exception { expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); stream = factory.constructStream(expression); - StreamContext context = new StreamContext(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -2293,8 +2252,7 @@ public void testPriorityStream() throws Exception { expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); Collections.sort(tuples, comp); @@ -2306,8 +2264,7 @@ public void testPriorityStream() throws Exception { expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -2355,8 +2312,7 @@ public void testParallelPriorityStream() throws Exception { expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); stream = factory.constructStream(expression); - StreamContext context = new StreamContext(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -2369,8 +2325,7 @@ public void testParallelPriorityStream() throws Exception { expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); Collections.sort(tuples, comp); @@ -2382,8 +2337,7 @@ public void testParallelPriorityStream() throws Exception { expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," + "topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -2413,9 +2367,8 @@ public void testUpdateStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2508,9 +2461,8 @@ public void testParallelUpdateStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory factory = new StreamFactory() @@ -2608,9 +2560,8 @@ public void testParallelDaemonUpdateStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory factory = new StreamFactory() @@ -2783,9 +2734,8 @@ public void testParallelTerminatingDaemonUpdateStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory factory = new StreamFactory() @@ -2903,9 +2853,8 @@ public void testCommitStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2997,9 +2946,8 @@ public void testParallelCommitStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory factory = new StreamFactory() @@ -3096,9 +3044,8 @@ public void testParallelDaemonCommitStream() throws Exception { StreamExpression expression; TupleStream stream; Tuple t; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); String zkHost = cluster.getZkServer().getZkAddress(); StreamFactory factory = new StreamFactory() @@ -3269,9 +3216,8 @@ public void testIntersectStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -3448,7 +3394,7 @@ public void testLetStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3482,7 +3428,7 @@ public void testGetStreamForEOFTuple() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 0); @@ -3507,7 +3453,7 @@ public void testStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -3555,9 +3501,8 @@ public void testExecutorStream() throws Exception { String executorExpression = "executor(threads=3, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, sort=\"id desc\"))"; executorStream = factory.constructStream(executorExpression); - StreamContext context = new StreamContext(); SolrClientCache clientCache = new SolrClientCache(); - context.setSolrClientCache(clientCache); + StreamContext context = new StreamContext(clientCache); executorStream.setStreamContext(context); getTuples(executorStream); //Destination collection should now contain all the records in the main corpus. @@ -3626,9 +3571,8 @@ public void testParallelExecutorStream() throws Exception { String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))"; executorStream = factory.constructStream(executorExpression); - StreamContext context = new StreamContext(); SolrClientCache clientCache = new SolrClientCache(); - context.setSolrClientCache(clientCache); + StreamContext context = new StreamContext(clientCache); executorStream.setStreamContext(context); getTuples(executorStream); //Destination collection should now contain all the records in the main corpus. @@ -3687,9 +3631,8 @@ public void testParallelIntersectStream() throws Exception { .withFunctionName("parallel", ParallelStream.class); // basic - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { String zkHost = cluster.getZkServer().getZkAddress(); @@ -3732,9 +3675,8 @@ public void testComplementStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -3769,9 +3711,8 @@ public void testCartesianProductStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -3923,9 +3864,8 @@ public void testParallelComplementStream() throws Exception { .withFunctionName("complement", ComplementStream.class) .withFunctionName("parallel", ParallelStream.class); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { final String zkHost = cluster.getZkServer().getZkAddress(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index add34cb076f4..d0e25931dd1f 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -106,9 +106,8 @@ public void testCloudSolrStream() throws Exception { StreamExpression expression; CloudSolrStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { // Basic test @@ -178,9 +177,8 @@ public void testCloudSolrStream() throws Exception { Map> shardsMap = new HashMap(); shardsMap.put("myCollection", shardUrls); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(solrClientCache); context.put("shards", shardsMap); - context.setSolrClientCache(solrClientCache); // Basic test expression = StreamExpressionParser.parse("search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); @@ -232,9 +230,8 @@ public void testSqlStream() throws Exception { .commit(cluster.getSolrClient(), COLLECTIONORALIAS); List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); List shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); try { @@ -284,9 +281,8 @@ public void testCloudSolrStreamWithZkHost() throws Exception { StreamFactory factory = new StreamFactory(); StreamExpression expression; CloudSolrStream stream; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); List tuples; try { @@ -412,9 +408,8 @@ public void testNulls() throws Exception { TupleStream stream; List tuples; Tuple tuple; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) @@ -487,10 +482,9 @@ public void testRandomStream() throws Exception { .withFunctionName("random", RandomStream.class); - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); try { - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")"); stream = factory.constructStream(expression); @@ -557,10 +551,9 @@ public void testKnnSearchStream() throws Exception { update.add(id, "2", "a_t", "hello world have a very nice day fancy sky"); update.commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); try { - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", rows=\"4\", fl=\"id, score\", mintf=\"1\")"); JettySolrRunner jetty = cluster.getJettySolrRunner(0); @@ -635,10 +628,9 @@ public void testStatsStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache cache = new SolrClientCache(); try { - streamContext.setSolrClientCache(cache); + StreamContext streamContext = new StreamContext(cache); String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))"; expression = StreamExpressionParser.parse(expr); stream = factory.constructStream(expression); @@ -678,9 +670,8 @@ public void testStatsStream() throws Exception { expr = "stats(myCollection, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))"; Map> shardsMap = new HashMap(); shardsMap.put("myCollection", shardUrls); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(cache); context.put("shards", shardsMap); - context.setSolrClientCache(cache); stream = factory.constructStream(expr); stream.setStreamContext(context); @@ -1420,8 +1411,7 @@ public void testTopicStream() throws Exception { expression = StreamExpressionParser.parse("topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", checkpointEvery=3)"); stream = factory.constructStream(expression); - StreamContext context = new StreamContext(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -1433,8 +1423,7 @@ public void testTopicStream() throws Exception { expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); assertEquals(tuples.size(), 1); @@ -1451,8 +1440,7 @@ public void testTopicStream() throws Exception { expression = StreamExpressionParser.parse("topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2)"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); try { @@ -1464,8 +1452,7 @@ public void testTopicStream() throws Exception { // Checkpoint should not have changed. expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); TupleStream cstream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); cstream.setStreamContext(context); tuples = getTuples(cstream); @@ -1482,8 +1469,7 @@ public void testTopicStream() throws Exception { //Checkpoint should have changed. expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); cstream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); cstream.setStreamContext(context); tuples = getTuples(cstream); @@ -1505,8 +1491,7 @@ public void testTopicStream() throws Exception { try { expression = StreamExpressionParser.parse("daemon(topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2), id=\"test\", runInterval=\"1000\", queueSize=\"9\")"); dstream = (DaemonStream) factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); dstream.setStreamContext(context); //Index a few more documents @@ -1595,8 +1580,7 @@ public void testParallelTopicStream() throws Exception { "partitionKeys=\"id\"))"); stream = factory.constructStream(expression); - StreamContext context = new StreamContext(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); @@ -1609,8 +1593,7 @@ public void testParallelTopicStream() throws Exception { expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000*\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); tuples = getTuples(stream); assertEquals(tuples.size(), 2); @@ -1638,8 +1621,7 @@ public void testParallelTopicStream() throws Exception { "partitionKeys=\"id\"))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); assertTopicRun(stream, "10", "11"); @@ -1658,8 +1640,7 @@ public void testParallelTopicStream() throws Exception { "partitionKeys=\"id\"))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); assertTopicRun(stream, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); @@ -1673,8 +1654,7 @@ public void testParallelTopicStream() throws Exception { //Run the same topic again including the initialCheckpoint. It should start where it left off. //initialCheckpoint should be ignored for all but the first run. stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); assertTopicRun(stream, "12", "13"); @@ -1692,8 +1672,7 @@ public void testParallelTopicStream() throws Exception { "partitionKeys=\"id\"))"); stream = factory.constructStream(expression); - context = new StreamContext(); - context.setSolrClientCache(cache); + context = new StreamContext(cache); stream.setStreamContext(context); assertTopicSubject(stream, "ha ha bla blah0", @@ -1727,7 +1706,7 @@ public void testEchoStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1820,7 +1799,7 @@ public void testEvalStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -1867,7 +1846,7 @@ public void testTimeSeriesStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 5); @@ -2048,7 +2027,7 @@ public void testTupleStream() throws Exception { String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; TupleStream solrStream = new SolrStream(url, paramsLoc); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); solrStream.setStreamContext(context); List tuples = getTuples(solrStream); assertTrue(tuples.size() == 1); @@ -2091,9 +2070,8 @@ public void testBasicTextLogitStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2213,9 +2191,8 @@ public void testFeaturesSelectionStream() throws Exception { StreamExpression expression; TupleStream stream; List tuples; - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); StreamFactory factory = new StreamFactory() .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) @@ -2297,10 +2274,9 @@ public void testSignificantTermsStream() throws Exception { .withDefaultZkHost(cluster.getZkServer().getZkAddress()) .withFunctionName("significantTerms", SignificantTermsStream.class); - StreamContext streamContext = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - streamContext.setSolrClientCache(cache); try { + StreamContext streamContext = new StreamContext(cache); String significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, minTermLength=1, maxDocFreq=\".5\")"; stream = factory.constructStream(significantTerms); @@ -2408,9 +2384,8 @@ public void testSignificantTermsStream() throws Exception { Map> shardsMap = new HashMap(); shardsMap.put("myCollection", shardUrls); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(cache); context.put("shards", shardsMap); - context.setSolrClientCache(cache); significantTerms = "significantTerms(myCollection, q=\"id:a*\", field=\"test_t\", limit=2, minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")"; stream = factory.constructStream(significantTerms); stream.setStreamContext(context); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 8f2110072b60..f871b263d2ee 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -131,9 +131,8 @@ public void testUniqueStream() throws Exception { .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams); @@ -175,9 +174,8 @@ public void testNonePartitionKeys() throws Exception { .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none"); @@ -208,9 +206,8 @@ public void testParallelUniqueStream() throws Exception { .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { @@ -251,9 +248,8 @@ public void testMultipleFqClauses() throws Exception { streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i", @@ -279,9 +275,8 @@ public void testRankStream() throws Exception { .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams); @@ -311,9 +306,8 @@ public void testParallelRankStream() throws Exception { .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(new StreamContext(solrClientCache); try { SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i"); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams); @@ -347,9 +341,8 @@ public void testTrace() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test with spaces in the parameter lists. @@ -383,9 +376,8 @@ public void testReducerStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test with spaces in the parameter lists. @@ -456,9 +448,8 @@ public void testZeroReducerStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test with spaces in the parameter lists. @@ -492,9 +483,8 @@ public void testParallelReducerStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); @@ -570,9 +560,8 @@ public void testExceptionStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); //Test an error that comes originates from the /select handler try { SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc"); @@ -666,9 +655,8 @@ public void testStatsStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "*:*"); @@ -733,9 +721,8 @@ public void testFacetStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc"); @@ -1146,9 +1133,8 @@ private void checkSort(JettySolrRunner jetty, String field, String sortDir, Stri List selectOrder = ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder); List selectOrderBool = ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool); SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc"); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) { solrStream.setStreamContext(streamContext); List tuples = getTuples(solrStream); @@ -1191,9 +1177,8 @@ private void checkReturnValsForEmpty(String[] fields) throws IOException { } SolrParams sParams = mapParams("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc"); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) { solrStream.setStreamContext(streamContext); @@ -1346,9 +1331,8 @@ public void testSubFacetStream() throws Exception { .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { @@ -1538,9 +1522,8 @@ public void testRollupStream() throws Exception { .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc"); @@ -1712,9 +1695,8 @@ public void testRollupStream() throws Exception { public void testDaemonTopicStream() throws Exception { Assume.assumeTrue(!useAlias); - StreamContext context = new StreamContext(); SolrClientCache cache = new SolrClientCache(); - context.setSolrClientCache(cache); + StreamContext context = new StreamContext(cache); try { SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id"); @@ -1800,9 +1782,8 @@ public void testRollupWithNoParallel() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Intentionally adding partitionKeys to trigger SOLR-12674 @@ -1832,7 +1813,7 @@ public void testRollupWithNoParallel() throws Exception { solrParams.add("qt", "/stream"); solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\"),over=\"a_s\")\n"); SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); - streamContext = new StreamContext(); + streamContext = new StreamContext(solrClientCache); solrStream.setStreamContext(streamContext); tuples = getTuples(solrStream); assert (tuples.size() == 3); @@ -1857,9 +1838,8 @@ public void testParallelRollupStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s"); @@ -1977,9 +1957,8 @@ public void testZeroParallelReducerStream() throws Exception { .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA); @@ -2005,9 +1984,8 @@ public void testTuple() throws Exception { "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f,s_multi,i_multi,f_multi", "sort", "a_s asc"); @@ -2054,9 +2032,8 @@ public void testMergeStream() throws Exception { .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test ascending @@ -2136,9 +2113,8 @@ public void testParallelMergeStream() throws Exception { .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test ascending @@ -2193,9 +2169,8 @@ public void testParallelEOF() throws Exception { .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); try { //Test ascending @@ -2231,9 +2206,8 @@ public void streamTests() throws Exception { .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); //Basic CloudSolrStream Test with Descending Sort try { @@ -2296,9 +2270,8 @@ private void trySortWithQt(String which) throws Exception { //Basic CloudSolrStream Test bools desc SolrParams sParams = mapParams("q", "*:*", "qt", which, "fl", "id,b_sing", "sort", "b_sing asc,id asc"); - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams); try { @@ -2364,9 +2337,8 @@ public void testAllValidExportTypes() throws Exception { // We should be getting the exact same thing back with both the export and select handlers, so test private void tryWithQt(String which) throws IOException { - StreamContext streamContext = new StreamContext(); SolrClientCache solrClientCache = new SolrClientCache(); - streamContext.setSolrClientCache(solrClientCache); + StreamContext streamContext = new StreamContext(solrClientCache); SolrParams sParams = StreamingTest.mapParams("q", "*:*", "qt", which, "fl", "id,i_sing,i_multi,l_sing,l_multi,f_sing,f_multi,d_sing,d_multi,dt_sing,dt_multi,s_sing,s_multi,b_sing,b_multi", "sort", "i_sing asc"); @@ -2492,7 +2464,7 @@ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exceptio } private void attachStreamFactory(TupleStream tupleStream) { - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); streamContext.setStreamFactory(streamFactory); tupleStream.setStreamContext(streamContext); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java index ff216f98d93b..201937a3cf5c 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator; import org.apache.solr.client.solrj.io.eval.AddEvaluator; @@ -72,7 +73,7 @@ public void absoluteValueOneField() throws Exception{ @Test public void absoluteValueFromContext() throws Exception{ StreamEvaluator evaluator = factory.constructEvaluator("abs(a)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ArrayEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ArrayEvaluatorTest.java index 36e5e7829c60..d957020738a4 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ArrayEvaluatorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ArrayEvaluatorTest.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.eval.ArrayEvaluator; import org.apache.solr.client.solrj.io.eval.StreamEvaluator; @@ -47,7 +48,7 @@ public ArrayEvaluatorTest() { @Test public void arrayLongSortAscTest() throws IOException{ StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=asc)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; @@ -68,7 +69,7 @@ public void arrayLongSortAscTest() throws IOException{ @Test public void arrayLongSortDescTest() throws IOException{ StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=desc)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; @@ -89,7 +90,7 @@ public void arrayLongSortDescTest() throws IOException{ @Test public void arrayStringSortAscTest() throws IOException{ StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=asc)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; @@ -110,7 +111,7 @@ public void arrayStringSortAscTest() throws IOException{ @Test public void arrayStringSortDescTest() throws IOException{ StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=desc)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; @@ -131,7 +132,7 @@ public void arrayStringSortDescTest() throws IOException{ @Test public void arrayStringUnsortedTest() throws IOException{ StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c)"); - StreamContext context = new StreamContext(); + StreamContext context = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(context); Object result; diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ConversionEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ConversionEvaluatorsTest.java index 2194b8f2a948..fd2d24b5b26d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ConversionEvaluatorsTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/ConversionEvaluatorsTest.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.collections.map.HashedMap; +import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.eval.ConversionEvaluator; import org.apache.solr.client.solrj.io.eval.RawValueEvaluator; @@ -58,7 +59,7 @@ public void testInvalidExpression() throws Exception { try { evaluator = factory.constructEvaluator("convert(inches)"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); assertTrue(false); } catch (IOException e) { @@ -67,7 +68,7 @@ public void testInvalidExpression() throws Exception { try { evaluator = factory.constructEvaluator("convert(inches, yards, 3)"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); Tuple tuple = new Tuple(new HashMap()); evaluator.evaluate(tuple); @@ -118,7 +119,7 @@ public void testKiloMeters() throws Exception { public void testFunction(String expression, Number expected) throws Exception { StreamEvaluator evaluator = factory.constructEvaluator(expression); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); Object result = evaluator.evaluate(new Tuple(values)); assertTrue(result instanceof Number); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/TemporalEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/TemporalEvaluatorsTest.java index fbf99ab0d8ef..5a235289aa4f 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/TemporalEvaluatorsTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/TemporalEvaluatorsTest.java @@ -31,6 +31,7 @@ import java.util.TimeZone; import org.apache.commons.collections.map.HashedMap; +import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.eval.StreamEvaluator; import org.apache.solr.client.solrj.io.eval.TemporalEvaluatorDay; @@ -90,7 +91,7 @@ public void testInvalidExpression() throws Exception { try { evaluator = factory.constructEvaluator("week()"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); assertTrue(false); } catch (IOException e) { @@ -99,7 +100,7 @@ public void testInvalidExpression() throws Exception { try { evaluator = factory.constructEvaluator("week(a, b)"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); assertTrue(false); } catch (IOException e) { @@ -108,7 +109,7 @@ public void testInvalidExpression() throws Exception { try { evaluator = factory.constructEvaluator("Week()"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); assertTrue(false); } catch (IOException e) { @@ -125,7 +126,7 @@ public void testInvalidValues() throws Exception { try { values.clear(); values.put("a", 12); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); Object result = evaluator.evaluate(new Tuple(values)); assertTrue(false); @@ -136,7 +137,7 @@ public void testInvalidValues() throws Exception { try { values.clear(); values.put("a", "1995-12-31"); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); Object result = evaluator.evaluate(new Tuple(values)); assertTrue(false); @@ -147,7 +148,7 @@ public void testInvalidValues() throws Exception { try { values.clear(); values.put("a", ""); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); Object result = evaluator.evaluate(new Tuple(values)); assertTrue(false); @@ -278,7 +279,7 @@ public void testLimitedFunctions() throws Exception { public void testFunction(String expression, Object value, Number expected) throws Exception { StreamEvaluator evaluator = factory.constructEvaluator(expression); - StreamContext streamContext = new StreamContext(); + StreamContext streamContext = new StreamContext(new SolrClientCache()); evaluator.setStreamContext(streamContext); values.clear(); values.put("a", value);