Skip to content

Commit

Permalink
pool: sort flush queue only if we going to use the result
Browse files Browse the repository at this point in the history
Motivation:
Even if flush number of active flushes is configured to be zero, the
StorageClassInfo#internalFlush will sort the requests, but discard the
result.

Modification:
Update StorageClassInfo#internalFlush to sort the flush queue right before
the sorted result is needed. Use Java8 stream to build the sorted sublist.

Result:
Reduce the number of useless CPU intensive operations and lock starvation.

Target: master, 7.0, 6.2
Acked-by: Lea Morschel
Acked-by: Paul Millar
  • Loading branch information
kofemann committed Dec 14, 2020
1 parent 0a82fd1 commit 91cfaa5
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.dcache.pool.classic;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Ordering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -11,11 +10,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import diskCacheV111.pools.StorageClassFlushInfo;
import diskCacheV111.util.CacheException;
Expand All @@ -24,7 +23,6 @@
import org.dcache.pool.nearline.NearlineStorageHandler;
import org.dcache.pool.repository.CacheEntry;

import static com.google.common.collect.Iterables.transform;
import static java.util.Collections.min;
import static java.util.Collections.singleton;
import static java.util.Comparator.comparingLong;
Expand Down Expand Up @@ -230,8 +228,7 @@ private synchronized Runnable internalFlush(long id, int maxCount,

_maxRequests = maxCount;

List<Entry> entries = Ordering.natural().sortedCopy(_requests.values());
maxCount = Math.min(entries.size(), maxCount);
maxCount = Math.min(_requests.size(), maxCount);

_isDraining = false;
_errorCounter = 0;
Expand All @@ -242,7 +239,13 @@ private synchronized Runnable internalFlush(long id, int maxCount,
if (maxCount != 0) {
_callback = callback;
_callbackExecutor = executor;
_storageHandler.flush(_hsmName, transform(entries.subList(0, maxCount), Entry::pnfsId), this);
_storageHandler.flush(_hsmName,
_requests.values().stream()
.sorted()
.map(Entry::pnfsId)
.limit(maxCount)
.collect(Collectors.toList()),
this);
} else if (callback != null) {
CallbackTask task = new CallbackTask(_hsmName, _storageClass, 0, id, 0, callback);
return () -> executor.execute(task);
Expand Down

0 comments on commit 91cfaa5

Please sign in to comment.