Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal.index;

import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.stream.IntStream;

import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexExistsException;
import org.apache.geode.cache.query.IndexNameConflictException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.test.assertj.LogFileAssert;
import org.apache.geode.test.junit.categories.OQLIndexTest;
import org.apache.geode.test.junit.rules.ServerStarterRule;

@Category(OQLIndexTest.class)
@RunWith(JUnitParamsRunner.class)
public class IndexManagerIntegrationTest {
private File logFile;
private final int entries = 100;
private InternalCache internalCache;

@Rule
public TestName testName = new TestName();

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Rule
public ServerStarterRule serverStarterRule = new ServerStarterRule();

@Before
public void setUp() throws IOException, ClassNotFoundException {
TestQueryObject.throwException = false;

logFile = temporaryFolder.newFile(testName.getMethodName() + ".log");
serverStarterRule.withProperty("log-file", logFile.getAbsolutePath()).startServer();
internalCache = serverStarterRule.getCache();
}

@SuppressWarnings("unused")
private Object[] getRegionAndIndexMaintenanceTypes() {
return new Object[] {
new Object[] {"LOCAL", "true"},
new Object[] {"LOCAL", "false"},

new Object[] {"REPLICATE", "true"},
new Object[] {"REPLICATE", "false"},

new Object[] {"PARTITION", "true"},
new Object[] {"PARTITION", "false"},
};
}

private void waitForIndexUpdaterTask(boolean synchronousMaintenance, Region region) {
if (!synchronousMaintenance) {
InternalRegion internalRegion = (InternalRegion) region;
await().untilAsserted(
() -> assertThat(internalRegion.getIndexManager().getUpdaterThread().isDone()).isTrue());
}
}

private Region<Integer, TestQueryObject> createAndPopulateRegionWithIndex(
RegionShortcut regionShortcut, String regionName, String indexName,
boolean synchronousMaintenance)
throws IndexNameConflictException, IndexExistsException, RegionNotFoundException {
Region<Integer, TestQueryObject> region = internalCache
.<Integer, TestQueryObject>createRegionFactory(regionShortcut)
.setIndexMaintenanceSynchronous(synchronousMaintenance)
.create(regionName);

QueryService queryService = internalCache.getQueryService();
queryService.createIndex(indexName, "id", "/" + regionName);
IntStream.range(1, entries + 1).forEach(i -> region.put(i, new TestQueryObject(i)));
waitForIndexUpdaterTask(synchronousMaintenance, region);

Index index = queryService.getIndex(region, indexName);
assertThat(index.isValid()).isTrue();

return region;
}

@Test
@Parameters(method = "getRegionAndIndexMaintenanceTypes")
@TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})")
public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryAddition(
RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception {
int newKey = entries + 2;
String regionName = testName.getMethodName();
String indexName = testName.getMethodName() + "_index";
Region<Integer, TestQueryObject> region = createAndPopulateRegionWithIndex(regionShortcut,
regionName, indexName, synchronousMaintenance);

// Create entry, add mapping will throw an exception when invoking 'getId()'.
TestQueryObject.throwException = true;
assertThat(region.containsKey(newKey)).isFalse();
TestQueryObject testQueryObject = new TestQueryObject(0);
assertThatCode(() -> region.create(newKey, testQueryObject)).doesNotThrowAnyException();
waitForIndexUpdaterTask(synchronousMaintenance, region);

// Entry created, index marked as invalid, and error logged.
assertThat(region.containsKey(newKey)).isTrue();
assertThat(region.get(newKey)).isEqualTo(testQueryObject);
Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName);
assertThat(indexInvalid.isValid()).isFalse();
LogFileAssert.assertThat(logFile)
.contains(String.format(
"Updating the Index %s failed. The index is corrupted and marked as invalid.",
indexName));
}

@Test
@Parameters(method = "getRegionAndIndexMaintenanceTypes")
@TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})")
public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryModification(
RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception {
int existingKey = entries / 2;
String regionName = testName.getMethodName();
String indexName = testName.getMethodName() + "_index";
Region<Integer, TestQueryObject> region = createAndPopulateRegionWithIndex(regionShortcut,
regionName, indexName, synchronousMaintenance);

// Update entry, add mapping will throw an exception when invoking 'getId()'.
TestQueryObject.throwException = true;
TestQueryObject testQueryObject = new TestQueryObject(0);
assertThatCode(() -> region.put(existingKey, testQueryObject)).doesNotThrowAnyException();
waitForIndexUpdaterTask(synchronousMaintenance, region);

// Entry updated, index marked as invalid, and error logged.
assertThat(region.get(existingKey)).isEqualTo(testQueryObject);
Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName);
assertThat(indexInvalid.isValid()).isFalse();
LogFileAssert.assertThat(logFile)
.contains(String.format(
"Updating the Index %s failed. The index is corrupted and marked as invalid.",
indexName));
}

