diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index 355201857834..cd86460673d3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -7716,6 +7716,11 @@ public class LocalizedStrings { public static final StringId LuceneIndexCreation_INDEX_CANNOT_BE_CREATED_DUE_TO_PROFILE_VIOLATION = new StringId(6667, "Lucene index {0} cannot be created because its parameters are incompatible with another Lucene index"); + + public static final StringId cq_CACHE_CLIENT_PROXY_IS_NULL = + new StringId(6668, + "No Cache Client Proxy found while executing CQ."); + /** Testing strings, messageId 90000-99999 **/ /** diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java index c96a6e6c7291..6aee8f5f734a 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java @@ -284,8 +284,10 @@ public synchronized ServerCQ executeCq(String cqName, String queryString, int cq if (emptyRegionsMap != null && emptyRegionsMap.containsKey(cQuery.getBaseRegionName())) { regionDataPolicy = 0; } + + CacheClientProxy proxy = getCacheClientProxy(clientProxyId, ccn); ccn.updateMapOfEmptyRegions( - ccn.getClientProxy(clientProxyId, true).getRegionsWithEmptyDataPolicy(), + proxy.getRegionsWithEmptyDataPolicy(), cQuery.getBaseRegionName(), regionDataPolicy); } } catch (CqException cqe) { @@ -307,6 +309,15 @@ public synchronized ServerCQ executeCq(String cqName, String queryString, int cq return cQuery; } + protected CacheClientProxy getCacheClientProxy(ClientProxyMembershipID clientProxyId, + CacheClientNotifier ccn) throws CqException { + CacheClientProxy proxy = ccn.getClientProxy(clientProxyId, true); + if (proxy == null) { + throw new CqException(LocalizedStrings.cq_CACHE_CLIENT_PROXY_IS_NULL.toLocalizedString()); + } + return proxy; + } + @Override public void resumeCQ(int cqState, ServerCQ cQuery) { // Initialize the state of CQ. diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/internal/cq/CqServiceImplJUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/internal/cq/CqServiceImplJUnitTest.java new file mode 100644 index 000000000000..d832c42084fd --- /dev/null +++ b/geode-cq/src/test/java/org/apache/geode/cache/query/internal/cq/CqServiceImplJUnitTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal.cq; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.query.CqException; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; +import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class CqServiceImplJUnitTest { + + @Test + public void closedCacheClientProxyInExecuteCqShouldThrowCQException() { + InternalCache cache = mock(InternalCache.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + DistributedSystem distributedSystem = mock(DistributedSystem.class); + doNothing().when(cancelCriterion).checkCancelInProgress(null); + when(cache.getCancelCriterion()).thenReturn(cancelCriterion); + when(cache.getDistributedSystem()).thenReturn(distributedSystem); + + + ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class); + CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class); + when(cacheClientNotifier.getClientProxy(clientProxyMembershipID, true)).thenReturn(null); + + CqServiceImpl cqService = new CqServiceImpl(cache); + try { + cqService.getCacheClientProxy(clientProxyMembershipID, cacheClientNotifier); + fail(); + } catch (Exception ex) { + if (!(ex instanceof CqException && ex.getMessage() + .contains(LocalizedStrings.cq_CACHE_CLIENT_PROXY_IS_NULL.toLocalizedString()))) { + fail(); + } + } + + } +}