Skip to content

Commit

Permalink
Fix Worker.ActiveClients is negative when load from ufs
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Fix Worker.ActiveClients is negative when load from ufs
### Why are the changes needed?

Please clarify why the changes are needed. For instance,
createUfsBlockReader will invoke closeUfsBlock then invoke commitBlock
then this metric decrease.
But forget to increase firstly . So it need to increase firstly.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
no.

pr-link: #16784
change-id: cid-13d0875dc42336cc612dfed121bba8572164ed60
  • Loading branch information
flaming-archer committed Feb 15, 2023
1 parent dc41fb1 commit 60d5156
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -33,6 +33,7 @@
import alluxio.retry.RetryUtils;
import alluxio.underfs.UfsManager;
import alluxio.util.ThreadFactoryUtils;
import alluxio.worker.block.DefaultBlockWorker.Metrics;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.io.DelegatingBlockReader;
Expand Down Expand Up @@ -161,6 +162,7 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset,
Optional<? extends BlockMeta> blockMeta = mLocalBlockStore.getVolatileBlockMeta(blockId);
if (blockMeta.isPresent()) {
reader = mLocalBlockStore.createBlockReader(sessionId, blockId, offset);
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
} else {
boolean checkUfs = options != null && (options.hasUfsPath() || options.getBlockInUfsTier());
if (!checkUfs) {
Expand All @@ -169,7 +171,6 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset,
// When the block does not exist in Alluxio but exists in UFS, try to open the UFS block.
reader = createUfsBlockReader(sessionId, blockId, offset, positionShort, options);
}
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
return reader;
}

Expand All @@ -181,7 +182,10 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
try {
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
return new DelegatingBlockReader(reader, () -> closeUfsBlock(sessionId, blockId));
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId);
Expand Down

0 comments on commit 60d5156

Please sign in to comment.