@Test
@Parameters(method = "getRegionAndIndexMaintenanceTypes")
@TestCaseName("[{index}] {method}(RegionType:{0};IndexSynchronousMaintenance:{1})")
public void indexShouldBeMarkedAsInvalidWhenAddMappingOperationFailsAfterEntryDeletion(
RegionShortcut regionShortcut, boolean synchronousMaintenance) throws Exception {
int existingKey = entries / 3;
String regionName = testName.getMethodName();
String indexName = testName.getMethodName() + "_index";
Region<Integer, TestQueryObject> region = createAndPopulateRegionWithIndex(regionShortcut,
regionName, indexName, synchronousMaintenance);

// Remove entry, remove mapping will throw an exception when invoking 'getId()'.
TestQueryObject.throwException = true;
assertThat(region.containsKey(existingKey)).isTrue();

// Make sure we get the exception for asynchronous indexes.
if (!synchronousMaintenance) {
// RangeIndex for asynchronous maintenance, hack internal structures to throw exception.
Index index = internalCache.getQueryService().getIndex(region, indexName);

if (!PARTITION.equals(regionShortcut)) {
((RangeIndex) index).valueToEntriesMap.clear();
} else {
@SuppressWarnings("unchecked")
List<RangeIndex> bucketRangeIndexList = ((PartitionedIndex) index).getBucketIndexes();
bucketRangeIndexList.forEach(rangeIndex -> rangeIndex.valueToEntriesMap.clear());
}
}

assertThatCode(() -> region.destroy(existingKey)).doesNotThrowAnyException();
waitForIndexUpdaterTask(synchronousMaintenance, region);

// Entry deleted, index marked as invalid, and error logged.
assertThat(region.get(existingKey)).isNull();
Index indexInvalid = internalCache.getQueryService().getIndex(region, indexName);
assertThat(indexInvalid.isValid()).isFalse();
LogFileAssert.assertThat(logFile)
.contains(String.format(
"Updating the Index %s failed. The index is corrupted and marked as invalid.",
indexName));
}

private static class TestQueryObject implements Serializable {
private final int id;
static transient boolean throwException = false;

public int getId() {
if (throwException) {
throw new RuntimeException("Mock Exception");
} else {
return id;
}
}

TestQueryObject(int id) {
this.id = id;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestQueryObject that = (TestQueryObject) o;
return id == that.id;
}

@Override
public int hashCode() {
return id;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ private void processAction(RegionEntry entry, int action, int opCode) throws Que
}
start = ((AbstractIndex) index).updateIndexUpdateStats();

index.removeIndexMapping(entry, opCode);
removeIndexMapping(entry, index, opCode);

((AbstractIndex) index).updateIndexUpdateStats(start);
}
Expand All @@ -1149,14 +1149,27 @@ private void processAction(RegionEntry entry, int action, int opCode) throws Que
}
}

private void addIndexMapping(RegionEntry entry, IndexProtocol index) throws IMQException {
void addIndexMapping(RegionEntry entry, IndexProtocol index) {
try {
index.addIndexMapping(entry);
} catch (Exception exception) {
index.markValid(false);
setPRIndexAsInvalid((AbstractIndex) index);
logger.warn("Put operation for the entry corrupted the index : "
+ ((AbstractIndex) index).indexName + " with the exception : \n " + exception);
logger.warn(String.format(
"Updating the Index %s failed. The index is corrupted and marked as invalid.",
((AbstractIndex) index).indexName), exception);
}
}

void removeIndexMapping(RegionEntry entry, IndexProtocol index, int opCode) {
try {
index.removeIndexMapping(entry, opCode);
} catch (Exception exception) {
index.markValid(false);
setPRIndexAsInvalid((AbstractIndex) index);
logger.warn(String.format(
"Updating the Index %s failed. The index is corrupted and marked as invalid.",
((AbstractIndex) index).indexName), exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal.index;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Before;
import org.junit.Test;

import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionEntry;

public class IndexManagerTest {
private IndexManager indexManager;

@Before
public void setUp() {
Region region = mock(Region.class);
RegionAttributes regionAttributes = mock(RegionAttributes.class);
when(regionAttributes.getIndexMaintenanceSynchronous()).thenReturn(true);
when(regionAttributes.getEvictionAttributes()).thenReturn(mock(EvictionAttributes.class));
when(regionAttributes.getEvictionAttributes().getAction())
.thenReturn(EvictionAction.DEFAULT_EVICTION_ACTION);
when(region.getAttributes()).thenReturn(regionAttributes);

indexManager = new IndexManager(mock(InternalCache.class), region);
}

@Test
public void addIndexMappingShouldMarkIndexAsInvalidWhenAddMappingOperationFails()
throws IMQException {
RegionEntry mockEntry = mock(RegionEntry.class);
AbstractIndex mockIndex = mock(AbstractIndex.class);
mockIndex.prIndex = mock(AbstractIndex.class);
when(mockIndex.addIndexMapping(any())).thenThrow(new IMQException("Mock Exception"));

assertThatCode(() -> indexManager.addIndexMapping(mockEntry, mockIndex))
.doesNotThrowAnyException();
verify(mockIndex, times(1)).markValid(false);
verify((AbstractIndex) mockIndex.prIndex, times(1)).markValid(false);
}

@Test
public void removeIndexMappingShouldMarkIndexAsInvalidWhenRemoveMappingOperationFails()
throws IMQException {
RegionEntry mockEntry = mock(RegionEntry.class);
AbstractIndex mockIndex = mock(AbstractIndex.class);
mockIndex.prIndex = mock(AbstractIndex.class);
when(mockIndex.removeIndexMapping(mockEntry, 1)).thenThrow(new IMQException("Mock Exception"));

assertThatCode(() -> indexManager.removeIndexMapping(mockEntry, mockIndex, 1))
.doesNotThrowAnyException();
verify(mockIndex, times(1)).markValid(false);
verify((AbstractIndex) mockIndex.prIndex, times(1)).markValid(false);
}
}