Skip to content

Commit

Permalink
[TRANSLOG] Fail #snapshot if translog is closed
Browse files Browse the repository at this point in the history
If the translog is closed while a snapshot opertion is in progress
we must fail the snapshot operation otherwise we end up in an endless
loop.

Closes elastic#10807

Conflicts:
	src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
  • Loading branch information
s1monw committed Apr 26, 2015
1 parent 46a5350 commit b482992
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
35 changes: 21 additions & 14 deletions src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
Expand Up @@ -39,14 +39,14 @@
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.*;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.nio.file.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -86,6 +86,8 @@ public void onRefreshSettings(Settings settings) {

private final ApplySettings applySettings = new ApplySettings();

private final AtomicBoolean closed = new AtomicBoolean(false);

@Inject
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
BigArrays bigArrays, IndexStore indexStore) throws IOException {
Expand Down Expand Up @@ -146,21 +148,23 @@ public void updateBuffer(ByteSizeValue bufferSize) {
}

private void close(boolean delete) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
}
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current;
if (current1 != null) {
current1.close(delete);
if (closed.compareAndSet(false, true)) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
}
current1 = this.trans;
if (current1 != null) {
current1.close(delete);
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current;
if (current1 != null) {
current1.close(delete);
}
current1 = this.trans;
if (current1 != null) {
current1.close(delete);
}
} finally {
rwl.writeLock().unlock();
}
} finally {
rwl.writeLock().unlock();
}
}

Expand Down Expand Up @@ -413,6 +417,9 @@ public Location add(Operation operation) throws TranslogException {
@Override
public FsChannelSnapshot snapshot() throws TranslogException {
while (true) {
if (closed.get()) {
throw new TranslogException(shardId, "translog is already closed");
}
FsChannelSnapshot snapshot = current.snapshot();
if (snapshot != null) {
return snapshot;
Expand Down
Expand Up @@ -311,6 +311,17 @@ public void testSnapshotWithNewTranslog() {
snapshot.close();
}

public void testSnapshotOnClosedTranslog() throws IOException {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
translog.close();
try {
Translog.Snapshot snapshot = translog.snapshot();
fail("translog is closed");
} catch (TranslogException ex) {
assertEquals(ex.getMessage(), "[index][1] translog is already closed");
}
}

@Test
public void testSnapshotWithSeekTo() {
Translog.Snapshot snapshot = translog.snapshot();
Expand Down

0 comments on commit b482992

Please sign in to comment.