From bc1e5a29d9def45563c4682d3515c877de61c359 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Wed, 27 Mar 2019 12:44:10 -0500 Subject: [PATCH] resilience: do simple existence check of replica on pool to avoid dark removes Motivation: A recent experience with data loss from removes during (more than likely) multiple concurrent "pool up" scans suggested that relying on the namespace for replica locations is inherently subject to a race between resilience and the pool clear cache location message. Testing has confirmed this. See issue https://github.com/dCache/dcache/issues/4742 Modification: Add a SpreadAndWait of the PoolCheckFileMessage to each of the readable pool locations and return the pools on which a replica for that pnfsid is found. This is done for both copy and remove. An alarm at the warning level is issued at most once every 15 minutes with new alarms keyed to hourly timestamp, when the lag or inconsistency is detected. Missing files (no replicas and no tape) continue to be logged to activity logger (.resilience file), just as with inaccessible files, whenever they are detected. A small correctness adjustment has been made to the method which determines the operation type (in order to handle "excluded" pools correctly). Junit testing has been modified to simulate the new messaging (always returning true for file existence, in order to maintain consistency with existing tests). Result: Dark removes which can result in the removal of all replicas for a given file are prevented. Target: master Request: 5.0 Request: 4.2 Request: 4.1 Request: 4.0 Requires-notes: yes Requires-book: no Acked-by: Tigran --- .../org/dcache/alarms/PredefinedAlarm.java | 4 +- .../dcache/resilience/data/FileUpdate.java | 7 + .../dcache/resilience/data/PoolInfoMap.java | 17 ++ .../resilience/data/PoolInformation.java | 4 + .../DefaultInaccessibleFileHandler.java | 4 +- .../handlers/FileOperationHandler.java | 153 ++++++++++++++++-- .../handlers/PoolInfoChangeHandler.java | 2 +- .../java/org/dcache/resilience/TestStub.java | 15 +- .../resilience/data/FileOperationMapTest.java | 17 +- .../handlers/FileOperationHandlerTest.java | 11 +- 10 files changed, 206 insertions(+), 28 deletions(-) diff --git a/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java b/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java index ad1d160b0e7..4f5e4df5bb1 100644 --- a/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java +++ b/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java @@ -80,8 +80,10 @@ public enum PredefinedAlarm implements Alarm { BROKEN_FILE, CHECKSUM, INACCESSIBLE_FILE, + LOST_RESILIENT_FILE, FAILED_REPLICATION, - RESILIENCE_SYNC_FAILURE; + RESILIENCE_PM_SYNC_FAILURE, + RESILIENCE_LOC_SYNC_ISSUE; @Override public String getType() { diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java index e552aab33ca..370b411d310 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java @@ -375,6 +375,13 @@ public boolean validateForAction(Integer storageUnit, * Countable means readable OR intentionally excluded locations. * If there are copies missing only from excluded locations, * do nothing. + * + * NOTE. An initial check for consistency with the pools is + * avoided here so as not to block this thread on a + * send and wait. The locations will be reverified as + * part of the operation logic. While this means + * operations could unnecessarily be created, it + * ensures more efficient use of the thread resources. */ int countable = poolInfoMap.getCountableLocations(locations); count = required - countable; diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java index 375da8f4876..86e6df4122e 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -350,6 +351,22 @@ public int getCountableLocations(Collection locations) { return countable; } + public Set getExcludedLocationNames(Collection members) { + read.lock(); + try { + return members.stream() + .map(l -> poolInfo.get(getPoolIndex(l))) + .filter(Objects::nonNull) + .filter(PoolInformation::isInitialized) + .filter(PoolInformation::isExcluded) + .map(PoolInformation::getName) + .collect(Collectors.toSet()); + } finally { + read.unlock(); + } + } + + public String getGroup(Integer group) { read.lock(); try { diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java index dbc7f1c688f..f793900489b 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java @@ -160,6 +160,10 @@ synchronized boolean isCountable() { return canRead() || excluded; } + synchronized boolean isExcluded() { + return excluded; + } + synchronized boolean isInitialized() { return mode != null && tags != null && costInfo != null; } diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java index 3c5ed208e53..0eb047bcde5 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java @@ -80,13 +80,13 @@ public final class DefaultInaccessibleFileHandler extends InaccessibleFileHandle + "'inaccessible {}' to produce a list of orphaned pnfsids."; private static final String MISSING_LOCATIONS_MESSAGE - = "{} has no locations in the namespace. " + = "{} has no locations in the namespace (file is lost). " + "Administrator intervention is required."; @Override protected Type handleNoLocationsForFile(FileOperation operation) { PnfsId pnfsId = operation.getPnfsId(); - LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.INACCESSIBLE_FILE, + LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.LOST_RESILIENT_FILE, pnfsId.toString()), MISSING_LOCATIONS_MESSAGE, pnfsId); String error = String.format("%s has no locations.", pnfsId); diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java index a2e51247bef..0fd636dbad9 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java @@ -60,9 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.resilience.handlers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Collections; import java.util.NoSuchElementException; @@ -76,6 +83,10 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import diskCacheV111.util.RetentionPolicy; +import diskCacheV111.util.SpreadAndWait; +import diskCacheV111.vehicles.HttpProtocolInfo; +import diskCacheV111.vehicles.PoolCheckFileMessage; import diskCacheV111.vehicles.PoolManagerPoolInformation; import dmg.cells.nucleus.CellPath; @@ -125,10 +136,32 @@ public class FileOperationHandler { private static final Logger ACTIVITY_LOGGER = LoggerFactory.getLogger("org.dcache.resilience-log"); + private static final String INCONSISTENT_LOCATIONS_ALARM + = "Resilience has detected an inconsistency or lag between " + + "the namespace replica locations and the actual locations on disk."; + private static final ImmutableList ONLINE_STICKY_RECORD = ImmutableList.of( new StickyRecord("system", StickyRecord.NON_EXPIRING)); + private static final RateLimiter LIMITER = RateLimiter.create(0.001); + + private static void sendOutOfSyncAlarm() { + /* + * Create a new alarm every hour by keying the alarm to + * an hourly timestamp. Otherwise, the alarm counter will + * just be updated for each alarm sent. The rate limiter + * will not alarm more than once every 1000 seconds (once every + * 15 minutes). + */ + if (LIMITER.tryAcquire()) { + LOGGER.warn(AlarmMarkerFactory.getMarker( + PredefinedAlarm.RESILIENCE_LOC_SYNC_ISSUE, + Instant.now().truncatedTo(ChronoUnit.HOURS).toString()), + INCONSISTENT_LOCATIONS_ALARM); + } + } + /** *

