Skip to content

Commit

Permalink
Merge pull request #4059 from alrossi/fix/4.0/resilient-inaccessible-…
Browse files Browse the repository at this point in the history
…file-accounting

dcache-resilience: improve inaccessible file accounting
  • Loading branch information
Jürgen Starek committed Jul 3, 2018
2 parents 36bc55c + 437ac98 commit 9148c9c
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 24 deletions.
Expand Up @@ -64,7 +64,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -839,34 +841,87 @@ protected String doCall() throws Exception {
@Command(name = "inaccessible",
hint = "list pnfsids for a pool which "
+ "currently have no readable locations",
description = "Issues a query to the namespace to scan the pool, "
description = "With no options, issues a query to the "
+ "namespace to scan the pool, "
+ "checking locations of each file with online "
+ "access latency; results are written to a "
+ "file in resilience home named '"
+ INACCESSIBLE_PREFIX
+ "' + pool. Executed asynchronously.")
+ "' + pool. Executed asynchronously. Using "
+ "the options, the scan status can be checked, "
+ "scan canceled, and contents of file "
+ "(pnfsid listing) displayed for single pools.")
class InaccessibleFilesCommand extends ResilienceCommand {
@Option(name = "cancel", usage = "Cancel the running job.")
@Option(name = "status", usage = "Check status of scan.")
boolean status = false;

@Option(name = "list", usage = "List the inaccessible pnfsids.")
boolean list = false;

@Option(name = "delete", usage = "Delete scan file.")
boolean delete = false;

@Option(name = "cancel", usage = "Cancel the running scan job.")
boolean cancel = false;

@Argument(usage = "A regular expression for pool names.")
String expression;
@Argument(usage = "With run and cancel, this can be a regular expression "
+ "for pool names; with the other options, it must be "
+ "a single pool name.")
String poolExpression;

@Override
protected String doCall() throws Exception {
if (status) {
return getStatus();
}

if (delete) {
return doDelete();
}

if (list) {
return getListing();
}

return doScan();
}

private String getStatus() {
if (futureMap.containsKey(poolExpression)) {
return "RUNNING";
}

if (getListingFile(poolExpression, resilienceDir).exists()) {
return "DONE";
}

return "NOT FOUND";
}

private String doDelete() {
File toDelete = getListingFile(poolExpression, resilienceDir);
if (toDelete.exists()) {
toDelete.delete();
return "Deleted " + toDelete;
}

return "Not found: " + toDelete;
}

private String doScan() {
try {
StringBuilder builder = new StringBuilder();
Pattern pattern = Pattern.compile(expression);
Pattern pattern = Pattern.compile(poolExpression);

poolInfoMap.getResilientPools()
.stream()
.filter((pool) -> pattern.matcher(pool).find())
.forEach((pool) -> handleOption(cancel, pool, builder));

builder.insert(0, "Started jobs to write the lists "
+ "of inaccessible pnfsids "
+ "to the following files:\n\n");
builder.append("Check pinboard for progress.\n");
if (!cancel) {
builder.insert(0, "Writing inaccessible pnfsids "
+ "to the following files:\n\n");
}

return builder.toString();
} catch (Exception e) {
Expand All @@ -881,21 +936,22 @@ private void handleOption(boolean cancel,
Future<?> future = futureMap.remove(pool);
if (future != null) {
future.cancel(true);
builder.append("Cancelled job for ")
.append(pool).append("\n");
} else {
builder.append("No running job for ")
.append(pool).append("\n");
}

builder.append("cancelled job for ")
.append(pool).append("\n");
} else {
File file = printToFile(pool,
resilienceDir);
File file = printToFile(pool, resilienceDir);
builder.append(" ")
.append(file.getAbsolutePath())
.append("\n");
}
}

private File printToFile(String pool, String dir) {
File file = new File(dir, INACCESSIBLE_PREFIX + pool);
File file = getListingFile(pool, dir);
ListeningExecutorService decoratedExecutor
= MoreExecutors.listeningDecorator(executor);

Expand Down Expand Up @@ -924,6 +980,25 @@ private File printToFile(String pool, String dir) {
MoreExecutors.directExecutor());
return file;
}

private String getListing() {
File file = getListingFile(poolExpression, resilienceDir);
if (!file.exists()) {
return "There is no current listing for " + poolExpression;
}
StringBuilder builder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new FileReader(getListingFile(poolExpression, resilienceDir)))) {
reader.lines().forEach((l) -> builder.append(l).append("\n"));
} catch (IOException e) {
return "Trouble reading file for " + poolExpression + ": " + e.getMessage();
}
return builder.toString();
}

