GEODE-8457: Fix crash when IO_error and single-hop=false (apache#642)
- Added 2 test cases and style changes after review
albertogpz authored and jvarenina committed Oct 28, 2020
1 parent 5f30f8b commit d0d4f9e
Showing 2 changed files with 79 additions and 42 deletions.
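The crash this commit fixes originates in the GF_IOERR handling of ThinClientPoolDM: m_clientMetadataService is only created when PR single-hop is enabled, so calling removeBucketServerLocation through it unconditionally dereferences a null pointer when single-hop=false. Below is a minimal, self-contained sketch of the guard pattern applied in the hunks that follow; PoolStub and ClientMetadataServiceStub are illustrative stand-ins, not the real Geode classes.

// Sketch only: stand-in types, not the actual ThinClientPoolDM code.
#include <iostream>
#include <memory>
#include <string>

struct ClientMetadataServiceStub {
  void removeBucketServerLocation(const std::string& endpointName) {
    std::cout << "Removing bucketServerLocation " << endpointName
              << " due to GF_IOERR\n";
  }
};

struct PoolStub {
  // Only populated when PR single-hop is enabled; stays null otherwise.
  std::unique_ptr<ClientMetadataServiceStub> metadataService;

  void onIoError(const std::string& endpointName) {
    // Before the fix, the call below ran unconditionally, so a client with
    // single-hop disabled dereferenced a null pointer on an IO error.
    if (metadataService) {
      metadataService->removeBucketServerLocation(endpointName);
    }
  }
};

int main() {
  PoolStub noSingleHop;                     // metadataService stays null
  noSingleHop.onIoError("server-1:40404");  // safe no-op after the fix

  PoolStub singleHop;
  singleHop.metadataService = std::make_unique<ClientMetadataServiceStub>();
  singleHop.onIoError("server-1:40404");    // logs and removes the location
  return 0;
}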
89 changes: 61 additions & 28 deletions cppcache/integration/test/PartitionRegionOpsTest.cpp
@@ -57,10 +57,11 @@ Cache createCache() {
return cache;
}

std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache) {
std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
bool singleHop) {
auto poolFactory = cache.getPoolManager().createFactory();
cluster.applyLocators(poolFactory);
poolFactory.setPRSingleHopEnabled(true);
poolFactory.setPRSingleHopEnabled(singleHop);
return poolFactory.create("default");
}

@@ -89,14 +90,7 @@ void getEntries(std::shared_ptr<Region> region, int numEntries) {
}
}

/**
* In this test case we verify that in a partition region with redundancy
* when one server goes down, all gets are still served.
* It can be observed in the logs that when one of the server goes down
* the bucketServerLocations for that server are removed from the
* client metadata.
*/
TEST(PartitionRegionOpsTest, getPartitionedRegionWithRedundancyServerGoesDown) {
void putPartitionedRegionWithRedundancyServerGoesDown(bool singleHop) {
Cluster cluster{LocatorCount{1}, ServerCount{2}};
cluster.start();
cluster.getGfsh()
@@ -108,34 +102,23 @@ TEST(PartitionRegionOpsTest, getPartitionedRegionWithRedundancyServerGoesDown) {
.execute();

auto cache = createCache();
auto pool = createPool(cluster, cache);
auto pool = createPool(cluster, cache, singleHop);
auto region = setupRegion(cache, pool);

int ENTRIES = 30;

putEntries(region, ENTRIES, 0);

getEntries(region, ENTRIES);

cluster.getServers()[1].stop();

getEntries(region, ENTRIES);
putEntries(region, ENTRIES, 1);

cluster.getServers()[1].start();

getEntries(region, ENTRIES);
putEntries(region, ENTRIES, 2);
}

/**
* In this test case we verify that in a partition region with redundancy
* when one server goes down, all puts are still served.
* It can be observed in the logs that when one of the server goes down
* the bucketServerLocations for that server are removed from the
* client metadata.
* When the server is brought back again, the meta data is refreshed
* after putting again values.
*/
TEST(PartitionRegionOpsTest, putPartitionedRegionWithRedundancyServerGoesDown) {
void getPartitionedRegionWithRedundancyServerGoesDown(bool singleHop) {
Cluster cluster{LocatorCount{1}, ServerCount{2}};
cluster.start();
cluster.getGfsh()
@@ -147,20 +130,70 @@ TEST(PartitionRegionOpsTest, putPartitionedRegionWithRedundancyServerGoesDown) {
.execute();

auto cache = createCache();
auto pool = createPool(cluster, cache);
auto pool = createPool(cluster, cache, singleHop);
auto region = setupRegion(cache, pool);

int ENTRIES = 30;

putEntries(region, ENTRIES, 0);

getEntries(region, ENTRIES);

cluster.getServers()[1].stop();

putEntries(region, ENTRIES, 1);
getEntries(region, ENTRIES);

cluster.getServers()[1].start();

putEntries(region, ENTRIES, 2);
getEntries(region, ENTRIES);
}

/**
 * In this test case we verify that in a partitioned region with redundancy,
 * when one server goes down, all gets are still served.
 * Single-hop is enabled in the client.
 * It can be observed in the logs that when one of the servers goes down,
 * the bucketServerLocations for that server are removed from the
 * client metadata.
 */
TEST(PartitionRegionOpsTest,
getPartitionedRegionWithRedundancyServerGoesDownSingleHop) {
getPartitionedRegionWithRedundancyServerGoesDown(true);
}

/**
 * In this test case we verify that in a partitioned region with redundancy,
 * when one server goes down, all puts are still served.
 * Single-hop is enabled in the client.
 * It can be observed in the logs that when one of the servers goes down,
 * the bucketServerLocations for that server are removed from the
 * client metadata.
 * When the server is brought back up, the metadata is refreshed
 * after values are put again.
 */
TEST(PartitionRegionOpsTest,
putPartitionedRegionWithRedundancyServerGoesDownSingleHop) {
putPartitionedRegionWithRedundancyServerGoesDown(true);
}

/**
 * In this test case we verify that in a partitioned region with redundancy,
 * when one server goes down, all gets are still served.
 * Single-hop is not enabled in the client.
 */
TEST(PartitionRegionOpsTest,
getPartitionedRegionWithRedundancyServerGoesDownNoSingleHop) {
getPartitionedRegionWithRedundancyServerGoesDown(false);
}

/**
 * In this test case we verify that in a partitioned region with redundancy,
 * when one server goes down, all puts are still served.
 * Single-hop is not enabled in the client.
 */
TEST(PartitionRegionOpsTest,
putPartitionedRegionWithRedundancyServerGoesDownNoSingleHop) {
putPartitionedRegionWithRedundancyServerGoesDown(false);
}

} // namespace
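For reference, here is a hedged sketch of a geode-native client configured the way the new ...NoSingleHop test cases configure theirs, outside the test harness. The locator address, pool name, and region name are assumptions for illustration, not values taken from this commit.

// Sketch, assuming a locator on localhost:10334 and a server-side
// partitioned region named "region"; all names are illustrative only.
#include <geode/CacheFactory.hpp>
#include <geode/PoolManager.hpp>
#include <geode/Region.hpp>
#include <geode/RegionFactory.hpp>
#include <geode/RegionShortcut.hpp>

using namespace apache::geode::client;

int main() {
  auto cache = CacheFactory().create();

  auto poolFactory = cache.getPoolManager().createFactory();
  poolFactory.addLocator("localhost", 10334);
  // The configuration exercised by the ...NoSingleHop tests: operations go
  // through the pool instead of hopping straight to the bucket owner, and
  // no client metadata service is created.
  poolFactory.setPRSingleHopEnabled(false);
  poolFactory.create("default");

  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
                    .setPoolName("default")
                    .create("region");

  // With the fix, an IO error on one server no longer crashes this client;
  // puts and gets keep being served by the remaining redundant server.
  region->put("key-0", "value-0");
  auto value = region->get("key-0");

  cache.close();
  return 0;
}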
32 changes: 18 additions & 14 deletions cppcache/src/ThinClientPoolDM.cpp
@@ -369,7 +369,7 @@ void ThinClientPoolDM::startBackgroundThreads() {
// starting chunk processing helper thread
ThinClientBaseDM::init();

if (m_clientMetadataService != nullptr) {
if (m_clientMetadataService) {
m_clientMetadataService->start();
}
}
@@ -1093,7 +1093,7 @@ TcrEndpoint* ThinClientPoolDM::getSingleHopServer(
std::shared_ptr<BucketServerLocation>& serverlocation,
std::set<ServerLocation>& excludeServers) {
const std::shared_ptr<CacheableKey>& key = request.getKeyRef();
if (m_clientMetadataService == nullptr || key == nullptr) return nullptr;
if (!m_clientMetadataService || key == nullptr) return nullptr;
auto r = request.getRegion();
auto region = nullptr == r ? nullptr : r->shared_from_this();
TcrEndpoint* ep = nullptr;
@@ -1189,7 +1189,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(TcrMessage& request,
if (!request.forTransaction() && m_attrs->getPRSingleHopEnabled() &&
(type == TcrMessage::GET_ALL_70 ||
type == TcrMessage::GET_ALL_WITH_CALLBACK) &&
m_clientMetadataService != nullptr) {
m_clientMetadataService) {
GfErrType error = GF_NOERR;

auto region =
@@ -1429,10 +1429,12 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
}
excludeServers.insert(ServerLocation(ep->name()));
if (error == GF_IOERR) {
auto sl = std::make_shared<BucketServerLocation>(ep->name());
LOGINFO("Removing bucketServerLocation %s due to GF_IOERR",
sl->toString().c_str());
m_clientMetadataService->removeBucketServerLocation(sl);
if (m_clientMetadataService) {
auto sl = std::make_shared<BucketServerLocation>(ep->name());
LOGINFO("Removing bucketServerLocation %s due to GF_IOERR",
sl->toString().c_str());
m_clientMetadataService->removeBucketServerLocation(sl);
}
}
}
} else {
@@ -1466,7 +1468,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
"reply Metadata version is %d & bsl version is %d "
"reply.isFEAnotherHop()=%d",
reply.getMetaDataVersion(), version, reply.isFEAnotherHop());
if (m_clientMetadataService != nullptr && request.forSingleHop() &&
if (m_clientMetadataService && request.forSingleHop() &&
(reply.getMetaDataVersion() != 0 ||
(request.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION &&
request.getKeyRef() != nullptr && reply.isFEAnotherHop()))) {
@@ -2317,7 +2319,7 @@ TcrConnection* ThinClientPoolDM::getConnectionFromQueueW(
// if all buckets are not initialized
// match = true;
}
if (slTmp != nullptr && m_clientMetadataService != nullptr) {
if (slTmp != nullptr && m_clientMetadataService) {
if (m_clientMetadataService->isBucketMarkedForTimeout(
request.getRegionName().c_str(), slTmp->getBucketId())) {
*error = GF_CLIENT_WAIT_TIMEOUT;
@@ -2334,7 +2336,7 @@ TcrConnection* ThinClientPoolDM::getConnectionFromQueueW(
*error = createPoolConnectionToAEndPoint(conn, theEP, maxConnLimit, true);
if (*error == GF_CLIENT_WAIT_TIMEOUT ||
*error == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
if (m_clientMetadataService == nullptr || request.getKey() == nullptr) {
if (!m_clientMetadataService || request.getKey() == nullptr) {
return nullptr;
}

@@ -2350,10 +2352,12 @@ TcrConnection* ThinClientPoolDM::getConnectionFromQueueW(
}
return nullptr;
} else if (*error == GF_IOERR) {
auto sl = std::make_shared<BucketServerLocation>(theEP->name());
LOGINFO("Removing bucketServerLocation %s due to GF_IOERR",
sl->toString().c_str());
m_clientMetadataService->removeBucketServerLocation(sl);
if (m_clientMetadataService) {
auto sl = std::make_shared<BucketServerLocation>(theEP->name());
LOGINFO("Removing bucketServerLocation %s due to GF_IOERR",
sl->toString().c_str());
m_clientMetadataService->removeBucketServerLocation(sl);
}
}
}
}
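Aside from the new null guards, the "style changes after review" in this file replace explicit comparisons of m_clientMetadataService against nullptr with the pointer's implicit bool conversion. A tiny illustrative sketch of why the two forms are interchangeable, shown here with std::unique_ptr as a stand-in (no assumption is made about the member's actual type):

// Illustration only: both tests mean the same thing for raw and smart
// pointers; the shorter form matches the style applied in this commit.
#include <iostream>
#include <memory>

int main() {
  std::unique_ptr<int> service;                                // null until assigned
  if (service != nullptr) std::cout << "explicit compare\n";   // not printed
  if (service) std::cout << "implicit bool conversion\n";      // not printed

  service = std::make_unique<int>(42);
  if (service) std::cout << "guard passes, value = " << *service << "\n";
  return 0;
}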
