Skip to content

Commit

Permalink
Refactor the Translog.read(Location) method
Browse files Browse the repository at this point in the history
It was only used by `readSource`, it has been changed to return a
Translog.Operation, which can have .getSource() called on it to return
the source. `readSource` has been removed.

This also removes the checked IOException, any exception thrown is
unexpected and should throw a runtime exception.

Moves the ReleasableBytesStreamOutput allocation into the body of the
try-catch block so the lock can be released in the event of an exception
during allocation.
  • Loading branch information
dakrone committed Sep 29, 2014
1 parent 997b94b commit 168b375
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 57 deletions.
Expand Up @@ -358,13 +358,9 @@ public GetResult get(Get get) throws EngineException {
if (!get.loadSource()) {
return new GetResult(true, versionValue.version(), null);
}
try {
Translog.Source source = translog.readSource(versionValue.translogLocation());
if (source != null) {
return new GetResult(true, versionValue.version(), source);
}
} catch (IOException e) {
// switched on us, read it from the reader
Translog.Operation op = translog.read(versionValue.translogLocation());
if (op != null) {
return new GetResult(true, versionValue.version(), op.getSource());
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/org/elasticsearch/index/translog/Translog.java
Expand Up @@ -103,9 +103,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
*/
Location add(Operation operation) throws TranslogException;

byte[] read(Location location);

Translog.Source readSource(Location location) throws IOException;
Translog.Operation read(Location location);

/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
Expand Down Expand Up @@ -154,6 +152,11 @@ public Location(long translogId, long translogLocation, int size) {
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_LONG + RamUsageEstimator.NUM_BYTES_INT;
}

@Override
public String toString() {
return "[id: " + translogId + ", location: " + translogLocation + ", size: " + size + "]";
}
}

/**
Expand Down
46 changes: 16 additions & 30 deletions src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
Expand Up @@ -328,51 +328,37 @@ private FsTranslogFile translogForLocation(Location location) {
}

/**
* Private read method that reads from either the transient translog (if
* applicable), or the current translog. Acquires the read lock
* before reading.
* @return byte array of read data
* Read the Operation object from the given location, returns null if the
* Operation could not be read.
*/
public byte[] read(Location location) {
@Override
public Translog.Operation read(Location location) {
rwl.readLock().lock();
try {
FsTranslogFile trans = translogForLocation(location);
if (trans != null) {
try {
return trans.read(location);
} catch (Exception e) {
// ignore
FsTranslogFile translog = translogForLocation(location);
if (translog != null) {
byte[] data = translog.read(location);
try (BytesStreamInput in = new BytesStreamInput(data, false)) {
// Return the Operation using the current version of the
// stream based on which translog is being read
return translog.getStream().read(in);
}
}
return null;
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from traslog location " + location, e);
} finally {
rwl.readLock().unlock();
}
}

/**
* Read the Source object from the given location, returns null if the
* source could not be read.
*/
@Override
public Source readSource(Location location) throws IOException {
byte[] data = this.read(location);
if (data == null) {
return null;
}
// Return the source using the current version of the stream based on
// which translog is being read
try (BytesStreamInput in = new BytesStreamInput(data, false)) {
return this.translogForLocation(location).getStream().read(in).getSource();
}
}

@Override
public Location add(Operation operation) throws TranslogException {
rwl.readLock().lock();
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
boolean released = false;
ReleasableBytesStreamOutput out = null;
try {
out = new ReleasableBytesStreamOutput(bigArrays);
TranslogStreams.writeTranslogOperation(out, operation);
ReleasableBytesReference bytes = out.bytes();
Location location = current.add(bytes);
Expand All @@ -397,7 +383,7 @@ public Location add(Operation operation) throws TranslogException {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
rwl.readLock().unlock();
if (!released) {
if (!released && out != null) {
Releasables.close(out.bytes());
}
}
Expand Down
Expand Up @@ -23,8 +23,6 @@
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -74,24 +72,19 @@ public void tearDown() throws Exception {

protected abstract String translogFileDirectory();

private static Translog.Source readSource(byte[] bytes) throws IOException {
BytesStreamInput in = new BytesStreamInput(bytes, false);
return TranslogStreams.readTranslogOperation(in).getSource();
}

@Test
public void testRead() throws IOException {
Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2}));
assertThat(readSource(translog.read(loc1)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(readSource(translog.read(loc2)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
translog.sync();
assertThat(translog.readSource(loc1).source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.readSource(loc2).source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3}));
assertThat(readSource(translog.read(loc3)).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
translog.sync();
assertThat(translog.readSource(loc3).source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
}

@Test
Expand Down Expand Up @@ -368,9 +361,7 @@ public void run() {
}

for (LocationOperation locationOperation : writtenOperations) {
byte[] data = translog.read(locationOperation.location);
StreamInput streamInput = new BytesStreamInput(data, false);
Translog.Operation op = TranslogStreams.readTranslogOperation(streamInput);
Translog.Operation op = translog.read(locationOperation.location);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
Expand Down Expand Up @@ -434,7 +425,7 @@ public void testTranslogChecksums() throws Exception {
AtomicInteger corruptionsCaught = new AtomicInteger(0);
for (Translog.Location location : locations) {
try {
readSource(translog.read(location));
translog.read(location);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}
Expand Down

0 comments on commit 168b375

Please sign in to comment.