pool-repository: Improve metadata check speed on pool re-start.
Motivation:

For large pools (> 1 TB, > 1M files), reading the repository and checking metadata when
restarting dCache pools can take a long time -- O(number of entries). This pushes operators
towards smaller pools, which are more manageable in case of interventions (upgrade, reboot, etc.).

Modification:
- Extend the replica repository class to scan the repository with a thread pool.
- Add the property pool.limits.scan-threads (default: 1) to define the number of threads
  dCache uses to scan the pool's local replica repository.

Result:

For 706,469 files, checking the metadata of each file in the repository takes 3 minutes
with 1 thread (as today) and 34 seconds with 10 threads.
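
As a usage illustration (not part of this patch): after deployment, the scan parallelism
can be raised per pool in the dCache layout file. Only the property name comes from this
commit; the domain name, pool name, and path below are hypothetical.

    [poolDomain]
    [poolDomain/pool]
    pool.name=pool1
    pool.path=/srv/dcache/pool1
    # Hypothetical values: use 10 scan threads, matching the benchmark above.
    pool.limits.scan-threads=10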

Acked-by:
Target: master, 5.2
Require-notes: yes
Require-book: no
Patch: https://rb.dcache.org/r/11928/
Committed: master@xxxxxx
Pull-request: https://github.com/dCache/dcache/pull/xxxx
Signed-off-by: Vincent Garonne vgaronne@gmail.com
vingar committed Aug 29, 2019
1 parent 765e95f commit da929a0
Showing 3 changed files with 101 additions and 17 deletions.
@@ -7,6 +7,7 @@

import java.io.PrintWriter;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -15,14 +16,24 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.DiskErrorCacheException;
@@ -136,6 +147,18 @@ public class ReplicaRepository
private final Set<PnfsId> _removable =
Collections.newSetFromMap(new ConcurrentHashMap<>());

/**
 * Thread pool size used to scan the repository when checking metadata.
 */
private Integer scanThreads;

/**
 * Work queue parameters for the repository metadata scan.
 */
private int _workQueueCapacity = 10000;
private long _workQueueKeepAliveTime = 60;
private TimeUnit _workQueueTimeUnit = TimeUnit.SECONDS;

/** Executor for periodic tasks. */
@GuardedBy("_stateLock")
private ScheduledExecutorService _executor;
@@ -259,6 +282,17 @@ public void setCellAddress(CellAddressCore address)
}
}

public Integer getScanThreads()
{
return scanThreads;
}

public void setScanThreads(Integer scanThreads)
{
this.scanThreads = scanThreads;
}


/**
* Get pool name to which repository belongs.
* @return pool name.
Expand Down Expand Up @@ -496,11 +530,25 @@ private boolean compareAndSetState(State expected, State state)
}
}

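/**
 * Read and verify the meta data record of a single replica. Returns the
 * PnfsId so completions can be collected through a CompletionService.
 */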
private PnfsId loadRecord(PnfsId id)
throws CacheException, IllegalStateException,
InterruptedException {
ReplicaRecord entry = readReplicaRecord(id);
if (entry != null) {
ReplicaState state = entry.getState();
LOGGER.debug("{} {}", id, state);
}
// Lazily check if repository was closed
if (_state != State.LOADING) {
throw new IllegalStateException("Repository was closed during loading.");
}
return id;
}

@Override
public void load()
throws CacheException, IllegalStateException,
InterruptedException {
if (!compareAndSetState(State.INITIALIZED, State.LOADING)) {
throw new IllegalStateException("Can only load repository after initialization and only once.");
}
@@ -509,24 +557,56 @@ public void load()
_store.init();

Collection<PnfsId> ids = _store.index();

int fileCount = ids.size();
LOGGER.info("Checking meta data for {} files with {} threads.", fileCount, scanThreads);
int cnt = 0;
if (scanThreads == 1) {
    for (PnfsId id : ids) {
        loadRecord(id);
        _initializationProgress = ((float) ++cnt) / fileCount;
    }
} else {
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(_workQueueCapacity);
    ThreadPoolExecutor scanExecutor = new ThreadPoolExecutor(1, scanThreads,
            _workQueueKeepAliveTime, _workQueueTimeUnit, workQueue);
    CompletionService<PnfsId> completionService = new ExecutorCompletionService<>(scanExecutor);
    Set<Future<PnfsId>> futures = new HashSet<>();

    for (PnfsId id : ids) {
        ArrayList<Future<PnfsId>> completedFutures = new ArrayList<>();

        // Submit with backpressure: when the bounded queue rejects the task,
        // consume one finished result and try again.
        while (true) {
            try {
                futures.add(completionService.submit(() -> loadRecord(id)));
                break;
            } catch (RejectedExecutionException e) {
                completedFutures.add(completionService.take());
            }
        }

        // Collect completed tasks; once the last id has been submitted,
        // keep going until every outstanding future has finished.
        while (completedFutures.size() > 0 || (futures.size() + cnt == fileCount && futures.size() > 0)) {
            Future<PnfsId> future = completionService.poll();
            if (future != null) {
                completedFutures.add(future);
            }
            if (completedFutures.size() > 0) {
                future = completedFutures.remove(0);
                futures.remove(future);
                try {
                    future.get();
                    _initializationProgress = ((float) ++cnt) / fileCount;
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
    scanExecutor.shutdown();
}
LOGGER.debug("Checked meta data for {}% of the files.", 100 * _initializationProgress);

_stateLock.writeLock().lock();
try {
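The concurrency pattern above is easier to study in isolation. Below is a minimal,
self-contained sketch of the same idea -- a bounded work queue feeding a ThreadPoolExecutor
behind an ExecutorCompletionService, with RejectedExecutionException acting as the
backpressure signal. The class name, task body, and sizes are illustrative only, not part
of the commit.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /** Illustrative stand-alone version of the scan pattern used above. */
    public class BoundedScanSketch
    {
        public static void main(String[] args) throws InterruptedException
        {
            int scanThreads = 10;     // would come from pool.limits.scan-threads
            int fileCount = 100_000;  // stand-in for _store.index().size()

            // Core size == max size so all threads work immediately; the commit
            // uses (1, scanThreads), which grows the pool only after the
            // 10000-slot queue has filled up.
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    scanThreads, scanThreads, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10_000));
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executor);

            int submitted = 0;
            int completed = 0;
            for (int i = 0; i < fileCount; i++) {
                final int id = i;
                while (true) {
                    try {
                        completionService.submit(() -> id); // stand-in for loadRecord(id)
                        submitted++;
                        break;
                    } catch (RejectedExecutionException e) {
                        // Queue full: wait for one finished task, then retry the submit.
                        Future<Integer> done = completionService.take();
                        try {
                            done.get();
                        } catch (ExecutionException ee) {
                            throw new RuntimeException(ee.getCause());
                        }
                        completed++;
                    }
                }
            }
            // All tasks submitted; drain the remaining results.
            while (completed < submitted) {
                Future<Integer> future = completionService.take();
                try {
                    future.get();
                } catch (ExecutionException e) {
                    throw new RuntimeException(e.getCause());
                }
                completed++;
            }
            executor.shutdown();
            System.out.printf("Scanned %d of %d entries%n", completed, fileCount);
        }
    }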
@@ -148,6 +148,7 @@
value="#{ '${pool.lfs}' == 'volatile' or '${pool.lfs}' == 'transient' }"/>
<property name="maxDiskSpaceString" value="${pool.size}"/>
<property name="replicaStore" ref="replica-store"/>
<property name="scanThreads" value="${pool.limits.scan-threads}"/>
</bean>

<bean id="repository-interpreter" class="org.dcache.pool.repository.RepositoryInterpreter">
3 changes: 3 additions & 0 deletions skel/share/defaults/pool.properties
@@ -119,6 +119,9 @@ pool.limits.worker-threads=5
# e.g. name space operations or callouts into installed nearline storage providers.
pool.limits.nearline-threads=30

# Number of worker threads used to scan the pool repository and check its metadata.
pool.limits.scan-threads=1

# Pool cell name. Currently this has to be the same as the pool name.
pool.cell.name=${pool.name}

