Skip to content

Commit

Permalink
dcache-chimera: separate disk and hsm cleaner into new cells
Browse files Browse the repository at this point in the history
Motivation:
The disk and hsm parts of the `cleaner` cell currently share resources, which can lead to performance issues and unexpected behaviours. Both parts share the same logging space and it is not always clear which component is responsible for which output, making understanding the cleaner's behaviour even less transparent.

Modification:
Separate the disk and hsm cleaner components into new cells: `cleaner-disk` and `cleaner-hsm`. They can be deployed as desired, be assigned different resources and each run in HA mode.

Result:
The cleaner now consists of two parts: one for disk cleaning (`cleaner-disk`), one for hsm cleaning (`cleaner-hsm`).
This will hopefully improve performance issues and help admins configure and understand cleaner behaviour.

Target: master
Requires-notes: yes
Requires-book: yes
Patch: https://rb.dcache.org/r/13671/
Acked-by: Albert Rossi
Acked-by: Tigran Mkrtchyan
  • Loading branch information
lemora committed Sep 21, 2022
1 parent d33ed35 commit aa58f56
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 243 deletions.
Expand Up @@ -46,7 +46,6 @@
import java.util.function.BiFunction;
import org.dcache.cells.CellStub;
import org.dcache.util.CacheExceptionFactory;
import org.dcache.util.NDC;
import org.dcache.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -65,7 +64,6 @@
public class DiskCleaner extends AbstractCleaner implements CellCommandListener, CellInfoProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(DiskCleaner.class);
private static final String CLEANER_TYPE = "Disk-Cleaner";

