Skip to content

Commit

Permalink
Remove special rejection handling in Cold/Frozen cache services (#85775
Browse files Browse the repository at this point in the history
…) (#85780)

Now #85217 is merged the special handling introduced by #85239
to fail Gaps when a task is rejected can be removed. The
`onRejection` method of the AbstractRunnable task will be
executed if the task is rejected or if an unexpected exception
is thrown before the task is added to the thread pool executor
queue.
  • Loading branch information
tlrx committed Apr 11, 2022
1 parent ae5dfdd commit adaeea3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,34 +362,28 @@ public Future<Integer> populateAndRead(
);

for (SparseFileTracker.Gap gap : gaps) {
try {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (reference.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
gap.onCompletion();
markAsNeedsFSync();
} finally {
reference.decRef();
}
}
executor.execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
@Override
protected void doRun() throws Exception {
if (reference.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected exception when submitting task to fill gap [{}]", gap), e);
assert false : e;
gap.onFailure(e);
}
try {
ensureOpen();
writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
gap.onCompletion();
markAsNeedsFSync();
} finally {
reference.decRef();
}
}

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
}
} catch (Exception e) {
releaseAndFail(future, decrementRef, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -760,42 +759,36 @@ StepListener<Integer> populateAndRead(
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(rangeToWrite, rangeToRead, rangeListener);

for (SparseFileTracker.Gap gap : gaps) {
try {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (CacheFileRegion.this.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
final long start = gap.start();
assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this;
writer.fillCacheRange(
fileChannel,
physicalStartOffset() + gap.start(),
gap.start(),
gap.end() - gap.start(),
progress -> gap.onProgress(start + progress)
);
writeCount.increment();
} finally {
decRef();
}
gap.onCompletion();
}
executor.execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
@Override
protected void doRun() throws Exception {
if (CacheFileRegion.this.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected exception when submitting task to fill gap [{}]", gap), e);
assert false : e;
gap.onFailure(e);
}
try {
ensureOpen();
final long start = gap.start();
assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this;
writer.fillCacheRange(
fileChannel,
physicalStartOffset() + gap.start(),
gap.start(),
gap.end() - gap.start(),
progress -> gap.onProgress(start + progress)
);
writeCount.increment();
} finally {
decRef();
}
gap.onCompletion();
}

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
}
} catch (Exception e) {
releaseAndFail(listener, decrementRef, e);
Expand Down

0 comments on commit adaeea3

Please sign in to comment.