Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-8293: fix activeCQCount has negative value after close/stop cq for PR #5620

Merged
merged 4 commits into from Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -68,7 +68,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
Region region = serverConnection.getCache().getRegion(regionFullPath);
if (region == null) {
logger.warn(
"Region was not found during GetClientPartitionAttributes request for region path : %s",
"Region was not found during GetClientPartitionAttributes request for region path : {}",
regionFullPath);
errMessage =
"Region was not found during GetClientPartitionAttributes request for region path : "
Expand Down
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.ManagementService;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.GfshCommandRule;

public class CQMetricsDUnitTest {

private CqAttributes cqa;
private QueryService qs;
mkevo marked this conversation as resolved.
Show resolved Hide resolved
private TestCqListener testListener;
private MemberVM locator, server1, server2;

@Rule
public ClusterStartupRule cluster = new ClusterStartupRule(5);

@Rule
public GfshCommandRule gfsh = new GfshCommandRule();

@Before
public void setUpServers() throws Exception {
locator = cluster.startLocatorVM(0, l -> l.withoutClusterConfigurationService());
server1 = cluster.startServerVM(1, locator.getPort());
server2 = cluster.startServerVM(2, locator.getPort());

ClientCache clientCache = createClientCache(locator.getPort());
Region region =
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");

qs = clientCache.getQueryService();
CqAttributesFactory cqaf = new CqAttributesFactory();
testListener = new TestCqListener();
cqaf.addCqListener(testListener);

cqa = cqaf.create();
gfsh.connectAndVerify(locator);
}

@Test
public void testStopCq() throws Exception {
gfsh.executeAndAssertThat("create region --name=region --type=PARTITION")
.statusIsSuccess();
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();

server1.invoke(() -> populateRegion(0, 100));

locator.invoke(() -> {
Cache cache = getCache();
ManagementService service = ManagementService.getManagementService(cache);
DistributedSystemMXBean dsmbean = service.getDistributedSystemMXBean();
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(dsmbean.getActiveCQCount()).isEqualTo(2));
});

// stop cq
qs.stopCqs();

locator.invoke(() -> {
Cache cache = getCache();
ManagementService service = ManagementService.getManagementService(cache);
DistributedSystemMXBean dsmbean = service.getDistributedSystemMXBean();
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(dsmbean.getActiveCQCount()).isEqualTo(0));
});

checkActiveCqCount(server1, 0);
checkActiveCqCount(server2, 0);
}

@Test
public void testCloseCq() throws Exception {
gfsh.executeAndAssertThat("create region --name=region --type=PARTITION")
.statusIsSuccess();
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();

server1.invoke(() -> populateRegion(0, 100));

locator.invoke(() -> {
Cache cache = getCache();
ManagementService service = ManagementService.getManagementService(cache);
DistributedSystemMXBean dsmbean = service.getDistributedSystemMXBean();
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(dsmbean.getActiveCQCount()).isEqualTo(2));
});

// close cq
qs.closeCqs();

locator.invoke(() -> {
Cache cache = getCache();
ManagementService service = ManagementService.getManagementService(cache);
DistributedSystemMXBean dsmbean = service.getDistributedSystemMXBean();
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(dsmbean.getActiveCQCount()).isEqualTo(0));
});
}

private class TestCqListener implements CqListener, Serializable {
public int onEventCalls = 0;

@Override
public void onEvent(CqEvent aCqEvent) {
onEventCalls++;
}

@Override
public void onError(CqEvent aCqEvent) {}

@Override
public void close() {}
}

private static void populateRegion(int startingId, int endingId) {
Region exampleRegion = getCache().getRegion("region");
for (int i = startingId; i < endingId; i++) {
exampleRegion.put("" + i, new Portfolio(i));
}
}

private ClientCache createClientCache(Integer locator1Port) {
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolLocator("localhost", locator1Port);
ccf.setPoolSubscriptionEnabled(true);
return ccf.create();
}

