Skip to content

Commit

Permalink
Allow double-closing of FSTranslog
Browse files Browse the repository at this point in the history
the translog might be reused across engines which is currently a problem
in the design such that we have to allow calls to `close` more than once.
This moves the closed check for snapshot on the actual file to exit the loop.

Relates to #10807
  • Loading branch information
s1monw committed Apr 26, 2015
1 parent 71445ec commit aebaa1c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 23 deletions.
Expand Up @@ -238,6 +238,11 @@ public void updateBufferSize(int bufferSize) {
}
}

@Override
public boolean closed() {
return this.closed.get();
}

class WrapperOutputStream extends OutputStream {

@Override
Expand Down
41 changes: 19 additions & 22 deletions src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
Expand Up @@ -45,7 +45,6 @@
import java.nio.channels.ClosedChannelException;
import java.nio.file.Path;
import java.nio.file.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -85,8 +84,6 @@ public void onRefreshSettings(Settings settings) {

private final ApplySettings applySettings = new ApplySettings();

private final AtomicBoolean closed = new AtomicBoolean(false);

@Inject
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
BigArrays bigArrays, IndexStore indexStore) throws IOException {
Expand Down Expand Up @@ -147,23 +144,21 @@ public void updateBuffer(ByteSizeValue bufferSize) {
}

private void close(boolean delete) {
if (closed.compareAndSet(false, true)) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);
}
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current;
if (current1 != null) {
current1.close(delete);
}
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current;
if (current1 != null) {
current1.close(delete);
}
current1 = this.trans;
if (current1 != null) {
current1.close(delete);
}
} finally {
rwl.writeLock().unlock();
current1 = this.trans;
if (current1 != null) {
current1.close(delete);
}
} finally {
rwl.writeLock().unlock();
}
}

Expand Down Expand Up @@ -405,20 +400,22 @@ public Location add(Operation operation) throws TranslogException {
rwl.readLock().unlock();
if (!released && out != null) {
Releasables.close(out.bytes());
}

}
}

@Override
public FsChannelSnapshot snapshot() throws TranslogException {
while (true) {
if (closed.get()) {
throw new TranslogException(shardId, "translog is already closed");
}
FsTranslogFile current = this.current;
FsChannelSnapshot snapshot = current.snapshot();
if (snapshot != null) {
return snapshot;
}
if (current.closed() && this.current == current) {
// check if we are closed and if we are still current - then this translog is closed and we can exit
throw new TranslogException(shardId, "current translog is already closed");
}
Thread.yield();
}
}
Expand Down
Expand Up @@ -80,4 +80,6 @@ public static Type fromString(String type) throws ElasticsearchIllegalArgumentEx
boolean syncNeeded();

TranslogStream getStream();

public boolean closed();
}
Expand Up @@ -182,4 +182,9 @@ public String toString() {
", operationCounter=" + operationCounter +
'}';
}

public boolean closed() {
return this.closed.get();
}

}
Expand Up @@ -318,7 +318,7 @@ public void testSnapshotOnClosedTranslog() throws IOException {
Translog.Snapshot snapshot = translog.snapshot();
fail("translog is closed");
} catch (TranslogException ex) {
assertEquals(ex.getMessage(), "[index][1] translog is already closed");
assertEquals(ex.getMessage(), "[index][1] current translog is already closed");
}
}

Expand Down

0 comments on commit aebaa1c

Please sign in to comment.