private final ConcurrentHashMap<String, Long> _poolsBlackList = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> _poolsBeingCleaned = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -104,7 +102,6 @@ public void setNotificationStub(CellStub stub) {
*/
@Override
protected void runDelete() throws InterruptedException {
NDC.push(CLEANER_TYPE);
try {
LOGGER.info("New run...");

Expand Down Expand Up @@ -147,8 +144,6 @@ protected void runDelete() throws InterruptedException {
LOGGER.info("Cleaner was interrupted");
} catch (RuntimeException e) {
LOGGER.error("Bug detected", e);
} finally {
NDC.pop();
}
}

Expand Down Expand Up @@ -191,14 +186,12 @@ private void runDelete(List<String> poolList) throws InterruptedException {
private void runDelete(String pool) {
if (!_poolsBlackList.containsKey(pool)) {
try {
LOGGER.info("{} now processing pool {}", CLEANER_TYPE, pool);
cleanPoolComplete(pool);
} catch (NoRouteToCellException | CacheException e) {
LOGGER.warn("{} failed to remove files from {}: {}", CLEANER_TYPE, pool,
LOGGER.warn("failed to remove files from pool {}: {}", pool,
e.getMessage());
} catch (InterruptedException e) {
LOGGER.warn("{} was interrupted while deleting files from pool {}: {}",
CLEANER_TYPE, pool,
LOGGER.warn("cleaner was interrupted while deleting files from pool {}: {}", pool,
e.getMessage());
}
}
Expand Down Expand Up @@ -232,7 +225,7 @@ int forgetTargetsOnPool(final String poolname) {
*/
void removeFiles(final String poolname, final List<String> filelist) {
if (filelist == null || filelist.isEmpty()) {
LOGGER.info("{}: Unexpected empty delete file list.", CLEANER_TYPE);
LOGGER.info("unexpected empty delete file list.");
return;
}
_db.batchUpdate(
Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.dcache.util.NDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand All @@ -53,7 +52,6 @@ public class HsmCleaner extends AbstractCleaner implements CellMessageReceiver,
CellInfoProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(HsmCleaner.class);
private static final String CLEANER_TYPE = "HSM-Cleaner";

/**
* Utility class to keep track of timeouts.
Expand Down Expand Up @@ -185,16 +183,13 @@ public synchronized void setFailureSink(Consumer<URI> sink) {
* Called when a file was successfully deleted from the HSM.
*/
protected void onSuccess(URI uri) {
NDC.push(CLEANER_TYPE);
try {
LOGGER.debug("remove entries from the trash-table. ilocation={}",
uri);
_db.update("DELETE FROM t_locationinfo_trash WHERE ilocation=? AND itype=0",
uri.toString());
} catch (DataAccessException e) {
LOGGER.error("Error when deleting from the trash-table: {}", e.getMessage());
} finally {
NDC.pop();
}
}

Expand Down Expand Up @@ -259,8 +254,8 @@ private synchronized void flush(String hsm) {
PoolRemoveFilesFromHSMMessage message = new PoolRemoveFilesFromHSMMessage(name, hsm,
locations);

LOGGER.info("{} sending {} delete locations for hsm {} to pool {}", CLEANER_TYPE,
locations.size(), hsm, name);
LOGGER.info("sending {} delete locations for HSM {} to pool {}", locations.size(), hsm,
name);

_poolStub.notify(new CellPath(name), message);

Expand Down Expand Up @@ -306,7 +301,7 @@ public synchronized void messageArrived(PoolRemoveFilesFromHSMMessage msg) {
* entries.
*/
if (msg.getReturnCode() != 0) {
LOGGER.error("{} received failure from pool: {}", CLEANER_TYPE, msg.getErrorObject());
LOGGER.error("received failure from pool: {}", msg.getErrorObject());
return;
}

Expand All @@ -324,8 +319,7 @@ public synchronized void messageArrived(PoolRemoveFilesFromHSMMessage msg) {
* ignore it.
*/
LOGGER.warn(
"Received confirmation from a pool, for an action this {} did not request.",
CLEANER_TYPE);
"Received confirmation from a pool, for an action this cleaner did not request.");
return;
}

Expand Down Expand Up @@ -357,59 +351,54 @@ public synchronized void messageArrived(PoolRemoveFilesFromHSMMessage msg) {
*/
@Override
protected void runDelete() throws InterruptedException {
NDC.push(CLEANER_TYPE);
try {
LOGGER.info("New run...");

int locationsCached = _locationsToDelete.values().stream().map(Set::size)
.reduce(0, Integer::sum);
int queryLimit = _maxCachedDeleteLocations - locationsCached;

LOGGER.debug("Locations cached: {} (max cached: {}), query limit: {}, offset: {}",
locationsCached, _maxCachedDeleteLocations, queryLimit, _dbLastSeenTimestamp);

if (queryLimit <= 0) {
LOGGER.debug(
"The number of cached hsm locations is already the maximum permissible size. "
+
"Not adding further entries.");
_locationsToDelete.keySet().forEach(
this::flush); // avoid not processing the remaining requests and being stuck
return;
}
LOGGER.info("New run...");

int locationsCached = _locationsToDelete.values().stream().map(Set::size)
.reduce(0, Integer::sum);
int queryLimit = _maxCachedDeleteLocations - locationsCached;

LOGGER.debug("Locations cached: {} (max cached: {}), query limit: {}, offset: {}",
locationsCached, _maxCachedDeleteLocations, queryLimit, _dbLastSeenTimestamp);

if (queryLimit <= 0) {
LOGGER.debug(
"The number of cached HSM locations is already the maximum permissible size. "
+
"Not adding further entries.");
_locationsToDelete.keySet().forEach(
this::flush); // avoid not processing the remaining requests and being stuck
return;
}

AtomicInteger noRequestsCollected = new AtomicInteger();
Timestamp graceTime = Timestamp.from(
Instant.now().minusSeconds(_gracePeriod.getSeconds()));
AtomicInteger noRequestsCollected = new AtomicInteger();
Timestamp graceTime = Timestamp.from(
Instant.now().minusSeconds(_gracePeriod.getSeconds()));

_db.query(
"SELECT ilocation, ictime FROM t_locationinfo_trash WHERE itype=0 AND ictime<? AND ictime>? ORDER BY ictime ASC LIMIT ?",
rs -> {
try {
Preconditions.checkState(_hasHaLeadership,
"HA leadership was lost while reading from trashtable. Aborting operation.");
_db.query(
"SELECT ilocation, ictime FROM t_locationinfo_trash WHERE itype=0 AND ictime<? AND ictime>? ORDER BY ictime ASC LIMIT ?",
rs -> {
try {
Preconditions.checkState(_hasHaLeadership,
"HA leadership was lost while reading from trashtable. Aborting operation.");

URI uri = new URI(rs.getString("ilocation"));
submit(uri);
URI uri = new URI(rs.getString("ilocation"));
submit(uri);

Timestamp ctime = rs.getTimestamp("ictime");
_dbLastSeenTimestamp = ctime;
Timestamp ctime = rs.getTimestamp("ictime");
_dbLastSeenTimestamp = ctime;

noRequestsCollected.getAndIncrement();
noRequestsCollected.getAndIncrement();

} catch (URISyntaxException e) {
throw new DataIntegrityViolationException(
"Invalid URI in database: " + e.getMessage(), e);
}
},
graceTime, _dbLastSeenTimestamp, queryLimit);
} catch (URISyntaxException e) {
throw new DataIntegrityViolationException(
"Invalid URI in database: " + e.getMessage(), e);
}
},
graceTime, _dbLastSeenTimestamp, queryLimit);

if (_dbLastSeenTimestamp.getTime() != 0 && noRequestsCollected.get() < queryLimit) {
// We have reached the end of the database and should start at the beginning next run
_dbLastSeenTimestamp = new Timestamp(0);
}
} finally {
NDC.pop();
if (_dbLastSeenTimestamp.getTime() != 0 && noRequestsCollected.get() < queryLimit) {
// We have reached the end of the database and should start at the beginning next run
_dbLastSeenTimestamp = new Timestamp(0);
}
}

Expand Down Expand Up @@ -487,7 +476,7 @@ public String call() {
}
}

@Command(name = "hsm set maxCachedDeleteLocations",
@Command(name = "set maxCachedDeleteLocations",
hint = "Changes the maximum number of cached hsm delete locations.")
public class HsmSetMaxCachedDeleteLocationsCommand implements Callable<String> {

Expand All @@ -507,7 +496,7 @@ public String call() throws CommandException, IllegalArgumentException {
}
}

@Command(name = "hsm set maxFilesPerRequest",
@Command(name = "set maxFilesPerRequest",
hint = "Changes the number of files sent to a HSM instance for processing at once.")
public class HsmSetMaxFilesPerRequestCommand implements Callable<String> {

Expand All @@ -527,7 +516,7 @@ public String call() throws CommandException, IllegalArgumentException {
}
}

@Command(name = "hsm set timeOut",
@Command(name = "set timeOut",
hint = "Changes the timeout for delete requests sent to an HSM pool.")
public class HsmSetTimeOutCommand implements Callable<String> {

Expand Down Expand Up @@ -555,7 +544,7 @@ public String call() throws CommandException, IllegalArgumentException {

///// HSM admin commands /////

@Command(name = "rundelete hsm",
@Command(name = "rundelete",
hint = "Runs the HSM Cleaner.")
public class RundeleteHsmCommand implements Callable<String> {

Expand All @@ -568,7 +557,7 @@ public String call() throws InterruptedException, CommandException {
}

// explicitly clean HSM-file
@Command(name = "clean file hsm",
@Command(name = "clean file",
hint = "Clean this file on HSM (file will be deleted from HSM)")
public class CleanFileHsmCommand implements Callable<String> {

Expand Down

0 comments on commit aa58f56

Please sign in to comment.