private void checkActiveCqCount(MemberVM vm, int expectedResult) {
vm.invoke(() -> {
QueryService queryService = getCache().getQueryService();
CqServiceStatistics cqServiceStats = queryService.getCqStatistics();
await()
kohlmu-pivotal marked this conversation as resolved.
Show resolved Hide resolved
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(cqServiceStats.numCqsActive()).isEqualTo(expectedResult));
});
}
}
Expand Up @@ -65,7 +65,7 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve

private ClientProxyMembershipID clientProxyId = null;

private CacheClientNotifier ccn = null;
private CacheClientNotifier cacheClientNotifier = null;

private String serverCqName;

Expand Down Expand Up @@ -103,15 +103,16 @@ public String getServerCqName() {
}

@Override
public void registerCq(ClientProxyMembershipID p_clientProxyId, CacheClientNotifier p_ccn,
public void registerCq(ClientProxyMembershipID p_clientProxyId,
CacheClientNotifier p_cacheClientNotifier,
int p_cqState) throws CqException, RegionNotFoundException {

CacheClientProxy clientProxy = null;
this.clientProxyId = p_clientProxyId;

if (p_ccn != null) {
this.ccn = p_ccn;
clientProxy = p_ccn.getClientProxy(p_clientProxyId, true);
if (p_cacheClientNotifier != null) {
this.cacheClientNotifier = p_cacheClientNotifier;
clientProxy = p_cacheClientNotifier.getClientProxy(p_clientProxyId, true);
}

validateCq();
Expand Down Expand Up @@ -203,7 +204,7 @@ public void registerCq(ClientProxyMembershipID p_clientProxyId, CacheClientNotif
this.updateCqCreateStats();

// Initialize the state of CQ.
if (this.cqState.getState() != p_cqState) {
if (this.cqState.getState() != p_cqState || cacheClientNotifier == null) {
setCqState(p_cqState);
}

Expand Down Expand Up @@ -231,7 +232,7 @@ public void registerCq(ClientProxyMembershipID p_clientProxyId, CacheClientNotif
}
}

if (p_ccn != null) {
if (p_cacheClientNotifier != null) {
try {
cqService.addToCqMap(this);
} catch (CqExistsException cqe) {
Expand Down Expand Up @@ -407,7 +408,7 @@ public ClientProxyMembershipID getClientProxyId() {
*
*/
public CacheClientNotifier getCacheClientNotifier() {
return this.ccn;
return this.cacheClientNotifier;
}

/**
Expand All @@ -419,7 +420,7 @@ protected void cleanup() {
try {
if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) {
this.cqBaseRegion.getFilterProfile().closeCq(this);
CacheClientProxy clientProxy = ccn.getClientProxy(clientProxyId);
CacheClientProxy clientProxy = cacheClientNotifier.getClientProxy(clientProxyId);
clientProxy.decCqCount();
if (clientProxy.hasNoCq()) {
cqService.stats().decClientsWithCqs();
Expand Down
6 changes: 3 additions & 3 deletions geode-docs/reference/statistics_list.html.md.erb
Expand Up @@ -558,9 +558,9 @@ These statistics are for continuous querying information. The statistics are:
| Statistic | Description |
|----------------------------------|------------------------------------------------------------------------------------------------------------------------|
| `CQS_CREATED` | Number of CQ operations created. |
| `CQS_ACTIVE` | Number of CQ operations actively executing. |
| `CQS_STOPPED` | Number of CQ operations stopped. |
| `CQS_CLOSED` | Number of CQ operations closed. |
| `CQS_ACTIVE` | Number of CQ operations actively executing. The quantity reported for partitioned regions may be larger than that of replicated regions, as each redundant copy contributes the the count. |
| `CQS_STOPPED` | Number of CQ operations stopped. The quantity reported for partitioned regions may be larger than that of replicated regions, as each redundant copy contributes the the count. |
| `CQS_CLOSED` | Number of CQ operations closed. The quantity reported for partitioned regions may be larger than that of replicated regions, as each redundant copy contributes the the count. |
| `CQS_ON_CLIENT` | Number of CQ operations on the client. |
| `CLIENTS_WITH_CQS` | Number of Clients with CQ operations. |
| `CQ_QUERY_EXECUTION_TIME` | Time taken, in nanoseconds, for CQ query execution. |
Expand Down