Skip to content

Commit

Permalink
poolV4: fix reader/writer counts
Browse files Browse the repository at this point in the history
Motivation:

Bug, GitHub #5870
frontend/api: readers and writers always zero

Reporting of readers and writers has two issues:
- only the queued movers are counted for named queues;
- these are counted on the basis of IOPriority.

Modification:

Look at the open mode of the file in mover to determine
whether this is read or write; scan all jobs, not just
those that have been queued.

Result:

Counts of readers and writers should now be correct.

Target: master
Request: 7.1
Request: 7.0
Request: 6.2
Request: 6.1
Request: 6.0
Request: 5.2
Patch: https://rb.dcache.org/r/13009/
Closes: #5870
Requires-notes: yes
Requires-book: no
Acked-by: Dmitry
  • Loading branch information
alrossi authored and mksahakyan committed May 4, 2021
1 parent ef40d0e commit d2ff64f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.dcache.pool.classic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import diskCacheV111.pools.PoolCostInfo.NamedPoolQueueInfo;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.DiskErrorCacheException;
import diskCacheV111.vehicles.IoJobInfo;
import diskCacheV111.vehicles.JobInfo;
import diskCacheV111.vehicles.ProtocolInfo;
import dmg.cells.nucleus.CDC;
import java.io.InterruptedIOException;
import java.nio.channels.CompletionHandler;
import java.util.ArrayList;
Expand All @@ -21,27 +25,18 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import java.net.InetSocketAddress;

import dmg.cells.nucleus.CDC;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.DiskErrorCacheException;
import diskCacheV111.vehicles.IoJobInfo;
import diskCacheV111.vehicles.JobInfo;
import diskCacheV111.vehicles.ProtocolInfo;

import org.dcache.pool.FaultAction;
import org.dcache.pool.FaultEvent;
import org.dcache.pool.FaultListener;
import org.dcache.pool.movers.Mover;
import org.dcache.pool.movers.json.MoverData;
import org.dcache.pool.repository.FileStore;
import org.dcache.util.AdjustableSemaphore;
import org.dcache.util.IoPrioritizable;
import org.dcache.util.IoPriority;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -387,7 +382,9 @@ public int getMaxActiveJobs()
*/
public void setMaxActiveJobs(int maxJobs)
{
_semaphore.setMaxPermits(maxJobs);
synchronized (this) {
_semaphore.setMaxPermits(maxJobs);
}
PrioritizedRequest request;
while (_semaphore.tryAcquire() && (request = nextOrRelease()) != null) {
sendToExecution(request);
Expand All @@ -399,27 +396,28 @@ public void setMaxActiveJobs(int maxJobs)
*
* @return number of pending requests.
*/
public int getQueueSize()
public synchronized int getQueueSize()
{
BlockingQueue<PrioritizedRequest> queue;
synchronized (this) {
queue = _queue;
}
return queue.size();
return _queue.size();
}

/**
* Get the number of write requests running or waiting to run.
* @return object containing queue name and statistics.
*/
public int getCountByPriority(IoPriority priority)
{
BlockingQueue<PrioritizedRequest> queue;
public NamedPoolQueueInfo getQueueInfo() {
int jobs;
int queued;
int writes;
int max_active;
synchronized (this) {
queue = _queue;
jobs = _jobs.size();
writes = (int) _jobs.values().stream().filter(PrioritizedRequest::isWrite).count();
queued = _queue.size();
max_active = _semaphore.getMaxPermits();
}
return (int) queue.stream()
.filter(r -> r.getPriority() == priority)
.count();
int active = jobs - queued;
int reads = jobs - writes;
return new NamedPoolQueueInfo(_name, active, max_active, queued, reads, writes);
}

/**
Expand Down Expand Up @@ -709,6 +707,16 @@ public long getCreateTime()
return _ctime;
}

public boolean isRead()
{
return _mover.getIoMode().equals(FileStore.O_READ);
}

public boolean isWrite()
{
return _mover.getIoMode().equals(FileStore.O_RW);
}

@Override
public int hashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,12 +1858,7 @@ public String call() throws IllegalArgumentException

private static PoolCostInfo.NamedPoolQueueInfo toNamedPoolQueueInfo(MoverRequestScheduler queue)
{
return new PoolCostInfo.NamedPoolQueueInfo(queue.getName(),
queue.getActiveJobs(),
queue.getMaxActiveJobs(),
queue.getQueueSize(),
queue.getCountByPriority(IoPriority.REGULAR),
queue.getCountByPriority(IoPriority.HIGH));
return queue.getQueueInfo();
}

/**
Expand Down

0 comments on commit d2ff64f

Please sign in to comment.