Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "GEODE-8536: Allow limited retries when creating Lucene IndexW… #5656

Merged
merged 1 commit into from Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import junitparams.Parameters;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
Expand Down Expand Up @@ -110,6 +111,7 @@ private BucketRegion getFileAndChunkBucket() {
}

@Test
@Parameters()
public void lockedBucketShouldPreventPrimaryFromMoving() {
dataStore1.invoke(this::initDataStoreAndLuceneIndex);
dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
Expand Down

This file was deleted.

Expand Up @@ -44,7 +44,6 @@ public class IndexRepositoryFactory {
private static final Logger logger = LogService.getLogger();
public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200;

public IndexRepositoryFactory() {}

Expand Down Expand Up @@ -75,8 +74,7 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri
* This is a util function just to not let computeIndexRepository be a huge chunk of code.
*/
protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
throws IOException {
PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
Expand Down Expand Up @@ -131,7 +129,7 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri
} catch (IOException e) {
logger.warn("Exception thrown while constructing Lucene Index for bucket:" + bucketId
+ " for file region:" + fileAndChunkBucket.getFullPath(), e);
throw e;
return null;
} catch (CacheClosedException e) {
logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
+ bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
Expand All @@ -146,34 +144,11 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri

protected IndexWriter buildIndexWriter(int bucketId, BucketRegion fileAndChunkBucket,
LuceneIndexForPartitionedRegion indexForPR) throws IOException {
int attempts = 0;
// IOExceptions can occur if the fileAndChunk region is being modified while the IndexWriter is
// being initialized, so allow limited retries here to account for that timing window
while (true) {
// bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
Map<Object, Object> bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
RegionDirectory dir =
new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
try {
attempts++;
return getIndexWriter(dir, config);
} catch (IOException e) {
if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) {
throw e;
}
logger.info("Encountered {} while attempting to get IndexWriter for index {}. Retrying...",
e, indexForPR.getName());
try {
Thread.sleep(5);
} catch (InterruptedException ignore) {
}
}
}
}
// bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
RegionDirectory dir = new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());

protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig config)
throws IOException {
return new IndexWriter(dir, config);
}

Expand Down Expand Up @@ -211,8 +186,8 @@ private Object getValue(Region.Entry entry) {
return value;
}

protected Map<Object, Object> getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap<>(region, bucketId);
protected Map getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap(region, bucketId);
}

protected String getLockName(final BucketRegion fileAndChunkBucket) {
Expand Down
Expand Up @@ -14,7 +14,6 @@
*/
package org.apache.geode.cache.lucene.internal;

import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -23,13 +22,11 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -80,8 +77,7 @@ public void setUp() {
}

@Test
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
throws IOException {
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() {
doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, bucketId);

IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
Expand All @@ -91,8 +87,7 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi
}

@Test
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
throws IOException {
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() {
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false);

IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
Expand All @@ -102,8 +97,7 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi
}

@Test
public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed()
throws IOException {
public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() {
when(oldRepository.isClosed()).thenReturn(false);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);

Expand All @@ -114,8 +108,7 @@ public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotC
}

@Test
public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
throws IOException {
public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(false);
Expand All @@ -126,16 +119,17 @@ public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAnd
}

@Test
public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
doThrow(new IOException("Test Exception")).when(indexRepositoryFactory)
.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);

assertThatThrownBy(() -> indexRepositoryFactory.finishComputingRepository(0,
serializer, userRegion, oldRepository, luceneIndex)).isInstanceOf(IOException.class);
IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
serializer, userRegion, oldRepository, luceneIndex);
assertThat(indexRepository).isNull();
verify(distributedLockService).unlock(any());
}

Expand All @@ -152,27 +146,4 @@ public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenCache
userRegion, oldRepository, luceneIndex)).isInstanceOf(CacheClosedException.class);
verify(distributedLockService).unlock(any());
}

@Test
public void buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered()
throws IOException {
IndexWriter writer = mock(IndexWriter.class);
doThrow(new IOException()).doReturn(writer).when(indexRepositoryFactory).getIndexWriter(any(),
any());
assertThat(indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
.isEqualTo(writer);
verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any());
}

@Test
public void buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered()
throws IOException {
IOException testException = new IOException("Test exception");
doThrow(testException).when(indexRepositoryFactory).getIndexWriter(any(), any());
assertThatThrownBy(
() -> indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
.isEqualTo(testException);
verify(indexRepositoryFactory, times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(),
any());
}
}