Skip to content

Commit

Permalink
resilience: do simple existence check of replica on pool to avoid dark removes
Browse files Browse the repository at this point in the history

Motivation:

A recent experience with data loss from removes during
(more than likely) multiple concurrent "pool up" scans
suggested that relying on the namespace for replica
locations is inherently subject to a race between
resilience and the pool clear cache location message.

Testing has confirmed this.
See issue #4742

Modification:

Add a SpreadAndWait of the PoolCheckFileMessage to
each of the readable pool locations and return the
pools on which a replica for that pnfsid is found.

This is done for both copy and remove.

An alarm at the warning level is issued at most once
every 1000 seconds (roughly 17 minutes), with new alarms
keyed to an hourly timestamp, when the lag or inconsistency is detected.

Missing files (no replicas and no tape) continue
to be logged to activity logger (.resilience file),
just as with inaccessible files, whenever they
are detected.

A small correctness adjustment has been made to
the method which determines the operation type
(in order to handle "excluded" pools correctly).

Junit testing has been modified to simulate
the new messaging (always returning true
for file existence, in order to maintain
consistency with existing tests).

Result:

Dark removes which can result in the
removal of all replicas for a given file
are prevented.

Target: master
Request: 5.0
Request: 4.2
Request: 4.1
Request: 4.0
Requires-notes: yes
Requires-book: no
Acked-by: Tigran
  • Loading branch information
