diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java index 076a9bb51c1e..821deaf4bb23 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java @@ -19,12 +19,16 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import org.apache.geode.cache.query.internal.QueryObserver; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -165,7 +169,6 @@ public final void preTearDownCacheTestCase() throws Exception { @Override public final void postTearDownCacheTestCase() throws Exception { - Invoke.invokeInEveryVM(() -> QueryObserverHolder.reset()); cache = null; Invoke.invokeInEveryVM(new SerializableRunnable() { @Override @@ -205,8 +208,7 @@ public void run2() throws CacheException { for (int i = 0; i < queriesForRR.length; i++) { try { - function = new TestQueryFunction("queryFunctionOnRR"); - + function = new TestQueryFunction("queryFunctionOnRR", QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion.class.getName()); rcollector = FunctionService.onRegion(CacheFactory.getAnyInstance().getRegion(repRegionName)) .setArguments(queriesForRR[i]).execute(function); @@ -240,7 +242,7 @@ public void run2() throws CacheException { Object[][] r = new Object[1][2]; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]); @@ -276,7 +278,7 @@ public void run2() throws CacheException { filter.add(0); String query = "select * from " + SEPARATOR + " " + repRegionName + " where ID>=0"; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, repRegionName, filter, query); @@ -288,7 +290,7 @@ public void run2() throws CacheException { query = "select * from " + SEPARATOR + " " + PartitionedRegionName1 + " where ID>=0"; func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, query); @@ -312,7 +314,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 1); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-2"); + function = new TestQueryFunction("queryFunction-2", null); for (int i = 0; i < queries.length; i++) { Object[][] r = new Object[1][2]; @@ -342,33 +344,6 @@ public void run2() throws CacheException { @Test public void testQueriesWithFilterKeysOnPRLocalAndRemoteWithBucketDestroy() { - // Set Query Observer in cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - // Destroy only for first query. - if (query.getQueryString().contains("ID>=0")) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 3; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Test query on client and server") { @Override @@ -376,7 +351,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 2); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-2"); - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", null); for (int i = 0; i < queries.length; i++) { @@ -408,30 +383,6 @@ public void testQueriesWithFilterKeysOnPRWithBucketDestroy() { Object[][] r = new Object[queries.length][2]; Set filter = new HashSet(); - // Close cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 0; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -442,7 +393,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunctionBucketDestroy"); + function = new TestQueryFunction("queryFunctionBucketDestroy", QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy.class.getName()); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -479,30 +430,6 @@ public void testQueriesWithFilterKeysOnPRWithRebalancing() { IgnoredException.addIgnoredException("FunctionException"); IgnoredException.addIgnoredException("IOException"); - // Close cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 6; i < 9; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -513,7 +440,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing.class.getName()); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -561,7 +488,7 @@ public void run2() throws CacheException { filter.add(0); for (int i = 0; i < nonColocatedQueries.length; i++) { - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, @@ -586,7 +513,7 @@ public void testJoinQueryPRWithMultipleIndexes() { @Override public void run2() throws CacheException { Set filter = getFilter(0, 1); - function = new TestQueryFunction("queryFunction-2"); + function = new TestQueryFunction("queryFunction-2", null); Object[][] r = new Object[2][2]; QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); int j = 0; @@ -616,8 +543,51 @@ public void run2() throws CacheException { }); } - // Helper classes and function + + public static class QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 0; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 0; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 6; i < 9; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class TestQueryFunction extends FunctionAdapter { @Override @@ -632,24 +602,46 @@ public boolean isHA() { private final String id; + private final String observerClassName; - public TestQueryFunction(String id) { + + public TestQueryFunction(String id, String observerClassName) { super(); this.id = id; + this.observerClassName = observerClassName; + } + + private QueryObserver getObserverFromClassName(String observerClassName) { + try { + Class clazz = Class.forName(observerClassName); + Constructor constructor = clazz.getConstructor(); + return (QueryObserver) constructor.newInstance(); + } catch (Exception e) { + return null; + } } @Override public void execute(FunctionContext context) { Cache cache = CacheFactory.getAnyInstance(); QueryService queryService = cache.getQueryService(); - ArrayList allQueryResults = new ArrayList(); String qstr = (String) context.getArguments(); + QueryObserver observer = null; + QueryObserver oldObserver = null; try { + observer = getObserverFromClassName(observerClassName); + if (observer != null) { + oldObserver = QueryObserverHolder.setInstance(observer); + } Query query = queryService.newQuery(qstr); context.getResultSender().lastResult( - (ArrayList) ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); + ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); } catch (Exception e) { throw new FunctionException(e); + } finally { + if (observer != null) { + QueryObserverHolder.setInstance(oldObserver); + } } } @@ -657,6 +649,7 @@ public void execute(FunctionContext context) { public String getId() { return this.id; } + } public static class TestServerQueryFunction extends FunctionAdapter { @@ -741,7 +734,7 @@ public void fillValuesInRegions() { } private void registerFunctionOnServers() { - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", null); server1.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); server2.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function});