diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/IndexManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/IndexManagerIntegrationTest.java new file mode 100644 index 000000000000..c4fd7e2538d6 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/IndexManagerIntegrationTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal.index; + +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.stream.IntStream; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.query.Index; +import org.apache.geode.cache.query.IndexExistsException; +import org.apache.geode.cache.query.IndexNameConflictException; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.RegionNotFoundException; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.test.assertj.LogFileAssert; +import org.apache.geode.test.junit.categories.OQLIndexTest; +import org.apache.geode.test.junit.rules.ServerStarterRule; + +@Category(OQLIndexTest.class) +@RunWith(JUnitParamsRunner.class) +public class IndexManagerIntegrationTest { + private File logFile; + private final int entries = 100; + private InternalCache internalCache; + + @Rule + public TestName testName = new TestName(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ServerStarterRule serverStarterRule = new ServerStarterRule(); + + @Before + public void setUp() throws IOException, ClassNotFoundException { + TestQueryObject.throwException = false; + + logFile = temporaryFolder.newFile(testName.getMethodName() + ".log"); + serverStarterRule.withProperty("log-file", logFile.getAbsolutePath()).startServer(); + internalCache = serverStarterRule.getCache(); + } + + @SuppressWarnings("unused") + private Object[] getRegionAndIndexMaintenanceTypes() { + return new Object[] { + new Object[] {"LOCAL", "true"}, + new Object[] {"LOCAL", "false"}, + + new Object[] {"REPLICATE", "true"}, + new Object[] {"REPLICATE", "false"}, + + new Object[] {"PARTITION", "true"}, + new Object[] {"PARTITION", "false"}, + }; + } + + private void waitForIndexUpdaterTask(boolean synchronousMaintenance, Region region) { + if (!synchronousMaintenance) { + InternalRegion internalRegion = (InternalRegion) region; + await().untilAsserted( + () -> assertThat(internalRegion.getIndexManager().getUpdaterThread().isDone()).isTrue()); + } + } + + private Region createAndPopulateRegionWithIndex( + RegionShortcut regionShortcut, String regionName, String indexName, + boolean synchronousMaintenance) + throws IndexNameConflictException, IndexExistsException, RegionNotFoundException { + Region region = internalCache + .createRegionFactory(regionShortcut) + .setIndexMaintenanceSynchronous(synchronousMaintenance) + .create(regionName); + + QueryService queryService = internalCache.getQueryService(); + queryService.createIndex(indexName, "id", "/" + regionName); + IntStream.range(1, entries + 1).forEach(i -> region.put(i, new TestQueryObject(i))); + waitForIndexUpdaterTask(synchronousMaintenance, region); + + Index index = queryService.getIndex(region, indexName); + assertThat(index.isValid()).isTrue(); + + return region; + } + + @Test + @Parameters(method = "getRegionAndIndexMaintenanceTypes") + @TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})") + public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryAddition( + RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception { + int newKey = entries + 2; + String regionName = testName.getMethodName(); + String indexName = testName.getMethodName() + "_index"; + Region region = createAndPopulateRegionWithIndex(regionShortcut, + regionName, indexName, synchronousMaintenance); + + // Create entry, add mapping will throw an exception when invoking 'getId()'. + TestQueryObject.throwException = true; + assertThat(region.containsKey(newKey)).isFalse(); + TestQueryObject testQueryObject = new TestQueryObject(0); + assertThatCode(() -> region.create(newKey, testQueryObject)).doesNotThrowAnyException(); + waitForIndexUpdaterTask(synchronousMaintenance, region); + + // Entry created, index marked as invalid, and error logged. + assertThat(region.containsKey(newKey)).isTrue(); + assertThat(region.get(newKey)).isEqualTo(testQueryObject); + Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName); + assertThat(indexInvalid.isValid()).isFalse(); + LogFileAssert.assertThat(logFile) + .contains(String.format( + "Updating the Index %s failed. The index is corrupted and marked as invalid.", + indexName)); + } + + @Test + @Parameters(method = "getRegionAndIndexMaintenanceTypes") + @TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})") + public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryModification( + RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception { + int existingKey = entries / 2; + String regionName = testName.getMethodName(); + String indexName = testName.getMethodName() + "_index"; + Region region = createAndPopulateRegionWithIndex(regionShortcut, + regionName, indexName, synchronousMaintenance); + + // Update entry, add mapping will throw an exception when invoking 'getId()'. + TestQueryObject.throwException = true; + TestQueryObject testQueryObject = new TestQueryObject(0); + assertThatCode(() -> region.put(existingKey, testQueryObject)).doesNotThrowAnyException(); + waitForIndexUpdaterTask(synchronousMaintenance, region); + + // Entry updated, index marked as invalid, and error logged. + assertThat(region.get(existingKey)).isEqualTo(testQueryObject); + Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName); + assertThat(indexInvalid.isValid()).isFalse(); + LogFileAssert.assertThat(logFile) + .contains(String.format( + "Updating the Index %s failed. The index is corrupted and marked as invalid.", + indexName)); + } + + @Test + @Parameters(method = "getRegionAndIndexMaintenanceTypes") + @TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})") + public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryDeletion( + RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception { + int existingKey = entries / 3; + String regionName = testName.getMethodName(); + String indexName = testName.getMethodName() + "_index"; + Region region = createAndPopulateRegionWithIndex(regionShortcut, + regionName, indexName, synchronousMaintenance); + + // Remove entry, remove mapping will throw an exception when invoking 'getId()'. + TestQueryObject.throwException = true; + assertThat(region.containsKey(existingKey)).isTrue(); + + // Make sure we get the exception for asynchronous indexes. + if (!synchronousMaintenance) { + // RangeIndex for asynchronous maintenance, hack internal structures to throw exception. + Index index = internalCache.getQueryService().getIndex(region, indexName); + + if (!PARTITION.equals(regionShortcut)) { + ((RangeIndex) index).valueToEntriesMap.clear(); + } else { + @SuppressWarnings("unchecked") + List bucketRangeIndexList = ((PartitionedIndex) index).getBucketIndexes(); + bucketRangeIndexList.forEach(rangeIndex -> rangeIndex.valueToEntriesMap.clear()); + } + } + + assertThatCode(() -> region.destroy(existingKey)).doesNotThrowAnyException(); + waitForIndexUpdaterTask(synchronousMaintenance, region); + + // Entry deleted, index marked as invalid, and error logged. + assertThat(region.get(existingKey)).isNull(); + Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName); + assertThat(indexInvalid.isValid()).isFalse(); + LogFileAssert.assertThat(logFile) + .contains(String.format( + "Updating the Index %s failed. The index is corrupted and marked as invalid.", + indexName)); + } + + private static class TestQueryObject implements Serializable { + private final int id; + static transient boolean throwException = false; + + public int getId() { + if (throwException) { + throw new RuntimeException("Mock Exception"); + } else { + return id; + } + } + + TestQueryObject(int id) { + this.id = id; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TestQueryObject that = (TestQueryObject) o; + return id == that.id; + } + + @Override + public int hashCode() { + return id; + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java index fd62a26dc8c1..434ee43bdaf5 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java @@ -1129,7 +1129,7 @@ private void processAction(RegionEntry entry, int action, int opCode) throws Que } start = ((AbstractIndex) index).updateIndexUpdateStats(); - index.removeIndexMapping(entry, opCode); + removeIndexMapping(entry, index, opCode); ((AbstractIndex) index).updateIndexUpdateStats(start); } @@ -1149,14 +1149,27 @@ private void processAction(RegionEntry entry, int action, int opCode) throws Que } } - private void addIndexMapping(RegionEntry entry, IndexProtocol index) throws IMQException { + void addIndexMapping(RegionEntry entry, IndexProtocol index) { try { index.addIndexMapping(entry); } catch (Exception exception) { index.markValid(false); setPRIndexAsInvalid((AbstractIndex) index); - logger.warn("Put operation for the entry corrupted the index : " - + ((AbstractIndex) index).indexName + " with the exception : \n " + exception); + logger.warn(String.format( + "Updating the Index %s failed. The index is corrupted and marked as invalid.", + ((AbstractIndex) index).indexName), exception); + } + } + + void removeIndexMapping(RegionEntry entry, IndexProtocol index, int opCode) { + try { + index.removeIndexMapping(entry, opCode); + } catch (Exception exception) { + index.markValid(false); + setPRIndexAsInvalid((AbstractIndex) index); + logger.warn(String.format( + "Updating the Index %s failed. The index is corrupted and marked as invalid.", + ((AbstractIndex) index).indexName), exception); } } diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/IndexManagerTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/IndexManagerTest.java new file mode 100644 index 000000000000..8bb1ba3beb92 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/IndexManagerTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal.index; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.RegionEntry; + +public class IndexManagerTest { + private IndexManager indexManager; + + @Before + public void setUp() { + Region region = mock(Region.class); + RegionAttributes regionAttributes = mock(RegionAttributes.class); + when(regionAttributes.getIndexMaintenanceSynchronous()).thenReturn(true); + when(regionAttributes.getEvictionAttributes()).thenReturn(mock(EvictionAttributes.class)); + when(regionAttributes.getEvictionAttributes().getAction()) + .thenReturn(EvictionAction.DEFAULT_EVICTION_ACTION); + when(region.getAttributes()).thenReturn(regionAttributes); + + indexManager = new IndexManager(mock(InternalCache.class), region); + } + + @Test + public void addIndexMappingShouldMarkIndexAsInvalidWhenAddMappingOperationFails() + throws IMQException { + RegionEntry mockEntry = mock(RegionEntry.class); + AbstractIndex mockIndex = mock(AbstractIndex.class); + mockIndex.prIndex = mock(AbstractIndex.class); + when(mockIndex.addIndexMapping(any())).thenThrow(new IMQException("Mock Exception")); + + assertThatCode(() -> indexManager.addIndexMapping(mockEntry, mockIndex)) + .doesNotThrowAnyException(); + verify(mockIndex, times(1)).markValid(false); + verify((AbstractIndex) mockIndex.prIndex, times(1)).markValid(false); + } + + @Test + public void removeIndexMappingShouldMarkIndexAsInvalidWhenRemoveMappingOperationFails() + throws IMQException { + RegionEntry mockEntry = mock(RegionEntry.class); + AbstractIndex mockIndex = mock(AbstractIndex.class); + mockIndex.prIndex = mock(AbstractIndex.class); + when(mockIndex.removeIndexMapping(mockEntry, 1)).thenThrow(new IMQException("Mock Exception")); + + assertThatCode(() -> indexManager.removeIndexMapping(mockEntry, mockIndex, 1)) + .doesNotThrowAnyException(); + verify(mockIndex, times(1)).markValid(false); + verify((AbstractIndex) mockIndex.prIndex, times(1)).markValid(false); + } +}