For communication with the {@link ResilientFileTask}.

*/ @@ -188,8 +221,10 @@ public void handleBrokenFileLocation(PnfsId pnfsId, String pool) { return; } - int actual = attributes.getLocations().size(); - int countable = poolInfoMap.getCountableLocations(attributes.getLocations()); + Collection locations = attributes.getLocations(); + + int actual = locations.size(); + int countable = poolInfoMap.getCountableLocations(locations); if (actual <= 1) { /* @@ -489,17 +524,47 @@ public Type handleVerification(FileAttributes attributes) { * available source file. So we need the strictly readable * locations, not just "countable" ones. */ - Set readableLocations + Set namespaceReadable = poolInfoMap.getReadableLocations(locations); - LOGGER.trace("handleVerification, {}, readable locations {}", pnfsId, - readableLocations); + Set verifiedReadable; + + try { + verifiedReadable = verifyLocations(pnfsId, + namespaceReadable, + pools); + } catch (InterruptedException e) { + LOGGER.trace("handleVerification, {}, verify pool " + + "locations interrupted; " + + "cancelling operation", pnfsId); + completionHandler.taskCancelled(pnfsId); + return Type.VOID; + } + + LOGGER.trace("handleVerification, {}, namespace readable locations {}," + + "verified readable locations {}", pnfsId, + namespaceReadable, verifiedReadable); + + if (namespaceReadable.size() != verifiedReadable.size()) { + ACTIVITY_LOGGER.info("The namespace is not in sync with the pool " + + "repositories for {}: " + + "namespace locations " + + "that are readable: {}; " + + "actually found: {}.", + pnfsId, namespaceReadable, verifiedReadable); + sendOutOfSyncAlarm(); + } - if (inaccessibleFileHandler.isInaccessible(readableLocations, operation)) { + if (inaccessibleFileHandler.isInaccessible(verifiedReadable, operation)) { + LOGGER.trace("handleVerification {}, no readable locations found, " + + "checking to see if " + + "file can be staged.", pnfsId); return inaccessibleFileHandler.handleInaccessibleFile(operation); } - if (shouldEvictALocation(operation, readableLocations)) { + if (shouldEvictALocation(operation, verifiedReadable)) { + LOGGER.trace("handleVerification, location should be evicted {}", + verifiedReadable); return Type.REMOVE; } @@ -508,7 +573,7 @@ public Type handleVerification(FileAttributes attributes) { return determineTypeFromConstraints(operation, locations, - readableLocations); + verifiedReadable); } public void setCompletionHandler( @@ -580,13 +645,26 @@ private Type determineTypeFromConstraints(FileOperation operation, StorageUnitConstraints constraints = poolInfoMap.getStorageUnitConstraints(sindex); + + Set excluded = poolInfoMap.getExcludedLocationNames(locations); + readableLocations = Sets.difference(readableLocations, excluded); + + int required = constraints.getRequired(); + int missing = required - readableLocations.size(); + /* - * Countable means readable OR intentionally excluded locations. - * If there are copies missing only from excluded locations, - * do nothing. + * First compute the missing files on the basis of just the readable + * files. If this is positive, recompute by adding in all the + * excluded locations. If these satisfy the requirement, void + * the operation. Do no allow removes in this case, since this + * would imply decreasing already deficient locations. */ - int missing = constraints.getRequired() - - poolInfoMap.getCountableLocations(locations); + if (missing > 0) { + missing -= excluded.size(); + if (missing < 0) { + missing = 0; + } + } Collection tags = constraints.getOneCopyPer(); @@ -714,4 +792,53 @@ private boolean shouldEvictALocation(FileOperation operation, return false; } + + /** + *

Called when there are no accessible replicas for the file.

+ * + *

If the file's RetentionPolicy is CUSTODIAL, set the count to 1, + * to make sure the task completes after this. Staging is fire-and-forget, + * and depends on a new add cache location message being processed + * after staging.

+ * + * @return true if file is CUSTODIAL, false otherwise. + */ + private boolean shouldTryToStage(FileAttributes attributes, + FileOperation operation) { + if (attributes.getRetentionPolicy() == RetentionPolicy.CUSTODIAL) { + LOGGER.trace("shouldTryToStage {}, retention policy is CUSTODIAL.", + operation.getPnfsId()); + operation.setOpCount(1); + return true; + } + LOGGER.trace("shouldTryToStage {}, retention policy is not CUSTODIAL", + operation.getPnfsId()); + return false; + } + + /* + * REVISIT -- this will be expanded into a more general verification procedure + * + * Check the readable pools for the actual existence of the replica. + */ + private Set verifyLocations(PnfsId pnfsId, + Collection locations, + CellStub stub) + throws InterruptedException { + SpreadAndWait controller = new SpreadAndWait<>(pools); + + for(String pool: locations) { + LOGGER.trace("Sending query to {} to verify replica exists.", pool); + PoolCheckFileMessage request = new PoolCheckFileMessage(pool, pnfsId); + controller.send(new CellPath(pool), PoolCheckFileMessage.class, request); + } + + controller.waitForReplies(); + + return controller.getReplies().values() + .stream() + .filter(PoolCheckFileMessage::getHave) + .map(PoolCheckFileMessage::getPoolName) + .collect(Collectors.toSet()); + } } diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java index 18770d904b9..3cf57332753 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java @@ -310,7 +310,7 @@ private void checkLastRefresh() { String initError = String.format(SYNC_ALARM, new Date(lastRefresh), refreshTimeout, refreshTimeoutUnit); LOGGER.error(AlarmMarkerFactory.getMarker( - PredefinedAlarm.RESILIENCE_SYNC_FAILURE, + PredefinedAlarm.RESILIENCE_PM_SYNC_FAILURE, "resilience", String.valueOf(lastRefresh)), initError); } diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java index b2828842d98..a5a905e1fb6 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.io.Serializable; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import diskCacheV111.util.CacheException; @@ -139,6 +138,20 @@ public ListenableFuture send(final T message, CellEndpoin return future; } + public ListenableFuture send(CellPath destination, + Serializable message, + Class type, + long timeout, + CellEndpoint.SendFlag... flags) { + ListenableFutureTask future; + future = ListenableFutureTask.create(() -> { + processor.processMessage((Message)message); + return type.cast(message); + }); + future.run(); + return future; + } + public void setProcessor(TestMessageProcessor processor) { this.processor = processor; } diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java index 99613a347a2..b16bc91c70f 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java @@ -68,22 +68,28 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.PoolCheckFileMessage; + import org.dcache.resilience.TestBase; +import org.dcache.resilience.TestMessageProcessor; import org.dcache.resilience.TestSynchronousExecutor.Mode; import org.dcache.resilience.handlers.PoolTaskCompletionHandler; import org.dcache.vehicles.FileAttributes; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.*; -public final class FileOperationMapTest extends TestBase { +public final class FileOperationMapTest extends TestBase + implements TestMessageProcessor { PnfsId pnfsId; FileAttributes attributes; FileOperation operation; File checkpoint = new File("checkpoint"); + @Override + public void processMessage(PoolCheckFileMessage message) { + message.setHave(true); + } + @Before public void setUp() throws CacheException, InterruptedException { setUpBase(); @@ -92,6 +98,7 @@ public void setUp() throws CacheException, InterruptedException { createFileOperationHandler(); createInaccessibleFileHandler(); createFileOperationMap(); + setPoolMessageProcessor(this); poolTaskCompletionHandler = new PoolTaskCompletionHandler(); poolTaskCompletionHandler.setMap(poolOperationMap); wireFileOperationMap(); diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java index f8ca67504d1..4367b733eef 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java @@ -70,6 +70,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; import diskCacheV111.vehicles.Message; +import diskCacheV111.vehicles.PoolCheckFileMessage; + import org.dcache.pool.migration.Task; import org.dcache.resilience.TestBase; import org.dcache.resilience.TestMessageProcessor; @@ -85,11 +87,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.vehicles.FileAttributes; import org.dcache.vehicles.resilience.RemoveReplicaMessage; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public final class FileOperationHandlerTest extends TestBase implements TestMessageProcessor { @@ -111,6 +109,9 @@ public void processMessage(Message message) throws Exception { } repRmMessage = (RemoveReplicaMessage) message; + } else if (message instanceof PoolCheckFileMessage) { + PoolCheckFileMessage reply = (PoolCheckFileMessage) message; + reply.setHave(true); } }