From 6ed78b4f5e44c1f3b4847dfdaa5c10a8df886ae9 Mon Sep 17 00:00:00 2001 From: Ken Howe Date: Thu, 6 Oct 2016 15:02:24 -0700 Subject: [PATCH 1/3] GEODE-538: Add check for persistent data recovery PartitionedRegion.getNodeForBucketReadOrLoad can return an invalid node if persistent data recovery is in process and a get() targets a bucket that hasn't been recoverd yet. This can result in returning an incorrect value (null) or throwing ConflictingPersistentDataException from a get() or put() on the region. This change adds a check for persistent recovery to be completed before creating the new bucket. If recovery isn't complete then the operation on the region will fail with a PartitionOfflineException. Queries on a region while persistent recovery is in progress can also result in incorrect results so a similar check is added to DefaultQuery.checkQueryOnPR. --- .../cache/query/internal/DefaultQuery.java | 10 + .../cache/PRHARedundancyProvider.java | 10 + .../internal/cache/PartitionedRegion.java | 5 +- .../geode/internal/i18n/LocalizedStrings.java | 1 + .../partitioned/PRBasicQueryDUnitTest.java | 221 ++++++++++ .../query/partitioned/PRQueryDUnitHelper.java | 185 ++++++++ ...ntColocatedPartitionedRegionDUnitTest.java | 411 +++++++++++++++++- 7 files changed, 827 insertions(+), 16 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 58df3904e32b..7e445ec350e8 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -27,6 +27,8 @@ import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.RegionFunctionContext; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.persistence.PartitionOfflineException; +import 
org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.cache.query.*; import org.apache.geode.cache.query.internal.cq.InternalCqQuery; import org.apache.geode.distributed.internal.DistributionConfig; @@ -601,6 +603,14 @@ private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundE throw new RegionNotFoundException(LocalizedStrings.DefaultQuery_REGION_NOT_FOUND_0.toLocalizedString(regionPath)); } if (rgn instanceof QueryExecutor) { + + if (((PartitionedRegion)rgn).getDataPolicy().withPersistence() && !((PartitionedRegion)rgn).isRecoveredFromDisk()) { + ((PartitionedRegion)rgn).getDistributionAdvisor(); + Set persistIds = new HashSet(((PartitionedRegion)rgn).getRegionAdvisor().advisePersistentMembers().values()); + persistIds.removeAll(((PartitionedRegion)rgn).getRegionAdvisor().adviseInitializedPersistentMembers().values()); + throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 + .toLocalizedString(new Object[] { ((PartitionedRegion)rgn).getFullPath(), persistIds})); + } prs.add((QueryExecutor)rgn); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index cfedb677bceb..4d946920c0ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -24,6 +24,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionConfig; @@ -505,6 +506,15 @@ public 
InternalDistributedMember createBucketOnDataStore(int bucketId, PartitionedRegionException { final boolean isDebugEnabled = logger.isDebugEnabled(); + + if (!prRegion.isRecoveredFromDisk()) { + prRegion.getDistributionAdvisor(); + Set persistIds = new HashSet(prRegion.getRegionAdvisor().advisePersistentMembers().values()); + persistIds.removeAll(prRegion.getRegionAdvisor().adviseInitializedPersistentMembers().values()); + throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 + .toLocalizedString(new Object[] { prRegion.getFullPath(), persistIds})); + } + // If there are insufficient stores throw *before* we try acquiring the // (very expensive) bucket lock or the (somewhat expensive) monitor on this diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index baab79f930ab..a57796d3d5c5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -1397,6 +1397,10 @@ void setRecoveredFromDisk() { new UpdateAttributesProcessor(this).distribute(false); } + public boolean isRecoveredFromDisk() { + return recoveredFromDisk; + } + public final void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) { final Set nodes = prConfig.getNodes(); @@ -3057,7 +3061,6 @@ public InternalDistributedMember getNodeForBucketWrite(int bucketId, final RetryTimeKeeper snoozer) { final boolean isDebugEnabled = logger.isDebugEnabled(); -// InternalDistributedSystem ids = (InternalDistributedSystem)this.cache.getDistributedSystem(); RetryTimeKeeper localSnoozer = snoozer; // Prevent early access to buckets that are not completely created/formed // and diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java 
b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index 8bfdd686eb50..7d762b813fd5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -702,6 +702,7 @@ public class LocalizedStrings { public static final StringId AbstractDistributionConfig_CLIENT_CONFLATION_PROP_NAME = new StringId(1839, "Client override for server queue conflation setting"); public static final StringId PRHARRedundancyProvider_ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET = new StringId(1840, "allocate enough members to host bucket."); public static final StringId PRHARedundancyProvider_TIME_OUT_WAITING_0_MS_FOR_CREATION_OF_BUCKET_FOR_PARTITIONED_REGION_1_MEMBERS_REQUESTED_TO_CREATE_THE_BUCKET_ARE_2 = new StringId(1841, "Time out waiting {0} ms for creation of bucket for partitioned region {1}. Members requested to create the bucket are: {2}"); + public static final StringId PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 = new StringId(1842, "Partitioned Region {0} is offline due to unrecovered persistent data, {1}"); public static final StringId PUT_0_FAILED_TO_PUT_ENTRY_FOR_REGION_1_KEY_2_VALUE_3 = new StringId(1843, "{0}: Failed to put entry for region {1} key {2} value {3}"); public static final StringId PUT_0_UNEXPECTED_EXCEPTION = new StringId(1844, "{0}: Unexpected Exception"); diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java index 8ef907ac8519..224a7e02f754 100755 --- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java @@ -29,6 +29,7 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; 
+import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.query.Index; import org.apache.geode.cache.query.IndexType; import org.apache.geode.cache.query.Query; @@ -38,6 +39,7 @@ import org.apache.geode.cache.query.data.PortfolioData; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.internal.cache.PartitionedRegionDUnitTestCase; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.VM; @@ -67,6 +69,8 @@ public void setCacheInVMs(VM... vms) { } } + private final static int MAX_SYNC_WAIT = 30 * 1000; + PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper(); final String name = "Portfolios"; @@ -153,6 +157,223 @@ public void testPRBasicQuerying() throws Exception "PRQBasicQueryDUnitTest#testPRBasicQuerying: Querying PR's Test ENDED"); } + /** + * A basic dunit test that
+ * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0. + * 2. Populates the region with test data. + * 3. Fires a query on accessor VM and verifies the result. + * 4. Shuts down the caches, then restarts them asynchronously + * 5. Attempt the query while the regions are being recovered + * @throws Exception + */ + @Test + public void testColocatedPRQueryDuringRecovery() throws Exception + { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started"); + + // Creting PR's on the participating VM's + // Creating Accessor node on the VM0. + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + + // Creating the Datastores Nodes in the VM1. 
+ LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR"); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's"); + + // Generating portfolio object array to be populated across the PR's & Local + // Regions + + final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest); + // Putting the data into the PR's created + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio, + cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio, + cnt, cntDest)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName, + portfolio, cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName, + portfolio, cnt, cntDest)); + + // querying the VM for data and comparing the result with query result of + // local region. 
+ // querying the VM for data + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults( + name, localName)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED"); + + // Shut everything down and then restart to test queries during recovery + vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + + // Re-create the regions - only create the parent regions on the datastores + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying: re-creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Now start the child regions asynchronously so queries will happen during persistent recovery + AsyncInvocation vm0PR = vm0.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name, + redundancy, PortfolioData.class, true)); + AsyncInvocation vm1PR = vm1.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name, + redundancy, PortfolioData.class, true)); + + // delay the query to let the recovery get underway + Thread.sleep(100); + + try { + // This is a repeat of the original query from before closing and restarting the datastores. 
This time + // it should fail due to persistent recovery that has not completed. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true)); + fail("Expected PartitionOfflineException when queryiong a region with offline colocated child"); + } catch (Exception e) { + if (!(e.getCause() instanceof PartitionOfflineException)) { + e.printStackTrace(); + throw e; + } + } + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED"); + } + + /** + * A basic dunit test that
+ * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0. + * 2. Populates the region with test data. + * 3. Fires a query on accessor VM and verifies the result. + * 4. Shuts down the caches, then restarts them asynchronously, but don't restart the child region + * 5. Attempt the query while the region offline because of the missing child region + * @throws Exception + */ + @SuppressWarnings("rawtypes") + @Test + public void testColocatedPRQueryDuringRecoveryWithMissingColocatedChild() throws Exception + { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started"); + + // Creting PR's on the participating VM's + // Creating Accessor node on the VM0. + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + + // Creating the Datastores Nodes in the VM1. 
+ LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name, + redundancy, PortfolioData.class, true)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR"); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's"); + + // Generating portfolio object array to be populated across the PR's & Local + // Regions + + final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest); + // Putting the data into the PR's created + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio, + cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio, + cnt, cntDest)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName, + portfolio, cnt, cntDest)); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName, + portfolio, cnt, cntDest)); + + // querying the VM for data and comparing the result with query result of + // local region. 
+ // querying the VM for data + vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults( + name, localName)); + + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED"); + + // Shut everything down and then restart to test queries during recovery + vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache()); + + // Re-create the only the parent region + setCacheInVMs(vm0, vm1); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR"); + vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + // Creating local region on vm0 to compare the results of query. + vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class)); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR"); + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- re-creating the Datastore node in the PR"); + vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name, + redundancy, PortfolioData.class, true)); + + try { + // This is a repeat of the original query from before closing and restarting the datastores. This time + // it should fail due to persistent recovery that has not completed. 
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true)); + fail("Expected PartitionOfflineException when queryiong a region with offline colocated child"); + } catch (Exception e) { + if (!(e.getCause() instanceof PartitionOfflineException)) { + throw e; + } + } + LogWriterUtils.getLogWriter() + .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED"); + } + @Test public void testPRCountStarQuery() throws Exception { diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java index cfb419036817..9dc90fdbcece 100755 --- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java @@ -39,6 +39,7 @@ import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.EntryExistsException; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.PartitionAttributes; @@ -249,6 +250,190 @@ public void run2() throws CacheException return (CacheSerializableRunnable)createPrRegion; } + /** + * This function creates a colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + * @param redundancy + * @param constraint + * @param makePersistent + * @return cacheSerializable object + */ + public CacheSerializableRunnable getCacheSerializableRunnableForColocatedPRCreate( + final String regionName, final int redundancy, final Class constraint, boolean makePersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + 
createPrRegion = new CacheSerializableRunnable(regionName) { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (makePersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // parent region + partitionedregion = cache.createRegion(regionName, attr.create()); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region " + + regionName + " not in cache", cache.getRegion(regionName)); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null", + partitionedregion); + assertTrue( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed", + !partitionedregion.isDestroyed()); + + // child region + attr.setValueConstraint(constraint); + paf.setColocatedWith(regionName); + attr.setPartitionAttributes(paf.create()); + childRegion = cache.createRegion(childRegionName, attr.create()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + + /** + * This function creates the parent region of colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + * @param redundancy + * @param constraint + * @param makePersistent + * @return cacheSerializable object + */ + public 
CacheSerializableRunnable getCacheSerializableRunnableForColocatedParentCreate( + final String regionName, final int redundancy, final Class constraint, boolean makePersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + createPrRegion = new CacheSerializableRunnable(regionName + "-NoChildRegion") { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (makePersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // parent region + partitionedregion = cache.createRegion(regionName, attr.create()); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region " + + regionName + " not in cache", cache.getRegion(regionName)); + assertNotNull( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null", + partitionedregion); + assertTrue( + "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed", + !partitionedregion.isDestroyed()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + + /** + * This function creates the parent region of colocated pair of PR's given the scope & the + * redundancy parameters for the parent * + * + * @param regionName + 
* @param redundancy + * @param constraint + * @param isPersistent + * @return cacheSerializable object + */ + public CacheSerializableRunnable getCacheSerializableRunnableForColocatedChildCreate( + final String regionName, final int redundancy, final Class constraint, boolean isPersistent) { + + final String childRegionName = regionName + "Child"; + final String diskName = "disk"; + SerializableRunnable createPrRegion; + createPrRegion = new CacheSerializableRunnable(regionName + "-ChildRegion") { + @Override + public void run2() throws CacheException + { + + Cache cache = getCache(); + Region partitionedregion = null; + Region childRegion = null; + AttributesFactory attr = new AttributesFactory(); + attr.setValueConstraint(constraint); + if (isPersistent) { + DiskStore ds = cache.findDiskStore(diskName); + if (ds == null) { +// ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()) + ds = cache.createDiskStoreFactory().setDiskDirs(org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase.getDiskDirs()) + .create(diskName); + } + attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + attr.setDiskStoreName(diskName); + } else { + attr.setDataPolicy(DataPolicy.PARTITION); + attr.setDiskStoreName(null); + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundancy); + attr.setPartitionAttributes(paf.create()); + + // skip parent region creation + // partitionedregion = cache.createRegion(regionName, attr.create()); + + // child region + attr.setValueConstraint(constraint); + paf.setColocatedWith(regionName); + attr.setPartitionAttributes(paf.create()); + childRegion = cache.createRegion(childRegionName, attr.create()); + } + }; + + return (CacheSerializableRunnable)createPrRegion; + } + public CacheSerializableRunnable getCacheSerializableRunnableForPRCreateLimitedBuckets( final String regionName, final int redundancy, final int buckets) { diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java index 0a25228c9b2f..c15d5455ecd3 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.jayway.awaitility.core.ConditionTimeoutException; -import org.junit.experimental.categories.Category; import org.apache.geode.admin.internal.AdminDistributedSystemImpl; import org.apache.geode.cache.AttributesFactory; @@ -64,6 +63,7 @@ import org.apache.geode.cache.control.RebalanceOperation; import org.apache.geode.cache.control.RebalanceResults; import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; @@ -72,11 +72,14 @@ import org.apache.geode.internal.cache.ColocationLogger; import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.LogWriterUtils; +import org.apache.geode.test.dunit.RMIException; 
import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.SerializableCallable; import org.apache.geode.test.dunit.SerializableRunnable; @@ -2088,7 +2091,7 @@ public void run() { }; //runnable to create PRs - SerializableRunnable createPRs = new SerializableRunnable("region1") { + SerializableRunnable createPRs = new SerializableRunnable("createPRs") { public void run() { Cache cache = getCache(); @@ -2112,7 +2115,7 @@ public void run() { }; //runnable to close the cache. - SerializableRunnable closeCache = new SerializableRunnable("region1") { + SerializableRunnable closeCache = new SerializableRunnable("closeCache") { public void run() { closeCache(); } @@ -2120,7 +2123,7 @@ public void run() { //Runnable to do a bunch of puts handle exceptions //due to the fact that member is offline. - SerializableRunnable doABunchOfPuts = new SerializableRunnable("region1") { + SerializableRunnable doABunchOfPuts = new SerializableRunnable("doABunchOfPuts") { public void run() { Cache cache = getCache(); Region region = cache.getRegion(PR_REGION_NAME); @@ -2200,7 +2203,7 @@ public void run() { @Category(FlakyTest.class) // GEODE-506: time sensitive, async actions with 30 sec max @Test public void testRebalanceWithOfflineChildRegion() throws Throwable { - SerializableRunnable createParentPR = new SerializableRunnable() { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { public void run() { Cache cache = getCache(); @@ -2220,7 +2223,7 @@ public void run() { } }; - SerializableRunnable createChildPR = new SerializableRunnable() { + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { public void run() { Cache cache = getCache(); @@ -2325,7 +2328,6 @@ public void run() { }; vm1.invoke(addHook); -// vm1.invoke(addHook); AsyncInvocation async0; AsyncInvocation async1; AsyncInvocation async2; @@ -2335,7 +2337,6 @@ public void run() { async1 = vm1.invokeAsync(createPRs); vm1.invoke(waitForHook); -// 
vm1.invoke(waitForHook); //Now create the parent region on vm-2. vm-2 did not //previous host the child region. @@ -2347,7 +2348,6 @@ public void run() { } finally { vm1.invoke(removeHook); -// vm1.invoke(removeHook); } async0.getResult(MAX_WAIT); @@ -2473,6 +2473,188 @@ public void testModifyColocation() throws Throwable { closeCache(); } + @Test + public void testParentRegionGetWithOfflineChildRegion() throws Throwable { + + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + 
af.setPartitionAttributes(paf.create()); + // delay child region creations to cause a delay in persistent recovery + Thread.sleep(100); + cache.createRegion("region2", af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + + @Test + public void testParentRegionGetWithRecoveryInProgress() throws Throwable { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + System.out.println("oldRetryTimeout = " + oldRetryTimeout); } + } + }; + + SerializableRunnable createChildPR = new 
SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + af.setPartitionAttributes(paf.create()); + cache.createRegion("region2", af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + + @Test + public void testParentRegionPutWithRecoveryInProgress() throws Throwable { + SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") { + public void run() { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + System.out.println("oldRetryTimeout = " + oldRetryTimeout); + try { + Cache cache = getCache(); + DiskStore ds = cache.findDiskStore("disk"); + if (ds == null) { + ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk"); + } + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + 
paf.setRecoveryDelay(0); + af.setPartitionAttributes(paf.create()); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + cache.createRegion(PR_REGION_NAME, af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") { + public void run() throws InterruptedException { + String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000"); + try { + Cache cache = getCache(); + AttributesFactory af = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(0); + paf.setRecoveryDelay(0); + paf.setColocatedWith(PR_REGION_NAME); + af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + af.setDiskStoreName("disk"); + af.setPartitionAttributes(paf.create()); + Thread.sleep(1000); + cache.createRegion("region2", af.create()); + } finally { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION)); + } + } + }; + + boolean caughtException = false; + try { + // Expect a get() on the un-recovered (due to offline child) parent region to fail + regionGetWithOfflineChild(createParentPR, createChildPR, false); + } catch (Exception e) { + caughtException = true; + assertTrue(e instanceof RMIException); + assertTrue(e.getCause() instanceof PartitionOfflineException); + } + if (!caughtException) { + fail("Expected TimeoutException from remote"); + } + } + /** * Create three PRs on a VM, named region1, region2, and region3. 
* The colocated with attribute describes which region region3 @@ -2523,15 +2705,15 @@ public void rebalanceWithOfflineChildRegion(SerializableRunnable createParentPR, vm1.invoke(createParentPR); vm0.invoke(createChildPR); vm1.invoke(createChildPR); - + //Create some buckets. createData(vm0, 0, NUM_BUCKETS, "a"); createData(vm0, 0, NUM_BUCKETS, "a", "region2"); - + //Close the members closeCache(vm1); closeCache(vm0); - + //Recreate the parent region. Try to make sure that //the member with the latest copy of the buckets //is the one that decides to throw away it's copy @@ -2540,18 +2722,17 @@ public void rebalanceWithOfflineChildRegion(SerializableRunnable createParentPR, AsyncInvocation async1 = vm1.invokeAsync(createParentPR); async0.getResult(MAX_WAIT); async1.getResult(MAX_WAIT); - //Now create the parent region on vm-2. vm-2 did not //previous host the child region. vm2.invoke(createParentPR); - + //Rebalance the parent region. //This should not move any buckets, because //we haven't recovered the child region RebalanceResults rebalanceResults = rebalance(vm2); assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted()); - + //Recreate the child region. async1 = vm1.invokeAsync(createChildPR); async0 = vm0.invokeAsync(createChildPR); @@ -2568,6 +2749,206 @@ public void rebalanceWithOfflineChildRegion(SerializableRunnable createParentPR, createData(vm0, 0, NUM_BUCKETS, "c", "region2"); } + /** + * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then + * restart them and check the data. + *

+ * On the restart, try region operations ({@code get()}) on the parent region before or during persistent recovery. + * The {@code concurrentCheckData} argument determines whether the operation from the parent region occurs before + * or concurrent with the child region creation and recovery. + * + * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member + * @param createChildPR {@link SerializableRunnable} for creating the child region on one member + * @param concurrentCheckData + * @throws Throwable + */ + public void regionGetWithOfflineChild( + SerializableRunnable createParentPR, + SerializableRunnable createChildPR, + boolean concurrentCheckData) throws Throwable { + Host host = Host.getHost(0); + final VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + //Create the PRs on two members + vm0.invoke(createParentPR); + vm1.invoke(createParentPR); + vm0.invoke(createChildPR); + vm1.invoke(createChildPR); + + //Create some buckets. + createData(vm0, 0, NUM_BUCKETS, "a"); + createData(vm0, 0, NUM_BUCKETS, "a", "region2"); + + //Close the members + closeCache(vm1); + closeCache(vm0); + + SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { + @Override + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + assertEquals("For key " + i, "a", region.get(i)); + } + } + }); + + try { + //Recreate the parent region. Try to make sure that + //the member with the latest copy of the buckets + //is the one that decides to throw away its copy + //by starting it last. + AsyncInvocation async0 = vm0.invokeAsync(createParentPR); + AsyncInvocation async1 = vm1.invokeAsync(createParentPR); + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + //Now create the parent region on vm-2. vm-2 did not + //previously hosted the child region. 
+ vm2.invoke(createParentPR); + + AsyncInvocation async2 = null; + AsyncInvocation asyncCheck = null; + if (concurrentCheckData) { + //Recreate the child region. + async1 = vm1.invokeAsync(createChildPR); + async0 = vm0.invokeAsync(createChildPR); + async2 = vm2.invokeAsync(new SerializableRunnable("delay") { + @Override + public void run() throws InterruptedException { + Thread.sleep(100); + vm2.invoke(createChildPR); + } + }); + + asyncCheck = vm0.invokeAsync(checkDataOnParent); + } else { + vm0.invoke(checkDataOnParent); + } + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + async2.getResult(MAX_WAIT); + asyncCheck.getResult(MAX_WAIT); + //Validate the data + checkData(vm0, 0, NUM_BUCKETS, "a"); + checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); + //Make sure we can actually use the buckets in the child region. + createData(vm0, 0, NUM_BUCKETS, "c", "region2"); + } finally { + //Close the members + closeCache(vm1); + closeCache(vm0); + closeCache(vm2); + } + } + /** + * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then + * restart them. + *

+ * On the restart, try region operations ({@code put()}) on the parent region before or during persistent recovery. + * The {@code concurrentCreateData} argument determines whether the operation from the parent region occurs before + * or concurrent with the child region creation and recovery. + * + * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member + * @param createChildPR {@link SerializableRunnable} for creating the child region on one member + * @param concurrentCreateData + * @throws Throwable + */ + public void regionPutWithOfflineChild( + SerializableRunnable createParentPR, + SerializableRunnable createChildPR, + boolean concurrentCreateData) throws Throwable { + Host host = Host.getHost(0); + final VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") { + @Override + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + assertEquals("For key " + i, "a", region.get(i)); + } + } + }); + + SerializableRunnable createDataOnParent = new SerializableRunnable("createDataOnParent") { + + public void run() { + Cache cache = getCache(); + LogWriterUtils.getLogWriter().info("creating data in " + PR_REGION_NAME); + Region region = cache.getRegion(PR_REGION_NAME); + + for (int i = 0; i < NUM_BUCKETS; i++) { + region.put(i, "c"); + assertEquals("For key " + i, "c", region.get(i)); + } + } + }; + + //Create the PRs on two members + vm0.invoke(createParentPR); + vm1.invoke(createParentPR); + vm0.invoke(createChildPR); + vm1.invoke(createChildPR); + + //Create some buckets. + createData(vm0, 0, NUM_BUCKETS, "a"); + createData(vm0, 0, NUM_BUCKETS, "a", "region2"); + + //Close the members + closeCache(vm1); + closeCache(vm0); + + try { + //Recreate the parent region. 
Try to make sure that + //the member with the latest copy of the buckets + //is the one that decides to throw away it's copy + //by starting it last. + AsyncInvocation async0 = vm0.invokeAsync(createParentPR); + AsyncInvocation async1 = vm1.invokeAsync(createParentPR); + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + //Now create the parent region on vm-2. vm-2 did not + //previous host the child region. + vm2.invoke(createParentPR); + + AsyncInvocation async2 = null; + AsyncInvocation asyncPut = null; + if (concurrentCreateData) { + //Recreate the child region. + async1 = vm1.invokeAsync(createChildPR); + async0 = vm0.invokeAsync(createChildPR); + async2 = vm2.invokeAsync(createChildPR); + + Thread.sleep(100); + asyncPut = vm0.invokeAsync(createDataOnParent); + } else { + vm0.invoke(createDataOnParent); + } + async0.getResult(MAX_WAIT); + async1.getResult(MAX_WAIT); + async2.getResult(MAX_WAIT); + asyncPut.getResult(MAX_WAIT); + //Validate the data + checkData(vm0, 0, NUM_BUCKETS, "c"); + checkData(vm0, 0, NUM_BUCKETS, "a", "region2"); + //Make sure we can actually use the buckets in the child region. + createData(vm0, 0, NUM_BUCKETS, "c", "region2"); + } finally { + //Close the members + closeCache(vm1); + closeCache(vm0); + closeCache(vm2); + } + } + private RebalanceResults rebalance(VM vm) { return (RebalanceResults) vm.invoke(new SerializableCallable() { From 2f8cfbaf10fcefb8f14eab6bbedef090d88a96dd Mon Sep 17 00:00:00 2001 From: Ken Howe Date: Mon, 17 Oct 2016 16:03:50 -0700 Subject: [PATCH 2/3] GEODE-538: Refactored check for persistent recovery Moved the recovery check to a new method in PartitionedRegion instead of having duplicated code in PRHARedundancyProvider and DefaultQuery. 
Added throws declaration to javadoc in the Query Interface --- .../org/apache/geode/cache/query/Query.java | 3 +++ .../cache/query/internal/DefaultQuery.java | 12 +++--------- .../internal/cache/PRHARedundancyProvider.java | 15 +++++---------- .../geode/internal/cache/PartitionedRegion.java | 17 +++++++++++++++-- 4 files changed, 26 insertions(+), 21 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java index e27687d48e91..9726fbe6b113 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java @@ -89,6 +89,9 @@ public interface Query { * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException + * If persistent data recovery is not complete for a partitioned + * region referred to in the query. 
*/ public Object execute() throws FunctionDomainException, TypeMismatchException, NameResolutionException, diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 7e445ec350e8..8175d826f7b9 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -34,6 +34,7 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.i18n.LocalizedStrings; import java.util.*; @@ -583,7 +584,7 @@ public Object executeUsingContext(ExecutionContext context) } - private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException { + private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException, PartitionOfflineException { // check for PartititionedRegions. 
If a PartitionedRegion is referred to in the query, // then the following restrictions apply: @@ -603,14 +604,7 @@ private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundE throw new RegionNotFoundException(LocalizedStrings.DefaultQuery_REGION_NOT_FOUND_0.toLocalizedString(regionPath)); } if (rgn instanceof QueryExecutor) { - - if (((PartitionedRegion)rgn).getDataPolicy().withPersistence() && !((PartitionedRegion)rgn).isRecoveredFromDisk()) { - ((PartitionedRegion)rgn).getDistributionAdvisor(); - Set persistIds = new HashSet(((PartitionedRegion)rgn).getRegionAdvisor().advisePersistentMembers().values()); - persistIds.removeAll(((PartitionedRegion)rgn).getRegionAdvisor().adviseInitializedPersistentMembers().values()); - throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 - .toLocalizedString(new Object[] { ((PartitionedRegion)rgn).getFullPath(), persistIds})); - } + ((PartitionedRegion)rgn).checkPROffline(); prs.add((QueryExecutor)rgn); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index 4d946920c0ce..6245c3783e08 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -496,25 +496,20 @@ public InternalDistributedMember createBucketOnDataStore(int bucketId, * redundancy. * @throws PartitionedRegionException * if d-lock can not be acquired to create bucket. - * + * @throws PartitionOfflineException + * if persistent data recovery is not complete for a partitioned + * region referred to in the query. 
*/ public InternalDistributedMember createBucketAtomically(final int bucketId, final int newBucketSize, final long startTime, final boolean finishIncompleteCreation, String partitionName) throws PartitionedRegionStorageException, - PartitionedRegionException + PartitionedRegionException, PartitionOfflineException { final boolean isDebugEnabled = logger.isDebugEnabled(); - if (!prRegion.isRecoveredFromDisk()) { - prRegion.getDistributionAdvisor(); - Set persistIds = new HashSet(prRegion.getRegionAdvisor().advisePersistentMembers().values()); - persistIds.removeAll(prRegion.getRegionAdvisor().adviseInitializedPersistentMembers().values()); - throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 - .toLocalizedString(new Object[] { prRegion.getFullPath(), persistIds})); - } - + prRegion.checkPROffline(); // If there are insufficient stores throw *before* we try acquiring the // (very expensive) bucket lock or the (somewhat expensive) monitor on this diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index a57796d3d5c5..f7ecdaf1b8f1 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -28,6 +28,8 @@ import org.apache.geode.cache.execute.*; import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.cache.partition.PartitionNotAvailableException; +import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.cache.query.*; import org.apache.geode.cache.query.internal.*; import org.apache.geode.cache.query.internal.index.*; @@ -1397,8 +1399,19 @@ void setRecoveredFromDisk() { new 
UpdateAttributesProcessor(this).distribute(false); } - public boolean isRecoveredFromDisk() { - return recoveredFromDisk; + /** + * Throw an exception if persistent data recovery from disk is not complete + * for this region. + * + * @throws PartitionOfflineException + */ + public void checkPROffline() throws PartitionOfflineException { + if (getDataPolicy().withPersistence() && !recoveredFromDisk) { + Set persistIds = new HashSet(getRegionAdvisor().advisePersistentMembers().values()); + persistIds.removeAll(getRegionAdvisor().adviseInitializedPersistentMembers().values()); + throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 + .toLocalizedString(new Object[] { getFullPath(), persistIds})); + } } public final void updatePRConfig(PartitionRegionConfig prConfig, From 77768628ba59f8a4b7d3bdb6738a9c1dcd91a4ac Mon Sep 17 00:00:00 2001 From: Ken Howe Date: Tue, 18 Oct 2016 10:11:24 -0700 Subject: [PATCH 3/3] GEODE-538: Added PartitionOfflineException to javadocs Added the exception declaration to the javadocs for other execute methods. --- .../main/java/org/apache/geode/cache/query/Query.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java index 9726fbe6b113..670b262998f6 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java @@ -153,6 +153,9 @@ public Object execute() * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException + * If persistent data recovery is not complete for a partitioned + * region referred to in the query. 
* */ public Object execute(Object[] params) @@ -223,6 +226,9 @@ public Object execute(Object[] params) * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException + * If persistent data recovery is not complete for a partitioned + * region referred to in the query. */ public Object execute(RegionFunctionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException, @@ -294,6 +300,9 @@ public Object execute(RegionFunctionContext context) * @throws QueryExecutionLowMemoryException * If the query gets canceled due to low memory conditions and * the resource manager critical heap percentage has been set + * @throws PartitionOfflineException + * If persistent data recovery is not complete for a partitioned + * region referred to in the query. */ public Object execute(RegionFunctionContext context, Object[] params) throws FunctionDomainException, TypeMismatchException, NameResolutionException,