diff --git a/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java b/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java index ad1d160b0e7..4f5e4df5bb1 100644 --- a/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java +++ b/modules/common/src/main/java/org/dcache/alarms/PredefinedAlarm.java @@ -80,8 +80,10 @@ public enum PredefinedAlarm implements Alarm { BROKEN_FILE, CHECKSUM, INACCESSIBLE_FILE, + LOST_RESILIENT_FILE, FAILED_REPLICATION, - RESILIENCE_SYNC_FAILURE; + RESILIENCE_PM_SYNC_FAILURE, + RESILIENCE_LOC_SYNC_ISSUE; @Override public String getType() { diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java index e552aab33ca..370b411d310 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/FileUpdate.java @@ -375,6 +375,13 @@ public boolean validateForAction(Integer storageUnit, * Countable means readable OR intentionally excluded locations. * If there are copies missing only from excluded locations, * do nothing. + * + * NOTE. An initial check for consistency with the pools is + * avoided here so as not to block this thread on a + * send and wait. The locations will be reverified as + * part of the operation logic. While this means + * operations could unnecessarily be created, it + * ensures more efficient use of the thread resources. 
*/ int countable = poolInfoMap.getCountableLocations(locations); count = required - countable; diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java index 375da8f4876..86e6df4122e 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInfoMap.java @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -350,6 +351,22 @@ public int getCountableLocations(Collection locations) { return countable; } + public Set getExcludedLocationNames(Collection members) { + read.lock(); + try { + return members.stream() + .map(l -> poolInfo.get(getPoolIndex(l))) + .filter(Objects::nonNull) + .filter(PoolInformation::isInitialized) + .filter(PoolInformation::isExcluded) + .map(PoolInformation::getName) + .collect(Collectors.toSet()); + } finally { + read.unlock(); + } + } + + public String getGroup(Integer group) { read.lock(); try { diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java index dbc7f1c688f..f793900489b 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/data/PoolInformation.java @@ -160,6 +160,10 @@ synchronized boolean isCountable() { return canRead() || excluded; } + synchronized boolean isExcluded() { + return excluded; + } + synchronized boolean isInitialized() { return mode != null && tags != null && costInfo != 
null; } diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java index 3c5ed208e53..0eb047bcde5 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/DefaultInaccessibleFileHandler.java @@ -80,13 +80,13 @@ public final class DefaultInaccessibleFileHandler extends InaccessibleFileHandle + "'inaccessible {}' to produce a list of orphaned pnfsids."; private static final String MISSING_LOCATIONS_MESSAGE - = "{} has no locations in the namespace. " + = "{} has no locations in the namespace (file is lost). " + "Administrator intervention is required."; @Override protected Type handleNoLocationsForFile(FileOperation operation) { PnfsId pnfsId = operation.getPnfsId(); - LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.INACCESSIBLE_FILE, + LOGGER.error(AlarmMarkerFactory.getMarker(PredefinedAlarm.LOST_RESILIENT_FILE, pnfsId.toString()), MISSING_LOCATIONS_MESSAGE, pnfsId); String error = String.format("%s has no locations.", pnfsId); diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java index a2e51247bef..0fd636dbad9 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/FileOperationHandler.java @@ -60,9 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.resilience.handlers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.RateLimiter; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Collections; import java.util.NoSuchElementException; @@ -76,6 +83,10 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import diskCacheV111.util.RetentionPolicy; +import diskCacheV111.util.SpreadAndWait; +import diskCacheV111.vehicles.HttpProtocolInfo; +import diskCacheV111.vehicles.PoolCheckFileMessage; import diskCacheV111.vehicles.PoolManagerPoolInformation; import dmg.cells.nucleus.CellPath; @@ -125,10 +136,32 @@ public class FileOperationHandler { private static final Logger ACTIVITY_LOGGER = LoggerFactory.getLogger("org.dcache.resilience-log"); + private static final String INCONSISTENT_LOCATIONS_ALARM + = "Resilience has detected an inconsistency or lag between " + + "the namespace replica locations and the actual locations on disk."; + private static final ImmutableList ONLINE_STICKY_RECORD = ImmutableList.of( new StickyRecord("system", StickyRecord.NON_EXPIRING)); + private static final RateLimiter LIMITER = RateLimiter.create(0.001); + + private static void sendOutOfSyncAlarm() { + /* + * Create a new alarm every hour by keying the alarm to + * an hourly timestamp. Otherwise, the alarm counter will + * just be updated for each alarm sent. The rate limiter + * will not alarm more than once every 1000 seconds (about once + * every 17 minutes). + */ + if (LIMITER.tryAcquire()) { + LOGGER.warn(AlarmMarkerFactory.getMarker( + PredefinedAlarm.RESILIENCE_LOC_SYNC_ISSUE, + Instant.now().truncatedTo(ChronoUnit.HOURS).toString()), + INCONSISTENT_LOCATIONS_ALARM); + } + } + /** *

For communication with the {@link ResilientFileTask}.

*/ @@ -188,8 +221,10 @@ public void handleBrokenFileLocation(PnfsId pnfsId, String pool) { return; } - int actual = attributes.getLocations().size(); - int countable = poolInfoMap.getCountableLocations(attributes.getLocations()); + Collection locations = attributes.getLocations(); + + int actual = locations.size(); + int countable = poolInfoMap.getCountableLocations(locations); if (actual <= 1) { /* @@ -489,17 +524,47 @@ public Type handleVerification(FileAttributes attributes) { * available source file. So we need the strictly readable * locations, not just "countable" ones. */ - Set readableLocations + Set namespaceReadable = poolInfoMap.getReadableLocations(locations); - LOGGER.trace("handleVerification, {}, readable locations {}", pnfsId, - readableLocations); + Set verifiedReadable; + + try { + verifiedReadable = verifyLocations(pnfsId, + namespaceReadable, + pools); + } catch (InterruptedException e) { + LOGGER.trace("handleVerification, {}, verify pool " + + "locations interrupted; " + + "cancelling operation", pnfsId); + completionHandler.taskCancelled(pnfsId); + return Type.VOID; + } + + LOGGER.trace("handleVerification, {}, namespace readable locations {}," + + "verified readable locations {}", pnfsId, + namespaceReadable, verifiedReadable); + + if (namespaceReadable.size() != verifiedReadable.size()) { + ACTIVITY_LOGGER.info("The namespace is not in sync with the pool " + + "repositories for {}: " + + "namespace locations " + + "that are readable: {}; " + + "actually found: {}.", + pnfsId, namespaceReadable, verifiedReadable); + sendOutOfSyncAlarm(); + } - if (inaccessibleFileHandler.isInaccessible(readableLocations, operation)) { + if (inaccessibleFileHandler.isInaccessible(verifiedReadable, operation)) { + LOGGER.trace("handleVerification {}, no readable locations found, " + + "checking to see if " + + "file can be staged.", pnfsId); return inaccessibleFileHandler.handleInaccessibleFile(operation); } - if (shouldEvictALocation(operation, 
readableLocations)) { + if (shouldEvictALocation(operation, verifiedReadable)) { + LOGGER.trace("handleVerification, location should be evicted {}", + verifiedReadable); return Type.REMOVE; } @@ -508,7 +573,7 @@ public Type handleVerification(FileAttributes attributes) { return determineTypeFromConstraints(operation, locations, - readableLocations); + verifiedReadable); } public void setCompletionHandler( @@ -580,13 +645,26 @@ private Type determineTypeFromConstraints(FileOperation operation, StorageUnitConstraints constraints = poolInfoMap.getStorageUnitConstraints(sindex); + + Set excluded = poolInfoMap.getExcludedLocationNames(locations); + readableLocations = Sets.difference(readableLocations, excluded); + + int required = constraints.getRequired(); + int missing = required - readableLocations.size(); + /* - * Countable means readable OR intentionally excluded locations. - * If there are copies missing only from excluded locations, - * do nothing. + * First compute the missing files on the basis of just the readable + * files. If this is positive, recompute by adding in all the + * excluded locations. If these satisfy the requirement, void + * the operation. Do not allow removes in this case, since this + * would imply decreasing already deficient locations. */ - int missing = constraints.getRequired() - - poolInfoMap.getCountableLocations(locations); + if (missing > 0) { + missing -= excluded.size(); + if (missing < 0) { + missing = 0; + } + } Collection tags = constraints.getOneCopyPer(); @@ -714,4 +792,53 @@ private boolean shouldEvictALocation(FileOperation operation, return false; } + + /** + *

Called when there are no accessible replicas for the file.

+ * + *

If the file's RetentionPolicy is CUSTODIAL, set the count to 1, + * to make sure the task completes after this. Staging is fire-and-forget, + * and depends on a new add cache location message being processed + * after staging.

+ * + * @return true if file is CUSTODIAL, false otherwise. + */ + private boolean shouldTryToStage(FileAttributes attributes, + FileOperation operation) { + if (attributes.getRetentionPolicy() == RetentionPolicy.CUSTODIAL) { + LOGGER.trace("shouldTryToStage {}, retention policy is CUSTODIAL.", + operation.getPnfsId()); + operation.setOpCount(1); + return true; + } + LOGGER.trace("shouldTryToStage {}, retention policy is not CUSTODIAL", + operation.getPnfsId()); + return false; + } + + /* + * REVISIT -- this will be expanded into a more general verification procedure + * + * Check the readable pools for the actual existence of the replica. + */ + private Set verifyLocations(PnfsId pnfsId, + Collection locations, + CellStub stub) + throws InterruptedException { + SpreadAndWait controller = new SpreadAndWait<>(pools); + + for(String pool: locations) { + LOGGER.trace("Sending query to {} to verify replica exists.", pool); + PoolCheckFileMessage request = new PoolCheckFileMessage(pool, pnfsId); + controller.send(new CellPath(pool), PoolCheckFileMessage.class, request); + } + + controller.waitForReplies(); + + return controller.getReplies().values() + .stream() + .filter(PoolCheckFileMessage::getHave) + .map(PoolCheckFileMessage::getPoolName) + .collect(Collectors.toSet()); + } } diff --git a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java index 0cf508c3513..701faea689f 100644 --- a/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java +++ b/modules/dcache-resilience/src/main/java/org/dcache/resilience/handlers/PoolInfoChangeHandler.java @@ -316,7 +316,7 @@ private void checkLastRefresh() { String initError = String.format(SYNC_ALARM, new Date(lastRefresh), refreshTimeout, refreshTimeoutUnit); LOGGER.error(AlarmMarkerFactory.getMarker( - PredefinedAlarm.RESILIENCE_SYNC_FAILURE, + 
PredefinedAlarm.RESILIENCE_PM_SYNC_FAILURE, "resilience", String.valueOf(lastRefresh)), initError); } diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java index b2828842d98..a5a905e1fb6 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/TestStub.java @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.io.Serializable; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import diskCacheV111.util.CacheException; @@ -139,6 +138,20 @@ public ListenableFuture send(final T message, CellEndpoin return future; } + public ListenableFuture send(CellPath destination, + Serializable message, + Class type, + long timeout, + CellEndpoint.SendFlag... flags) { + ListenableFutureTask future; + future = ListenableFutureTask.create(() -> { + processor.processMessage((Message)message); + return type.cast(message); + }); + future.run(); + return future; + } + public void setProcessor(TestMessageProcessor processor) { this.processor = processor; } diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java index 99613a347a2..b16bc91c70f 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/data/FileOperationMapTest.java @@ -68,22 +68,28 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.PoolCheckFileMessage; + import org.dcache.resilience.TestBase; +import 
org.dcache.resilience.TestMessageProcessor; import org.dcache.resilience.TestSynchronousExecutor.Mode; import org.dcache.resilience.handlers.PoolTaskCompletionHandler; import org.dcache.vehicles.FileAttributes; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.*; -public final class FileOperationMapTest extends TestBase { +public final class FileOperationMapTest extends TestBase + implements TestMessageProcessor { PnfsId pnfsId; FileAttributes attributes; FileOperation operation; File checkpoint = new File("checkpoint"); + @Override + public void processMessage(PoolCheckFileMessage message) { + message.setHave(true); + } + @Before public void setUp() throws CacheException, InterruptedException { setUpBase(); @@ -92,6 +98,7 @@ public void setUp() throws CacheException, InterruptedException { createFileOperationHandler(); createInaccessibleFileHandler(); createFileOperationMap(); + setPoolMessageProcessor(this); poolTaskCompletionHandler = new PoolTaskCompletionHandler(); poolTaskCompletionHandler.setMap(poolOperationMap); wireFileOperationMap(); diff --git a/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java b/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java index f8ca67504d1..4367b733eef 100644 --- a/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java +++ b/modules/dcache-resilience/src/test/java/org/dcache/resilience/handlers/FileOperationHandlerTest.java @@ -70,6 +70,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; import diskCacheV111.vehicles.Message; +import diskCacheV111.vehicles.PoolCheckFileMessage; + import org.dcache.pool.migration.Task; 
import org.dcache.resilience.TestBase; import org.dcache.resilience.TestMessageProcessor; @@ -85,11 +87,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.vehicles.FileAttributes; import org.dcache.vehicles.resilience.RemoveReplicaMessage; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public final class FileOperationHandlerTest extends TestBase implements TestMessageProcessor { @@ -111,6 +109,9 @@ public void processMessage(Message message) throws Exception { } repRmMessage = (RemoveReplicaMessage) message; + } else if (message instanceof PoolCheckFileMessage) { + PoolCheckFileMessage reply = (PoolCheckFileMessage) message; + reply.setHave(true); } }