alrossi committed Mar 27, 2019
1 parent a3160b9 commit bc1e5a2
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 28 deletions.
Expand Up @@ -80,8 +80,10 @@ public enum PredefinedAlarm implements Alarm {
BROKEN_FILE,
CHECKSUM,
INACCESSIBLE_FILE,
LOST_RESILIENT_FILE,
FAILED_REPLICATION,
RESILIENCE_SYNC_FAILURE;
RESILIENCE_PM_SYNC_FAILURE,
RESILIENCE_LOC_SYNC_ISSUE;

@Override
public String getType() {
Expand Down
Expand Up @@ -375,6 +375,13 @@ public boolean validateForAction(Integer storageUnit,
* Countable means readable OR intentionally excluded locations.
* If there are copies missing only from excluded locations,
* do nothing.
*
* NOTE. An initial check for consistency with the pools is
* avoided here so as not to block this thread on a
* send and wait. The locations will be reverified as
* part of the operation logic. While this means
* operations could unnecessarily be created, it
* ensures more efficient use of the thread resources.
*/
int countable = poolInfoMap.getCountableLocations(locations);
count = required - countable;
Expand Down
Expand Up @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -350,6 +351,22 @@ public int getCountableLocations(Collection<String> locations) {
return countable;
}

/**
 * <p>Filters the given pool names down to those currently marked
 *    as intentionally excluded.</p>
 *
 * @param members pool names to examine.
 * @return names of the initialized pools among them whose state
 *         is excluded.
 */
public Set<String> getExcludedLocationNames(Collection<String> members) {
    read.lock();
    try {
        return members.stream()
                      .map(name -> poolInfo.get(getPoolIndex(name)))
                      .filter(info -> info != null
                                      && info.isInitialized()
                                      && info.isExcluded())
                      .map(PoolInformation::getName)
                      .collect(Collectors.toSet());
    } finally {
        read.unlock();
    }
}


public String getGroup(Integer group) {
read.lock();
try {
Expand Down
Expand Up @@ -160,6 +160,10 @@ synchronized boolean isCountable() {
return canRead() || excluded;
}

/*
 * Whether this pool has been intentionally excluded from
 * resilience handling; synchronized against concurrent state updates.
 */
synchronized boolean isExcluded() {
    return excluded;
}

/*
 * A pool counts as initialized only once its mode, tags and
 * cost info have all been received and set.
 */
synchronized boolean isInitialized() {
    return mode != null && tags != null && costInfo != null;
}
Expand Down
Expand Up @@ -80,13 +80,13 @@ public final class DefaultInaccessibleFileHandler extends InaccessibleFileHandle
+ "'inaccessible {}' to produce a list of orphaned pnfsids.";

private static final String MISSING_LOCATIONS_MESSAGE
= "{} has no locations in the namespace. "
= "{} has no locations in the namespace (file is lost). "
+ "Administrator intervention is required.";

@Override
protected Type handleNoLocationsForFile(FileOperation operation) {
PnfsId pnfsId = operation.getPnfsId();
LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.INACCESSIBLE_FILE,
LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.LOST_RESILIENT_FILE,
pnfsId.toString()),
MISSING_LOCATIONS_MESSAGE, pnfsId);
String error = String.format("%s has no locations.", pnfsId);
Expand Down
Expand Up @@ -60,9 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.resilience.handlers;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;
Expand All @@ -76,6 +83,10 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.RetentionPolicy;
import diskCacheV111.util.SpreadAndWait;
import diskCacheV111.vehicles.HttpProtocolInfo;
import diskCacheV111.vehicles.PoolCheckFileMessage;
import diskCacheV111.vehicles.PoolManagerPoolInformation;

import dmg.cells.nucleus.CellPath;
Expand Down Expand Up @@ -125,10 +136,32 @@ public class FileOperationHandler {
private static final Logger ACTIVITY_LOGGER =
LoggerFactory.getLogger("org.dcache.resilience-log");

private static final String INCONSISTENT_LOCATIONS_ALARM
= "Resilience has detected an inconsistency or lag between "
+ "the namespace replica locations and the actual locations on disk.";

private static final ImmutableList<StickyRecord> ONLINE_STICKY_RECORD
= ImmutableList.of(
new StickyRecord("system", StickyRecord.NON_EXPIRING));

private static final RateLimiter LIMITER = RateLimiter.create(0.001);

/*
 * Issues the out-of-sync alarm, rate-limited.
 */
private static void sendOutOfSyncAlarm() {
    /*
     * Create a new alarm every hour by keying the alarm to
     * an hourly timestamp. Otherwise, the alarm counter will
     * just be updated for each alarm sent. The rate limiter
     * (see LIMITER, created with 0.001 permits/second) will not
     * alarm more than once every 1000 seconds, i.e. roughly
     * once every 17 minutes.
     */
    if (LIMITER.tryAcquire()) {
        LOGGER.warn(AlarmMarkerFactory.getMarker(
                        PredefinedAlarm.RESILIENCE_LOC_SYNC_ISSUE,
                        Instant.now().truncatedTo(ChronoUnit.HOURS).toString()),
                    INCONSISTENT_LOCATIONS_ALARM);
    }
}

/**
* <p>For communication with the {@link ResilientFileTask}.</p>
*/
Expand Down Expand Up @@ -188,8 +221,10 @@ public void handleBrokenFileLocation(PnfsId pnfsId, String pool) {
return;
}

int actual = attributes.getLocations().size();
int countable = poolInfoMap.getCountableLocations(attributes.getLocations());
Collection<String> locations = attributes.getLocations();

int actual = locations.size();
int countable = poolInfoMap.getCountableLocations(locations);

if (actual <= 1) {
/*
Expand Down Expand Up @@ -489,17 +524,47 @@ public Type handleVerification(FileAttributes attributes) {
* available source file. So we need the strictly readable
* locations, not just "countable" ones.
*/
Set<String> readableLocations
Set<String> namespaceReadable
= poolInfoMap.getReadableLocations(locations);

LOGGER.trace("handleVerification, {}, readable locations {}", pnfsId,
readableLocations);
Set<String> verifiedReadable;

try {
verifiedReadable = verifyLocations(pnfsId,
namespaceReadable,
pools);
} catch (InterruptedException e) {
LOGGER.trace("handleVerification, {}, verify pool "
+ "locations interrupted; "
+ "cancelling operation", pnfsId);
completionHandler.taskCancelled(pnfsId);
return Type.VOID;
}

LOGGER.trace("handleVerification, {}, namespace readable locations {},"
+ "verified readable locations {}", pnfsId,
namespaceReadable, verifiedReadable);

if (namespaceReadable.size() != verifiedReadable.size()) {
ACTIVITY_LOGGER.info("The namespace is not in sync with the pool "
+ "repositories for {}: "
+ "namespace locations "
+ "that are readable: {}; "
+ "actually found: {}.",
pnfsId, namespaceReadable, verifiedReadable);
sendOutOfSyncAlarm();
}

if (inaccessibleFileHandler.isInaccessible(readableLocations, operation)) {
if (inaccessibleFileHandler.isInaccessible(verifiedReadable, operation)) {
LOGGER.trace("handleVerification {}, no readable locations found, "
+ "checking to see if "
+ "file can be staged.", pnfsId);
return inaccessibleFileHandler.handleInaccessibleFile(operation);
}

if (shouldEvictALocation(operation, readableLocations)) {
if (shouldEvictALocation(operation, verifiedReadable)) {
LOGGER.trace("handleVerification, location should be evicted {}",
verifiedReadable);
return Type.REMOVE;
}

Expand All @@ -508,7 +573,7 @@ public Type handleVerification(FileAttributes attributes) {

return determineTypeFromConstraints(operation,
locations,
readableLocations);
verifiedReadable);
}

public void setCompletionHandler(
Expand Down Expand Up @@ -580,13 +645,26 @@ private Type determineTypeFromConstraints(FileOperation operation,

StorageUnitConstraints constraints
= poolInfoMap.getStorageUnitConstraints(sindex);

Set<String> excluded = poolInfoMap.getExcludedLocationNames(locations);
readableLocations = Sets.difference(readableLocations, excluded);

int required = constraints.getRequired();
int missing = required - readableLocations.size();

/*
* Countable means readable OR intentionally excluded locations.
* If there are copies missing only from excluded locations,
* do nothing.
* First compute the missing files on the basis of just the readable
* files. If this is positive, recompute by adding in all the
* excluded locations. If these satisfy the requirement, void
* the operation. Do not allow removes in this case, since this
* would imply decreasing already deficient locations.
*/
int missing = constraints.getRequired()
- poolInfoMap.getCountableLocations(locations);
if (missing > 0) {
missing -= excluded.size();
if (missing < 0) {
missing = 0;
}
}

Collection<String> tags = constraints.getOneCopyPer();

Expand Down Expand Up @@ -714,4 +792,53 @@ private boolean shouldEvictALocation(FileOperation operation,

return false;
}

/**
* <p>Called when there are no accessible replicas for the file.</p>
*
* <p>If the file's RetentionPolicy is CUSTODIAL, set the count to 1,
* to make sure the task completes after this. Staging is fire-and-forget,
* and depends on a new add cache location message being processed
* after staging.</p>
*
* @return true if file is CUSTODIAL, false otherwise.
*/
/**
 * <p>Called when there are no accessible replicas for the file.</p>
 *
 * <p>If the file's RetentionPolicy is CUSTODIAL, set the count to 1,
 * to make sure the task completes after this. Staging is fire-and-forget,
 * and depends on a new add cache location message being processed
 * after staging.</p>
 *
 * @return true if file is CUSTODIAL, false otherwise.
 */
private boolean shouldTryToStage(FileAttributes attributes,
                                 FileOperation operation) {
    boolean custodial
        = attributes.getRetentionPolicy() == RetentionPolicy.CUSTODIAL;
    if (custodial) {
        LOGGER.trace("shouldTryToStage {}, retention policy is CUSTODIAL.",
                     operation.getPnfsId());
        operation.setOpCount(1);
    } else {
        LOGGER.trace("shouldTryToStage {}, retention policy is not CUSTODIAL",
                     operation.getPnfsId());
    }
    return custodial;
}

/*
* REVISIT -- this will be expanded into a more general verification procedure
*
* Check the readable pools for the actual existence of the replica.
*/
/**
 * <p>REVISIT -- this will be expanded into a more general verification
 *    procedure.</p>
 *
 * <p>Checks the readable pools for the actual existence of the replica:
 *    sends a {@link PoolCheckFileMessage} to each location via the given
 *    stub, waits for all replies, and returns only the pools which report
 *    that they actually have the replica.</p>
 *
 * @param pnfsId the file whose replica locations are being verified.
 * @param locations the (namespace) pool locations to query.
 * @param stub the cell stub used to message the pools.
 * @return the subset of locations confirmed to hold the replica.
 * @throws InterruptedException if the wait for replies is interrupted.
 */
private Set<String> verifyLocations(PnfsId pnfsId,
                                    Collection<String> locations,
                                    CellStub stub)
                throws InterruptedException {
    /*
     * Use the stub the caller passed in rather than reaching for
     * the field directly; previously the 'stub' parameter was unused.
     */
    SpreadAndWait<PoolCheckFileMessage> controller = new SpreadAndWait<>(stub);

    for (String pool : locations) {
        LOGGER.trace("Sending query to {} to verify replica exists.", pool);
        PoolCheckFileMessage request = new PoolCheckFileMessage(pool, pnfsId);
        controller.send(new CellPath(pool), PoolCheckFileMessage.class, request);
    }

    controller.waitForReplies();

    return controller.getReplies().values()
                     .stream()
                     .filter(PoolCheckFileMessage::getHave)
                     .map(PoolCheckFileMessage::getPoolName)
                     .collect(Collectors.toSet());
}
}
Expand Up @@ -310,7 +310,7 @@ private void checkLastRefresh() {
String initError = String.format(SYNC_ALARM, new Date(lastRefresh),
refreshTimeout, refreshTimeoutUnit);
LOGGER.error(AlarmMarkerFactory.getMarker(
PredefinedAlarm.RESILIENCE_SYNC_FAILURE,
PredefinedAlarm.RESILIENCE_PM_SYNC_FAILURE,
"resilience", String.valueOf(lastRefresh)),
initError);
}
Expand Down
Expand Up @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import diskCacheV111.util.CacheException;
Expand Down Expand Up @@ -139,6 +138,20 @@ public <T extends Message> ListenableFuture<T> send(final T message, CellEndpoin
return future;
}

/**
 * <p>Test stand-in for asynchronous cell messaging: routes the message
 *    through the configured test processor synchronously and hands back
 *    an already-completed future holding the (possibly mutated) message.</p>
 *
 * <p>The destination, timeout and flags are accepted for signature
 *    compatibility but play no role in the simulation.</p>
 */
public <T> ListenableFuture<T> send(CellPath destination,
                                    Serializable message,
                                    Class<T> type,
                                    long timeout,
                                    CellEndpoint.SendFlag... flags) {
    ListenableFutureTask<T> future = ListenableFutureTask.create(() -> {
        processor.processMessage((Message) message);
        return type.cast(message);
    });
    future.run();
    return future;
}

public void setProcessor(TestMessageProcessor processor) {
this.processor = processor;
}
Expand Down
Expand Up @@ -68,22 +68,28 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.PoolCheckFileMessage;

import org.dcache.resilience.TestBase;
import org.dcache.resilience.TestMessageProcessor;
import org.dcache.resilience.TestSynchronousExecutor.Mode;
import org.dcache.resilience.handlers.PoolTaskCompletionHandler;
import org.dcache.vehicles.FileAttributes;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.*;

public final class FileOperationMapTest extends TestBase {
public final class FileOperationMapTest extends TestBase
implements TestMessageProcessor<PoolCheckFileMessage> {
PnfsId pnfsId;
FileAttributes attributes;
FileOperation operation;
File checkpoint = new File("checkpoint");

/*
 * Simulated pool reply: always report that the replica exists, in
 * order to maintain consistency with the existing tests.
 */
@Override
public void processMessage(PoolCheckFileMessage message) {
    message.setHave(true);
}

@Before
public void setUp() throws CacheException, InterruptedException {
setUpBase();
Expand All @@ -92,6 +98,7 @@ public void setUp() throws CacheException, InterruptedException {
createFileOperationHandler();
createInaccessibleFileHandler();
createFileOperationMap();
setPoolMessageProcessor(this);
poolTaskCompletionHandler = new PoolTaskCompletionHandler();
poolTaskCompletionHandler.setMap(poolOperationMap);
wireFileOperationMap();
Expand Down

0 comments on commit bc1e5a2

Please sign in to comment.