Skip to content

Commit

Permalink
Merge pull request #4748 from alrossi/fix/4.0/resilience-check-replic…
Browse files Browse the repository at this point in the history
…as-simple

resilience: do simple existence check of replica on pool to avoid dar…
  • Loading branch information
mksahakyan committed Mar 28, 2019
2 parents 7ec040a + bc1e5a2 commit 1e6d6ec
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 28 deletions.
Expand Up @@ -80,8 +80,10 @@ public enum PredefinedAlarm implements Alarm {
BROKEN_FILE,
CHECKSUM,
INACCESSIBLE_FILE,
LOST_RESILIENT_FILE,
FAILED_REPLICATION,
RESILIENCE_SYNC_FAILURE;
RESILIENCE_PM_SYNC_FAILURE,
RESILIENCE_LOC_SYNC_ISSUE;

@Override
public String getType() {
Expand Down
Expand Up @@ -375,6 +375,13 @@ public boolean validateForAction(Integer storageUnit,
* Countable means readable OR intentionally excluded locations.
* If there are copies missing only from excluded locations,
* do nothing.
*
* NOTE. An initial check for consistency with the pools is
* avoided here so as not to block this thread on a
* send and wait. The locations will be reverified as
* part of the operation logic. While this means
* operations could unnecessarily be created, it
* ensures more efficient use of the thread resources.
*/
int countable = poolInfoMap.getCountableLocations(locations);
count = required - countable;
Expand Down
Expand Up @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -350,6 +351,22 @@ public int getCountableLocations(Collection<String> locations) {
return countable;
}

/**
 * Collects the names of member pools that are currently marked as
 * intentionally excluded from resilience handling.
 *
 * @param members pool names to inspect (unknown or uninitialized
 *                pools are silently skipped).
 * @return names of the initialized, excluded pools among {@code members}.
 */
public Set<String> getExcludedLocationNames(Collection<String> members) {
    read.lock();
    try {
        return members.stream()
                      .map(name -> poolInfo.get(getPoolIndex(name)))
                      .filter(info -> info != null
                                      && info.isInitialized()
                                      && info.isExcluded())
                      .map(info -> info.getName())
                      .collect(Collectors.toSet());
    } finally {
        read.unlock();
    }
}


public String getGroup(Integer group) {
read.lock();
try {
Expand Down
Expand Up @@ -160,6 +160,10 @@ synchronized boolean isCountable() {
return canRead() || excluded;
}

/**
 * @return true if this pool has been marked as excluded
 *         (see {@link #isCountable}, which treats excluded pools
 *         as countable). Synchronized for a consistent read of
 *         the flag.
 */
synchronized boolean isExcluded() {
    return excluded;
}

/**
 * @return true once mode, tags and cost info have all been
 *         populated; until then the pool's state cannot be
 *         meaningfully queried.
 */
synchronized boolean isInitialized() {
    return mode != null && tags != null && costInfo != null;
}
Expand Down
Expand Up @@ -80,13 +80,13 @@ public final class DefaultInaccessibleFileHandler extends InaccessibleFileHandle
+ "'inaccessible {}' to produce a list of orphaned pnfsids.";

private static final String MISSING_LOCATIONS_MESSAGE
= "{} has no locations in the namespace. "
= "{} has no locations in the namespace (file is lost). "
+ "Administrator intervention is required.";

@Override
protected Type handleNoLocationsForFile(FileOperation operation) {
PnfsId pnfsId = operation.getPnfsId();
LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.INACCESSIBLE_FILE,
LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.LOST_RESILIENT_FILE,
pnfsId.toString()),
MISSING_LOCATIONS_MESSAGE, pnfsId);
String error = String.format("%s has no locations.", pnfsId);
Expand Down
Expand Up @@ -60,9 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.resilience.handlers;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;
Expand All @@ -76,6 +83,10 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.RetentionPolicy;
import diskCacheV111.util.SpreadAndWait;
import diskCacheV111.vehicles.HttpProtocolInfo;
import diskCacheV111.vehicles.PoolCheckFileMessage;
import diskCacheV111.vehicles.PoolManagerPoolInformation;

import dmg.cells.nucleus.CellPath;
Expand Down Expand Up @@ -125,10 +136,32 @@ public class FileOperationHandler {
private static final Logger ACTIVITY_LOGGER =
LoggerFactory.getLogger("org.dcache.resilience-log");

private static final String INCONSISTENT_LOCATIONS_ALARM
= "Resilience has detected an inconsistency or lag between "
+ "the namespace replica locations and the actual locations on disk.";

private static final ImmutableList<StickyRecord> ONLINE_STICKY_RECORD
= ImmutableList.of(
new StickyRecord("system", StickyRecord.NON_EXPIRING));

private static final RateLimiter LIMITER = RateLimiter.create(0.001);

private static void sendOutOfSyncAlarm() {
    /*
     * Create a new alarm every hour by keying the alarm to
     * an hourly timestamp. Otherwise, the alarm counter will
     * just be updated for each alarm sent. The rate limiter
     * (created with 0.001 permits/second) will not alarm more
     * than once every 1000 seconds, i.e. roughly once every
     * 17 minutes.
     */
    if (LIMITER.tryAcquire()) {
        LOGGER.warn(AlarmMarkerFactory.getMarker(
                        PredefinedAlarm.RESILIENCE_LOC_SYNC_ISSUE,
                        Instant.now().truncatedTo(ChronoUnit.HOURS).toString()),
                    INCONSISTENT_LOCATIONS_ALARM);
    }
}

/**
* <p>For communication with the {@link ResilientFileTask}.</p>
*/
Expand Down Expand Up @@ -188,8 +221,10 @@ public void handleBrokenFileLocation(PnfsId pnfsId, String pool) {
return;
}

int actual = attributes.getLocations().size();
int countable = poolInfoMap.getCountableLocations(attributes.getLocations());
Collection<String> locations = attributes.getLocations();

int actual = locations.size();
int countable = poolInfoMap.getCountableLocations(locations);

if (actual <= 1) {
/*
Expand Down Expand Up @@ -489,17 +524,47 @@ public Type handleVerification(FileAttributes attributes) {
* available source file. So we need the strictly readable
* locations, not just "countable" ones.
*/
Set<String> readableLocations
Set<String> namespaceReadable
= poolInfoMap.getReadableLocations(locations);

LOGGER.trace("handleVerification, {}, readable locations {}", pnfsId,
readableLocations);
Set<String> verifiedReadable;

try {
verifiedReadable = verifyLocations(pnfsId,
namespaceReadable,
pools);
} catch (InterruptedException e) {
LOGGER.trace("handleVerification, {}, verify pool "
+ "locations interrupted; "
+ "cancelling operation", pnfsId);
completionHandler.taskCancelled(pnfsId);
return Type.VOID;
}

LOGGER.trace("handleVerification, {}, namespace readable locations {},"
+ "verified readable locations {}", pnfsId,
namespaceReadable, verifiedReadable);

if (namespaceReadable.size() != verifiedReadable.size()) {
ACTIVITY_LOGGER.info("The namespace is not in sync with the pool "
+ "repositories for {}: "
+ "namespace locations "
+ "that are readable: {}; "
+ "actually found: {}.",
pnfsId, namespaceReadable, verifiedReadable);
sendOutOfSyncAlarm();
}

if (inaccessibleFileHandler.isInaccessible(readableLocations, operation)) {
if (inaccessibleFileHandler.isInaccessible(verifiedReadable, operation)) {
LOGGER.trace("handleVerification {}, no readable locations found, "
+ "checking to see if "
+ "file can be staged.", pnfsId);
return inaccessibleFileHandler.handleInaccessibleFile(operation);
}

if (shouldEvictALocation(operation, readableLocations)) {
if (shouldEvictALocation(operation, verifiedReadable)) {
LOGGER.trace("handleVerification, location should be evicted {}",
verifiedReadable);
return Type.REMOVE;
}

Expand All @@ -508,7 +573,7 @@ public Type handleVerification(FileAttributes attributes) {

return determineTypeFromConstraints(operation,
locations,
readableLocations);
verifiedReadable);
}

public void setCompletionHandler(
Expand Down Expand Up @@ -580,13 +645,26 @@ private Type determineTypeFromConstraints(FileOperation operation,

StorageUnitConstraints constraints
= poolInfoMap.getStorageUnitConstraints(sindex);

Set<String> excluded = poolInfoMap.getExcludedLocationNames(locations);
readableLocations = Sets.difference(readableLocations, excluded);

int required = constraints.getRequired();
int missing = required - readableLocations.size();

/*
* Countable means readable OR intentionally excluded locations.
* If there are copies missing only from excluded locations,
* do nothing.
* First compute the missing files on the basis of just the readable
* files. If this is positive, recompute by adding in all the
* excluded locations. If these satisfy the requirement, void
* the operation. Do not allow removes in this case, since this
* would imply decreasing already deficient locations.
*/
int missing = constraints.getRequired()
- poolInfoMap.getCountableLocations(locations);
if (missing > 0) {
missing -= excluded.size();
if (missing < 0) {
missing = 0;
}
}

Collection<String> tags = constraints.getOneCopyPer();

Expand Down Expand Up @@ -714,4 +792,53 @@ private boolean shouldEvictALocation(FileOperation operation,

return false;
}

/**
* <p>Called when there are no accessible replicas for the file.</p>
*
* <p>If the file's RetentionPolicy is CUSTODIAL, set the count to 1,
* to make sure the task completes after this. Staging is fire-and-forget,
* and depends on a new add cache location message being processed
* after staging.</p>
*
* @return true if file is CUSTODIAL, false otherwise.
*/
private boolean shouldTryToStage(FileAttributes attributes,
                                 FileOperation operation) {
    // Only CUSTODIAL files have a tape copy that can be staged back.
    boolean isCustodial =
        attributes.getRetentionPolicy() == RetentionPolicy.CUSTODIAL;

    if (!isCustodial) {
        LOGGER.trace("shouldTryToStage {}, retention policy is not CUSTODIAL",
                     operation.getPnfsId());
        return false;
    }

    LOGGER.trace("shouldTryToStage {}, retention policy is CUSTODIAL.",
                 operation.getPnfsId());

    // One remaining operation: the task completes after the stage request;
    // the new cache location message drives any further work.
    operation.setOpCount(1);
    return true;
}

/*
 * REVISIT -- this will be expanded into a more general verification procedure
 *
 * Checks the given pools for the actual existence of the replica by
 * broadcasting a PoolCheckFileMessage to each location and collecting
 * the affirmative replies.
 *
 * FIX(review): the original body ignored the {@code stub} parameter and
 * hard-coded the {@code pools} field; since the caller passes that same
 * field, using the parameter is behavior-identical while honoring the
 * declared signature.
 *
 * @param pnfsId    file whose replicas are being verified.
 * @param locations namespace-registered pool locations to query.
 * @param stub      cell stub used to message the pools.
 * @return the subset of locations which reported actually having the replica.
 * @throws InterruptedException if the wait for replies is interrupted.
 */
private Set<String> verifyLocations(PnfsId pnfsId,
                                    Collection<String> locations,
                                    CellStub stub)
                throws InterruptedException {
    SpreadAndWait<PoolCheckFileMessage> controller = new SpreadAndWait<>(stub);

    for (String pool : locations) {
        LOGGER.trace("Sending query to {} to verify replica exists.", pool);
        PoolCheckFileMessage request = new PoolCheckFileMessage(pool, pnfsId);
        controller.send(new CellPath(pool), PoolCheckFileMessage.class, request);
    }

    controller.waitForReplies();

    return controller.getReplies().values()
                     .stream()
                     .filter(PoolCheckFileMessage::getHave)
                     .map(PoolCheckFileMessage::getPoolName)
                     .collect(Collectors.toSet());
}
}
Expand Up @@ -316,7 +316,7 @@ private void checkLastRefresh() {
String initError = String.format(SYNC_ALARM, new Date(lastRefresh),
refreshTimeout, refreshTimeoutUnit);
LOGGER.error(AlarmMarkerFactory.getMarker(
PredefinedAlarm.RESILIENCE_SYNC_FAILURE,
PredefinedAlarm.RESILIENCE_PM_SYNC_FAILURE,
"resilience", String.valueOf(lastRefresh)),
initError);
}
Expand Down
Expand Up @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import diskCacheV111.util.CacheException;
Expand Down Expand Up @@ -139,6 +138,20 @@ public <T extends Message> ListenableFuture<T> send(final T message, CellEndpoin
return future;
}

/**
 * Test double for the cell-messaging send: instead of dispatching to a
 * cell, it hands the message to the configured test processor and
 * returns an already-completed future ({@code future.run()} executes
 * the task synchronously on this thread).
 *
 * <p>{@code destination}, {@code timeout} and {@code flags} are ignored;
 * presumably they exist only to mirror the production send signature —
 * TODO confirm against the class being stubbed.
 *
 * <p>NOTE(review): assumes every {@code message} is a {@code Message}
 * instance; the cast below throws ClassCastException otherwise.
 */
public <T> ListenableFuture<T> send(CellPath destination,
                                    Serializable message,
                                    Class<T> type,
                                    long timeout,
                                    CellEndpoint.SendFlag... flags) {
    ListenableFutureTask<T> future;
    future = ListenableFutureTask.create(() -> {
        processor.processMessage((Message)message);
        return type.cast(message);
    });
    // Complete synchronously so callers never block on the future.
    future.run();
    return future;
}

public void setProcessor(TestMessageProcessor processor) {
this.processor = processor;
}
Expand Down
Expand Up @@ -68,22 +68,28 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.PoolCheckFileMessage;

import org.dcache.resilience.TestBase;
import org.dcache.resilience.TestMessageProcessor;
import org.dcache.resilience.TestSynchronousExecutor.Mode;
import org.dcache.resilience.handlers.PoolTaskCompletionHandler;
import org.dcache.vehicles.FileAttributes;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.*;

public final class FileOperationMapTest extends TestBase {
public final class FileOperationMapTest extends TestBase
implements TestMessageProcessor<PoolCheckFileMessage> {
PnfsId pnfsId;
FileAttributes attributes;
FileOperation operation;
File checkpoint = new File("checkpoint");

/**
 * Answers every pool existence query affirmatively, so all namespace
 * locations count as verified replicas in these tests.
 */
@Override
public void processMessage(PoolCheckFileMessage message) {
    message.setHave(true);
}

@Before
public void setUp() throws CacheException, InterruptedException {
setUpBase();
Expand All @@ -92,6 +98,7 @@ public void setUp() throws CacheException, InterruptedException {
createFileOperationHandler();
createInaccessibleFileHandler();
createFileOperationMap();
setPoolMessageProcessor(this);
poolTaskCompletionHandler = new PoolTaskCompletionHandler();
poolTaskCompletionHandler.setMap(poolOperationMap);
wireFileOperationMap();
Expand Down

0 comments on commit 1e6d6ec

Please sign in to comment.