private File getListingFile(String pool, String dir) {
return new File(dir, INACCESSIBLE_PREFIX + pool);
}
}

@Command(name = "pool ctrl",
Expand Down
Expand Up @@ -396,6 +396,7 @@ private void postProcess(FileOperation operation) {
.map(poolInfoMap::getPool)
.collect(Collectors.toSet());
completionHandler.taskAborted(operation.getPnfsId(),
pool,
poolInfoMap.getUnit(operation.getStorageUnit()),
tried,
operation.getRetried(),
Expand Down
Expand Up @@ -68,7 +68,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.stream.Collectors;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

import diskCacheV111.namespace.NameSpaceProvider;
import diskCacheV111.util.CacheException;
Expand Down Expand Up @@ -290,23 +292,37 @@ private void printResults(Connection connection,
throw new InterruptedException();
}

LOGGER.info("executing {}.", statement);
resultSet = statement.executeQuery();

LOGGER.info("starting check of pnfsids for {}.", location);

while (resultSet.next()) {
if (Thread.interrupted()) {
throw new InterruptedException();
}

PnfsId pnfsId = new PnfsId(resultSet.getString(1));
try {
if (getRequiredAttributes(pnfsId).getLocations().stream()
.map(poolInfoMap::getPoolIndex)
.filter((i) -> poolInfoMap.isPoolViable(i, false))
.collect(Collectors.toList()).isEmpty()) {
FileAttributes attributes = getRequiredAttributes(pnfsId);
Collection<String> locations = attributes.getLocations();
for (Iterator<String> i = locations.iterator(); i.hasNext();) {
String pool = i.next();
try {
int index = poolInfoMap.getPoolIndex(pool);
if (!poolInfoMap.isPoolViable(index, false)) {
i.remove();
}
} catch (NoSuchElementException e) {
i.remove();
}
}
if (locations.isEmpty()) {
writer.println(pnfsId);
LOGGER.info("orphaned: {}.", pnfsId);
}
} catch (CacheException e) {
LOGGER.debug("{}: {}", pnfsId, new ExceptionMessage(e));
LOGGER.info("{}: {}", pnfsId, new ExceptionMessage(e));
}
}
} finally {
Expand Down
Expand Up @@ -82,7 +82,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
public final class FileTaskCompletionHandler implements TaskCompletionHandler {
static final String ABORT_REPLICATION_LOG_MESSAGE
= "Storage unit {}: aborted replication for {}; pools tried: {}; {}";
= "Storage unit {}: aborted replication for {}; "
+ "referring pool {}; pools tried: {}; {}";

static final String ABORT_REPLICATION_ALARM_MESSAGE
= "There are files in storage unit {} for which replication "
Expand Down Expand Up @@ -112,6 +113,7 @@ public void setMap(FileOperationMap map) {
}

public void taskAborted(PnfsId pnfsId,
String pool,
String storageUnit,
Set<String> triedSources,
int retried,
Expand Down Expand Up @@ -144,7 +146,8 @@ public void taskAborted(PnfsId pnfsId,
* Full info on the file is logged to the ".resilience" log.
*/
ABORTED_LOGGER.error(ABORT_REPLICATION_LOG_MESSAGE, storageUnit, pnfsId,
triedSources, new ExceptionMessage(e));
pool == null ? "none" : pool, triedSources,
new ExceptionMessage(e));
}

@Override
Expand Down

0 comments on commit 9148c9c

Please sign in to comment.