From 4402f3f8557527d5c6cdad6f5bdcbd707b8cbf52 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Wed, 7 Feb 2018 02:28:23 -0800 Subject: [PATCH] HDFS-13097: [SPS]: Fix the branch review comments(Part1). Contributed by Surendra Singh. --- .../org/apache/hadoop/hdfs/DFSClient.java | 4 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 6 +- .../ClientNamenodeProtocolTranslatorPB.java | 14 +- .../main/proto/ClientNamenodeProtocol.proto | 8 +- .../federation/router/RouterRpcServer.java | 2 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 61 --- ...amenodeProtocolServerSideTranslatorPB.java | 16 +- .../server/blockmanagement/BlockManager.java | 255 +---------- .../blockmanagement/DatanodeDescriptor.java | 33 +- .../server/common/HdfsServerConstants.java | 2 +- .../datanode/StoragePolicySatisfyWorker.java | 15 +- .../hadoop/hdfs/server/mover/Mover.java | 2 +- .../namenode/FSDirSatisfyStoragePolicyOp.java | 26 +- .../namenode/FSDirStatAndListingOp.java | 1 - .../hdfs/server/namenode/FSDirXAttrOp.java | 2 +- .../hdfs/server/namenode/FSDirectory.java | 2 +- .../hdfs/server/namenode/FSNamesystem.java | 46 +- .../hadoop/hdfs/server/namenode/NameNode.java | 30 +- .../server/namenode/NameNodeRpcServer.java | 21 +- .../sps/BlockStorageMovementNeeded.java | 4 +- .../namenode/sps/IntraSPSNameNodeContext.java | 6 +- .../hdfs/server/namenode/sps/SPSPathIds.java | 70 --- .../hdfs/server/namenode/sps/SPSService.java | 10 +- .../namenode/sps/StoragePolicySatisfier.java | 137 ++++-- .../sps/StoragePolicySatisfyManager.java | 399 ++++++++++++++++++ .../sps/ExternalStoragePolicySatisfier.java | 2 +- .../hadoop/hdfs/tools/StoragePolicyAdmin.java | 2 +- .../namenode/TestNameNodeReconfigure.java | 19 +- .../TestPersistentStoragePolicySatisfier.java | 3 +- .../TestStoragePolicySatisfierWithHA.java | 6 +- .../sps/TestStoragePolicySatisfier.java | 35 +- ...StoragePolicySatisfierWithStripedFile.java | 6 +- .../TestExternalStoragePolicySatisfier.java | 24 +- 33 files changed, 665 insertions(+), 604 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 471ab2c890046..b6f9bdd61db21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3110,8 +3110,8 @@ public void satisfyStoragePolicy(String src) throws IOException { } } - public boolean isStoragePolicySatisfierRunning() throws IOException { - return namenode.isStoragePolicySatisfierRunning(); + public boolean isInternalSatisfierRunning() throws IOException { + return namenode.isInternalSatisfierRunning(); } Tracer getTracer() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 360fd63ae3f40..5c51c225f074f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1759,12 +1759,12 @@ BatchedEntries listOpenFiles(long prevId, void satisfyStoragePolicy(String path) throws IOException; /** - * Check if StoragePolicySatisfier is running. - * @return true if StoragePolicySatisfier is running + * Check if internal StoragePolicySatisfier is running. + * @return true if internal StoragePolicySatisfier is running * @throws IOException */ @Idempotent - boolean isStoragePolicySatisfierRunning() throws IOException; + boolean isInternalSatisfierRunning() throws IOException; /** * Check the storage policy satisfy status of the path for which diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index cdc8eac5edcad..683cccad79400 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -150,8 +150,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; @@ -301,8 +301,8 @@ public class ClientNamenodeProtocolTranslatorPB implements private final static GetErasureCodingCodecsRequestProto VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto .newBuilder().build(); - private final static IsStoragePolicySatisfierRunningRequestProto - VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto + private final static IsInternalSatisfierRunningRequestProto + VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto .newBuilder().build(); @@ -1912,10 +1912,10 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src) } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { try { - IsStoragePolicySatisfierRunningResponseProto rep = - rpcProxy.isStoragePolicySatisfierRunning(null, + IsInternalSatisfierRunningResponseProto rep = + rpcProxy.isInternalSatisfierRunning(null, VOID_IS_SPS_RUNNING_REQUEST); return rep.getRunning(); } catch (ServiceException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 933a19a310314..e8e3a5857980a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -838,10 +838,10 @@ message SatisfyStoragePolicyResponseProto { } -message IsStoragePolicySatisfierRunningRequestProto { // no parameters +message IsInternalSatisfierRunningRequestProto { // no parameters } -message IsStoragePolicySatisfierRunningResponseProto { +message IsInternalSatisfierRunningResponseProto { required bool running = 1; } @@ -1048,8 +1048,8 @@ service ClientNamenodeProtocol { returns(ListOpenFilesResponseProto); rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto) returns(SatisfyStoragePolicyResponseProto); - rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto) - returns(IsStoragePolicySatisfierRunningResponseProto); + rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto) + returns(IsInternalSatisfierRunningResponseProto); rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto) returns(CheckStoragePolicySatisfyPathStatusResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c5458f068f642..d93f99deb60af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -2498,7 +2498,7 @@ public void satisfyStoragePolicy(String path) throws IOException { } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { checkOperation(OperationCategory.READ, false); return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index c26599cd878dc..23f047821475d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -53,7 +53,6 @@ import java.util.Comparator; import java.util.Date; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -74,7 +73,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -1458,26 +1456,6 @@ public static int getReplWorkMultiplier(Configuration conf) { return blocksReplWorkMultiplier; } - /** - * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from - * configuration. 
- * - * @param conf Configuration - * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION - */ - public static int getSPSWorkMultiplier(Configuration conf) { - int spsWorkMultiplier = conf - .getInt( - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); - Preconditions.checkArgument( - (spsWorkMultiplier > 0), - DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + - " = '" + spsWorkMultiplier + "' is invalid. " + - "It should be a positive, non-zero integer value."); - return spsWorkMultiplier; - } - /** * Get SPNEGO keytab Key from configuration * @@ -1738,43 +1716,4 @@ public static DelegationTokenIdentifier decodeDelegationToken( } return id; } - - /** - * Remove the overlap between the expected types and the existing types. - * - * @param expected - * - Expected storage types list. - * @param existing - * - Existing storage types list. - * @param ignoreNonMovable - * ignore non-movable storage types by removing them from both - * expected and existing storage type list to prevent non-movable - * storage from being moved. - * @returns if the existing types or the expected types is empty after - * removing the overlap. - */ - public static boolean removeOverlapBetweenStorageTypes( - List expected, - List existing, boolean ignoreNonMovable) { - for (Iterator i = existing.iterator(); i.hasNext();) { - final StorageType t = i.next(); - if (expected.remove(t)) { - i.remove(); - } - } - if (ignoreNonMovable) { - removeNonMovable(existing); - removeNonMovable(expected); - } - return expected.isEmpty() || existing.isEmpty(); - } - - private static void removeNonMovable(List types) { - for (Iterator i = types.iterator(); i.hasNext();) { - final StorageType t = i.next(); - if (!t.isMovable()) { - i.remove(); - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 09f7ce29a6890..b0816cb18a046 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -162,8 +162,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; @@ -1865,14 +1865,14 @@ public 
GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController } @Override - public IsStoragePolicySatisfierRunningResponseProto - isStoragePolicySatisfierRunning(RpcController controller, - IsStoragePolicySatisfierRunningRequestProto req) + public IsInternalSatisfierRunningResponseProto + isInternalSatisfierRunning(RpcController controller, + IsInternalSatisfierRunningRequestProto req) throws ServiceException { try { - boolean ret = server.isStoragePolicySatisfierRunning(); - IsStoragePolicySatisfierRunningResponseProto.Builder builder = - IsStoragePolicySatisfierRunningResponseProto.newBuilder(); + boolean ret = server.isInternalSatisfierRunning(); + IsInternalSatisfierRunningResponseProto.Builder builder = + IsInternalSatisfierRunningResponseProto.newBuilder(); builder.setRunning(ret); return builder.build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 92059105f98ca..e7979b44039d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -69,8 +69,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -94,12 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -435,11 +428,7 @@ public long getTotalECBlockGroups() { private final BlockIdManager blockIdManager; /** For satisfying block storage policies. */ - private final StoragePolicySatisfier sps; - private final boolean storagePolicyEnabled; - private StoragePolicySatisfierMode spsMode; - private SPSPathIds spsPaths; - private final int spsOutstandingPathsLimit; + private final StoragePolicySatisfyManager spsManager; /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
@@ -479,19 +468,10 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); - // StoragePolicySatisfier(SPS) configs - storagePolicyEnabled = - conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT); - String spsModeVal = conf.get( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); - spsOutstandingPathsLimit = conf.getInt( - DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, - DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT); - spsMode = StoragePolicySatisfierMode.fromString(spsModeVal); - spsPaths = new SPSPathIds(); - sps = new StoragePolicySatisfier(conf); + + // sps manager manages the user invoked sps paths and does the movement. + spsManager = new StoragePolicySatisfyManager(conf, namesystem, this); + blockTokenSecretManager = createBlockTokenSecretManager(conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); @@ -719,7 +699,7 @@ public void activate(Configuration conf, long blockTotal) { } public void close() { - stopSPS(false); + getSPSManager().stop(); bmSafeMode.close(); try { redundancyThread.interrupt(); @@ -733,7 +713,7 @@ public void close() { datanodeManager.close(); pendingReconstruction.stop(); blocksMap.close(); - stopSPSGracefully(); + getSPSManager().stopGracefully(); } /** @return the datanodeManager */ @@ -5046,222 +5026,9 @@ public boolean hasLowRedundancyBlocks(long inodeID) { } /** - * Gets the storage policy satisfier instance. - * - * @return sps - */ - public StoragePolicySatisfier getStoragePolicySatisfier() { - return sps; - } - - /** - * Start storage policy satisfier service. - */ - public void startSPS() { - if (!(storagePolicyEnabled && spsMode != StoragePolicySatisfierMode.NONE)) { - LOG.info( - "Failed to start StoragePolicySatisfier " - + " as {} set to {} and {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, spsMode); - return; - } else if (sps.isRunning()) { - LOG.info("Storage policy satisfier is already running" - + " as internal service."); - return; - } - // starting internal SPS service - if (spsMode == StoragePolicySatisfierMode.INTERNAL) { - sps.start(false, spsMode); - } - } - - /** - * Stop storage policy satisfier service. - * - * @param forceStop - * true represents that it should stop SPS service by clearing all - * pending SPS work - */ - public void stopSPS(boolean forceStop) { - if (!(storagePolicyEnabled - && (spsMode != StoragePolicySatisfierMode.NONE))) { - LOG.info("Storage policy satisfier is not enabled."); - return; - } else if (!sps.isRunning()) { - removeAllSPSPathIds(); - LOG.info("Storage policy satisfier is not running."); - return; - } - - sps.disable(forceStop); - } - - /** - * Enable storage policy satisfier by starting its service. 
- */ - public void enableInternalSPS() { - if (!storagePolicyEnabled){ - LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); - return; - } - if (sps.isRunning()) { - LOG.info("Storage policy satisfier is already running as SPS mode:{}.", - spsMode); - return; - } - updateSPSMode(StoragePolicySatisfierMode.INTERNAL); - sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps), - new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), - sps), - new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null); - sps.start(true, spsMode); - } - - - - /** - * Enable storage policy satisfier by starting its service. - */ - public void enableExternalSPS() { - if (!storagePolicyEnabled){ - LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.", - DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); - return; - } - if (spsMode == StoragePolicySatisfierMode.EXTERNAL) { - LOG.info("Storage policy satisfier is already enabled as SPS mode:{}.", - spsMode); - return; - } - updateSPSMode(StoragePolicySatisfierMode.EXTERNAL); - sps.stopGracefully(); - } - - private void updateSPSMode(StoragePolicySatisfierMode newSpsMode) { - LOG.debug("Updating SPS service status, current mode:{}, new mode:{}", - spsMode, newSpsMode); - spsMode = newSpsMode; - } - - /** - * Disable the storage policy satisfier by stopping its services. - */ - public void disableSPS() { - switch (spsMode) { - case NONE: - break; - case INTERNAL: - case EXTERNAL: - if (!sps.isRunning()) { - LOG.info("Storage policy satisfier is already stopped."); - } else { - LOG.info("Stopping StoragePolicySatisfier mode {}, as admin " - + "requested to stop it.", spsMode); - sps.disable(true); - } - removeAllSPSPathIds(); - break; - default: - // nothing - break; - } - updateSPSMode(StoragePolicySatisfierMode.NONE); - } - - /** - * Timed wait to stop storage policy satisfier daemon threads. - */ - public void stopSPSGracefully() { - removeAllSPSPathIds(); - sps.stopGracefully(); - } - /** - * @return True if storage policy satisfier running. - */ - public boolean isStoragePolicySatisfierRunning() { - return sps.isRunning(); - } - - /** - * @return status - * Storage policy satisfy status of the path. - * @throws IOException - */ - public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( - String path) throws IOException { - if (spsMode != StoragePolicySatisfierMode.INTERNAL) { - LOG.debug("Satisfier is not running inside namenode, so status " - + "can't be returned."); - throw new IOException("Satisfier is not running inside namenode, " - + "so status can't be returned."); - } - return sps.checkStoragePolicySatisfyPathStatus(path); - } - - /** - * @return SPS service instance. - */ - public SPSService getSPSService() { - return this.sps; - } - - /** - * @return the next SPS path id, on which path users has invoked to satisfy - * storages. - */ - public Long getNextSPSPathId() { - return spsPaths.pollNext(); - } - - /** - * Verify that satisfier queue limit exceeds allowed outstanding limit. - */ - public void verifyOutstandingSPSPathQLimit() throws IOException { - long size = spsPaths.size(); - // Checking that the SPS call Q exceeds the allowed limit. 
- if (spsOutstandingPathsLimit - size <= 0) { - LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}", - spsOutstandingPathsLimit, size); - throw new IOException("Outstanding satisfier queue limit: " - + spsOutstandingPathsLimit + " exceeded, try later!"); - } - } - - /** - * Removes the SPS path id from the list of sps paths. - */ - public void removeSPSPathId(long trackId) { - spsPaths.remove(trackId); - } - - /** - * Clean up all sps path ids. - */ - public void removeAllSPSPathIds() { - spsPaths.clear(); - } - - /** - * Adds the sps path to SPSPathIds list. - */ - public void addSPSPathId(long id) { - spsPaths.add(id); - } - - /** - * @return true if sps is running as an internal service or external service. - */ - public boolean isSPSEnabled() { - return spsMode == StoragePolicySatisfierMode.INTERNAL - || spsMode == StoragePolicySatisfierMode.EXTERNAL; - } - - /** - * @return sps service mode. + * @return sps manager. */ - public StoragePolicySatisfierMode getSPSMode() { - return spsMode; + public StoragePolicySatisfyManager getSPSManager() { + return spsManager; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index b09d90862797c..24b948c6b062d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -211,8 +211,8 @@ public Type getType() { * A queue of blocks corresponding to trackID for moving its storage * placements by this datanode. */ - private final Queue storageMovementBlocks = - new LinkedList<>(); + private final BlockQueue storageMovementBlocks = + new BlockQueue<>(); private volatile boolean dropSPSWork = false; /* Variables for maintaining number of blocks scheduled to be written to @@ -369,6 +369,7 @@ public void clearBlockQueues() { this.pendingCached.clear(); this.cached.clear(); this.pendingUncached.clear(); + this.storageMovementBlocks.clear(); } public int numBlocks() { @@ -1082,9 +1083,10 @@ public boolean hasStorageType(StorageType type) { * - storage mismatched block info */ public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) { - synchronized (storageMovementBlocks) { - storageMovementBlocks.offer(blkMovingInfo); - } + storageMovementBlocks.offer(blkMovingInfo); + BlockManager.LOG + .debug("Adding block move task " + blkMovingInfo + " to " + getName() + + ", current queue size is " + storageMovementBlocks.size()); } /** @@ -1101,23 +1103,18 @@ public int getNumberOfBlocksToMoveStorages() { * total number of blocks which will be send to this datanode for * block movement. * - * @return block infos which needs to move its storage locations. + * @return block infos which needs to move its storage locations or null if + * there is no block infos to move. 
*/ public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) { - synchronized (storageMovementBlocks) { - List blockMovingInfos = new ArrayList<>(); - for (; !storageMovementBlocks.isEmpty() - && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) { - blockMovingInfos.add(storageMovementBlocks.poll()); - } - BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos - .size()]; - blkMoveArray = blockMovingInfos.toArray(blkMoveArray); - if (blkMoveArray.length > 0) { - return blkMoveArray; - } + List blockMovingInfos = storageMovementBlocks + .poll(numBlocksToMoveTasks); + if (blockMovingInfos == null || blockMovingInfos.size() <= 0) { return null; } + BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos + .size()]; + return blockMovingInfos.toArray(blkMoveArray); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 1378de2575f88..c6e2263af9264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -365,7 +365,7 @@ enum BlockUCState { String XATTR_ERASURECODING_POLICY = "system.hdfs.erasurecoding.policy"; - String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr"; + String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps"; Path MOVER_ID_PATH = new Path("/system/mover.id"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 42f2e9365d99a..af6137cb2f66b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -75,9 +75,8 @@ public class StoragePolicySatisfyWorker { public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; - - moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + // Defaulting to 10. This is to minimise the number of move ops. + moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10); moveExecutor = initializeBlockMoverThreadPool(moverThreads); moverCompletionService = new ExecutorCompletionService<>(moveExecutor); handler = new BlocksMovementsStatusHandler(); @@ -127,21 +126,13 @@ private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) { TimeUnit.SECONDS, new SynchronousQueue(), new Daemon.DaemonFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); + @Override public Thread newThread(Runnable r) { Thread t = super.newThread(r); t.setName("BlockMoverTask-" + threadIndex.getAndIncrement()); return t; } - }, new ThreadPoolExecutor.CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, - ThreadPoolExecutor e) { - LOG.info("Execution for block movement to satisfy storage policy" - + " got rejected, Executing in current thread"); - // will run in the current thread. 
- super.rejectedExecution(runnable, e); - } }); moverThreadPool.allowCoreThreadTimeOut(true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 2cc0e271b63e5..af5ab2d36cd27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -661,7 +661,7 @@ static int run(Map> namenodes, Configuration conf) boolean spsRunning; try { spsRunning = nnc.getDistributedFileSystem().getClient() - .isStoragePolicySatisfierRunning(); + .isInternalSatisfierRunning(); } catch (RemoteException e) { IOException cause = e.unwrapRemoteException(); if (cause instanceof StandbyException) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java index 5ffd6e8205ce0..45d621857cad5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -75,24 +76,33 @@ static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm, fsd.checkPathAccess(pc, iip, FsAction.WRITE); } INode inode = FSDirectory.resolveLastINode(iip); - if (inodeHasSatisfyXAttr(inode)) { - throw new IOException( - "Cannot request to call satisfy storage policy on path " + if (inode.isFile() && inode.asFile().numBlocks() == 0) { + if (NameNode.LOG.isInfoEnabled()) { + NameNode.LOG.info( + "Skipping satisfy storage policy on path:{} as " + + "this file doesn't have any blocks!", + inode.getFullPathName()); + } + } else if (inodeHasSatisfyXAttr(inode)) { + NameNode.LOG + .warn("Cannot request to call satisfy storage policy on path: " + inode.getFullPathName() + ", as this file/dir was already called for satisfying " + "storage policy."); - } - if (unprotectedSatisfyStoragePolicy(inode, fsd)) { + } else { XAttr satisfyXAttr = XAttrHelper .buildXAttr(XATTR_SATISFY_STORAGE_POLICY); - List xAttrs = Lists.newArrayListWithCapacity(1); - xAttrs.add(satisfyXAttr); + List xAttrs = Arrays.asList(satisfyXAttr); List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); List newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs, xAttrs, EnumSet.of(XAttrSetFlag.CREATE)); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, iip.getLatestSnapshotId()); fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); + + // Adding directory in the pending queue, so FileInodeIdCollector + // process directory child in batch and recursively + fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); } } finally { fsd.writeUnlock(); @@ -106,7 +116,7 @@ static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) { } else { // Adding directory in the pending queue, so FileInodeIdCollector process // directory child in batch and recursively - fsd.getBlockManager().addSPSPathId(inode.getId()); + 
fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); return true; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 709e2705e1bad..7e22ae1c7cbc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -90,7 +90,6 @@ static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc, * @param srcArg The string representation of the path to the file * @param resolveLink whether to throw UnresolvedLinkException * if src refers to a symlink - * @param needLocation if blockLocations need to be returned * * @param needLocation Include {@link LocatedBlocks} in result. * @param needBlockToken Include block tokens in {@link LocatedBlocks}. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index 459e697b6b06a..1150a7287918d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -209,7 +209,7 @@ static List unprotectedRemoveXAttrs( for (XAttr xattr : toRemove) { if (XATTR_SATISFY_STORAGE_POLICY .equals(XAttrHelper.getPrefixedName(xattr))) { - fsd.getBlockManager().getStoragePolicySatisfier() + fsd.getBlockManager().getSPSManager().getInternalSPSService() .clearQueue(inode.getId()); break; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 2c9d627add1a4..6539b513690f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1401,7 +1401,7 @@ public final void addToInodeMap(INode inode) { if (!inode.isSymlink()) { final XAttrFeature xaf = inode.getXAttrFeature(); addEncryptionZone((INodeWithAdditionalFields) inode, xaf); - if (namesystem.getBlockManager().isSPSEnabled()) { + if (namesystem.getBlockManager().getSPSManager().isEnabled()) { addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 5dfec2552e6a5..8c5a4109e7a1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -259,10 +259,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; -import 
org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; @@ -1295,13 +1292,7 @@ void startActiveServices() throws IOException { FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval); } - blockManager.getSPSService().init( - new IntraSPSNameNodeContext(this, blockManager, - blockManager.getSPSService()), - new IntraSPSNameNodeFileIdCollector(getFSDirectory(), - blockManager.getSPSService()), - new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null); - blockManager.startSPS(); + blockManager.getSPSManager().start(); } finally { startingActiveService = false; blockManager.checkSafeMode(); @@ -1332,7 +1323,7 @@ void stopActiveServices() { writeLock(); try { if (blockManager != null) { - blockManager.stopSPS(false); + blockManager.getSPSManager().stop(); } stopSecretManager(); leaseManager.stopMonitor(); @@ -1372,7 +1363,7 @@ void stopActiveServices() { // Don't want to keep replication queues when not in Active. blockManager.clearQueues(); blockManager.setInitializedReplQueues(false); - blockManager.stopSPSGracefully(); + blockManager.getSPSManager().stopGracefully(); } } finally { writeUnlock("stopActiveServices"); @@ -2281,17 +2272,18 @@ private void validateStoragePolicySatisfy() DFS_STORAGE_POLICY_ENABLED_KEY)); } // checks sps status - if (!blockManager.isSPSEnabled() - || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL - && !blockManager.getStoragePolicySatisfier().isRunning())) { + if (!blockManager.getSPSManager().isEnabled() || (blockManager + .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL + && !blockManager.getSPSManager().isInternalSatisfierRunning())) { throw new UnsupportedActionException( "Cannot request to satisfy storage policy " + "when storage policy satisfier feature has been disabled" + " by admin. Seek for an admin help to enable it " + "or use Mover tool."); } - // checks SPS Q has many outstanding requests. - blockManager.verifyOutstandingSPSPathQLimit(); + // checks SPS Q has many outstanding requests. It will throw IOException if + // the limit exceeds. + blockManager.getSPSManager().verifyOutstandingPathQLimit(); } /** @@ -3996,17 +3988,15 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, } // Handle blocks movement results sent by the coordinator datanode. - StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); - if (sps != null) { - if (!sps.isRunning()) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Storage policy satisfier is not running. So, ignoring storage" - + " movement attempt finished block info sent by DN"); - } - } else { - sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); + SPSService sps = blockManager.getSPSManager().getInternalSPSService(); + if (!sps.isRunning()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Storage policy satisfier is not running. 
So, ignoring storage" + + " movement attempt finished block info sent by DN"); } + } else { + sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); } //create ha status diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index a7d829853bf93..4e3a3baf5d4ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -2043,7 +2043,7 @@ protected String reconfigurePropertyImpl(String property, String newVal) } else if (property.equals(ipcClientRPCBackoffEnable)) { return reconfigureIPCBackoffEnabled(newVal); } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) { - return reconfigureSPSEnabled(newVal, property); + return reconfigureSPSModeEvent(newVal, property); } else { throw new ReconfigurationException(property, newVal, getConf().get( property)); @@ -2127,39 +2127,27 @@ String reconfigureIPCBackoffEnabled(String newVal) { return Boolean.toString(clientBackoffEnabled); } - String reconfigureSPSEnabled(String newVal, String property) + String reconfigureSPSModeEvent(String newVal, String property) throws ReconfigurationException { if (newVal == null || StoragePolicySatisfierMode.fromString(newVal) == null) { throw new ReconfigurationException(property, newVal, getConf().get(property), new HadoopIllegalArgumentException( - "For enabling or disabling storage policy satisfier, we must " - + "pass either none/internal/external string value only")); + "For enabling or disabling storage policy satisfier, must " + + "pass either internal/external/none string value only")); } if (!isActiveState()) { throw new ReconfigurationException(property, newVal, - getConf().get(property), new HadoopIllegalArgumentException( - "Enabling or disabling storage policy satisfier service on " - + state + " NameNode is not allowed")); + getConf().get(property), + new HadoopIllegalArgumentException( + "Enabling or disabling storage policy satisfier service on " + + state + " NameNode is not allowed")); } StoragePolicySatisfierMode mode = StoragePolicySatisfierMode .fromString(newVal); - switch(mode){ - case NONE: - namesystem.getBlockManager().disableSPS(); - break; - case INTERNAL: - namesystem.getBlockManager().enableInternalSPS(); - break; - case EXTERNAL: - namesystem.getBlockManager().enableExternalSPS(); - break; - default: - // nothing - break; - } + namesystem.getBlockManager().getSPSManager().changeModeEvent(mode); return newVal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index d74dc9e0562b4..97f38c71686e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2536,15 +2536,15 @@ public List listReconfigurableProperties() throws IOException { } @Override - public boolean isStoragePolicySatisfierRunning() throws IOException { + public boolean isInternalSatisfierRunning() throws IOException { checkNNStartup(); - String operationName = "isStoragePolicySatisfierRunning"; + String operationName = 
"isInternalSatisfierRunning"; namesystem.checkSuperuserPrivilege(operationName); if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - boolean isSPSRunning = - namesystem.getBlockManager().isStoragePolicySatisfierRunning(); + boolean isSPSRunning = namesystem.getBlockManager().getSPSManager() + .isInternalSatisfierRunning(); namesystem.logAuditEvent(true, operationName, null); return isSPSRunning; } @@ -2556,8 +2556,8 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus( - path); + return namesystem.getBlockManager().getSPSManager() + .checkStoragePolicySatisfyPathStatus(path); } @Override @@ -2579,17 +2579,16 @@ public Long getNextSPSPathId() throws IOException { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - // Check that internal SPS service is running - if (namesystem.getBlockManager() - .getSPSMode() == StoragePolicySatisfierMode.INTERNAL - && namesystem.getBlockManager().getSPSService().isRunning()) { + // Check that SPS daemon service is running inside namenode + if (namesystem.getBlockManager().getSPSManager() + .getMode() == StoragePolicySatisfierMode.INTERNAL) { LOG.debug("SPS service is internally enabled and running inside " + "namenode, so external SPS is not allowed to fetch the path Ids"); throw new IOException("SPS service is internally enabled and running" + " inside namenode, so external SPS is not allowed to fetch" + " the path Ids"); } - return namesystem.getBlockManager().getNextSPSPathId(); + return namesystem.getBlockManager().getSPSManager().getNextPathId(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 8a10183c0865c..c683a639f740d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -311,7 +311,7 @@ public void run() { if (Time.monotonicNow() - lastStatusCleanTime > statusClearanceElapsedTimeMs) { lastStatusCleanTime = Time.monotonicNow(); - cleanSpsStatus(); + cleanSPSStatus(); } startINodeId = null; // Current inode id successfully scanned. 
} @@ -333,7 +333,7 @@ public void run() { } } - private synchronized void cleanSpsStatus() { + private synchronized void cleanSPSStatus() { for (Iterator> it = spsStatus.entrySet().iterator(); it.hasNext();) { Entry entry = it.next(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index ff6cc21fa227b..495d1c4a334aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -178,17 +178,17 @@ public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, @Override public Long getNextSPSPathId() { - return blockManager.getNextSPSPathId(); + return blockManager.getSPSManager().getNextPathId(); } @Override public void removeSPSPathId(long trackId) { - blockManager.removeSPSPathId(trackId); + blockManager.getSPSManager().removePathId(trackId); } @Override public void removeAllSPSPathIds() { - blockManager.removeAllSPSPathIds(); + blockManager.getSPSManager().removeAllPathIds(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java deleted file mode 100644 index 6c0f8b2ea0cf3..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode.sps; - -import java.util.LinkedList; -import java.util.Queue; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * A class which holds the SPS invoked path ids. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class SPSPathIds { - - // List of pending dir to satisfy the policy - private final Queue spsDirsToBeTraveresed = new LinkedList(); - - /** - * Add the path id to queue. - */ - public synchronized void add(long pathId) { - spsDirsToBeTraveresed.add(pathId); - } - - /** - * Removes the path id. - */ - public synchronized void remove(long pathId) { - spsDirsToBeTraveresed.remove(pathId); - } - - /** - * Clears all path ids. - */ - public synchronized void clear() { - spsDirsToBeTraveresed.clear(); - } - - /** - * @return next path id available in queue. 
- */ - public synchronized Long pollNext() { - return spsDirsToBeTraveresed.poll(); - } - - /** - * @return the size of the queue. - */ - public synchronized long size() { - return spsDirsToBeTraveresed.size(); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index ceec3f3bb148a..da6e365969893 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -67,11 +67,12 @@ void init(Context ctxt, FileIdCollector fileIDCollector, void stopGracefully(); /** - * Disable the SPS service. + * Stops the SPS service. * * @param forceStop + * true represents to clear all the sps path's hint, false otherwise. */ - void disable(boolean forceStop); + void stop(boolean forceStop); /** * Check whether StoragePolicySatisfier is running. @@ -105,6 +106,11 @@ void addAllFileIdsToProcess(long startId, List itemInfoList, */ int processingQueueSize(); + /** + * Clear inodeId present in the processing queue. + */ + void clearQueue(long inodeId); + /** * @return the configuration. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 87faced0604ec..6b449aa9973f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -60,6 +59,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * Setting storagePolicy on a file after the file write will only update the new @@ -145,7 +145,7 @@ public void init(final Context context, final FileIdCollector fileIDCollector, new BlockStorageMovementAttemptedItems(this, storageMovementNeeded, blockMovementListener); this.blockMoveTaskHandler = blockMovementTaskHandler; - this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf()); + this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); @@ -163,8 +163,6 @@ public synchronized void start(boolean reconfigStart, serviceMode); return; } - isRunning = true; - this.spsMode = serviceMode; if (spsMode == StoragePolicySatisfierMode.INTERNAL && ctxt.isMoverRunning()) { isRunning = false; @@ -182,6 +180,8 @@ public synchronized void start(boolean reconfigStart, StringUtils.toLowerCase(spsMode.toString())); } + isRunning = true; + this.spsMode = serviceMode; // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. 
addDropSPSWorkCommandsToAllDNs(); @@ -193,7 +193,7 @@ public synchronized void start(boolean reconfigStart, } @Override - public synchronized void disable(boolean forceStop) { + public synchronized void stop(boolean forceStop) { isRunning = false; if (storagePolicySatisfierThread == null) { return; @@ -214,19 +214,22 @@ public synchronized void disable(boolean forceStop) { @Override public synchronized void stopGracefully() { if (isRunning) { - disable(true); + stop(false); } if (this.storageMovementsMonitor != null) { this.storageMovementsMonitor.stopGracefully(); } - if (storagePolicySatisfierThread == null) { - return; - } - try { - storagePolicySatisfierThread.join(3000); - } catch (InterruptedException ie) { + if (storagePolicySatisfierThread != null) { + try { + storagePolicySatisfierThread.join(3000); + } catch (InterruptedException ie) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception while waiting to join sps thread," + + " ignoring it", ie); + } + } } } @@ -351,32 +354,26 @@ public void run() { Thread.sleep(3000); blockCount = 0L; } + } catch (IOException e) { + LOG.error("Exception during StoragePolicySatisfier execution - " + + "will continue next cycle", e); } catch (Throwable t) { - handleException(t); - } - } - } - - private void handleException(Throwable t) { - // double check to avoid entering into synchronized block. - if (isRunning) { - synchronized (this) { - if (isRunning) { - if (t instanceof InterruptedException) { + synchronized (this) { + if (isRunning) { isRunning = false; - LOG.info("Stopping StoragePolicySatisfier."); + if (t instanceof InterruptedException) { + LOG.info("Stopping StoragePolicySatisfier.", t); + } else { + LOG.error("StoragePolicySatisfier thread received " + + "runtime exception.", t); + } // Stopping monitor thread and clearing queues as well this.clearQueues(); this.storageMovementsMonitor.stopGracefully(); - } else { - LOG.error( - "StoragePolicySatisfier thread received runtime exception, " - + "ignoring", t); } } } } - return; } private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( @@ -434,7 +431,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( List existing = new LinkedList( Arrays.asList(blockInfo.getStorageTypes())); - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + if (!removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos, blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(), @@ -499,7 +496,7 @@ private boolean computeBlockMovingInfos( DatanodeInfo[] storages, DatanodeStorageReport[] liveDns, ErasureCodingPolicy ecPolicy) { boolean foundMatchingTargetNodesForBlock = true; - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + if (!removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { List sourceWithStorageMap = new ArrayList(); @@ -880,21 +877,6 @@ public void clearQueues() { storageMovementNeeded.clearAll(); } - /** - * Set file inode in queue for which storage movement needed for its blocks. - * - * @param inodeId - * - file inode/blockcollection id. - */ - public void satisfyStoragePolicy(Long inodeId) { - //For file startId and trackId is same - storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); - if (LOG.isDebugEnabled()) { - LOG.debug("Added track info for inode {} to block " - + "storageMovementNeeded queue", inodeId); - } - } - /** * Clear queues for given track id. 
@@ -958,6 +940,10 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( @Override public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo, scanCompleted); + if (LOG.isDebugEnabled()) { + LOG.debug("Added track info for inode {} to block " + + "storageMovementNeeded queue", trackInfo.getFileId()); + } } @Override @@ -993,4 +979,63 @@ public void join() throws InterruptedException { //TODO Add join here on SPS rpc server also storagePolicySatisfierThread.join(); } + + /** + * Remove the overlap between the expected types and the existing types. + * + * @param expected + * - Expected storage types list. + * @param existing + * - Existing storage types list. + * @param ignoreNonMovable + * ignore non-movable storage types by removing them from both + * the expected and the existing storage type lists, to prevent + * non-movable storage from being moved. + * @return true if either the existing types or the expected types list is + * empty after removing the overlap. + */ + private static boolean removeOverlapBetweenStorageTypes( + List<StorageType> expected, + List<StorageType> existing, boolean ignoreNonMovable) { + for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) { + final StorageType t = i.next(); + if (expected.remove(t)) { + i.remove(); + } + } + if (ignoreNonMovable) { + removeNonMovable(existing); + removeNonMovable(expected); + } + return expected.isEmpty() || existing.isEmpty(); + } + + private static void removeNonMovable(List<StorageType> types) { + for (Iterator<StorageType> i = types.iterator(); i.hasNext();) { + final StorageType t = i.next(); + if (!t.isMovable()) { + i.remove(); + } + } + } + + /** + * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from configuration. + * + * @param conf Configuration + * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + */ + private static int getSPSWorkMultiplier(Configuration conf) { + int spsWorkMultiplier = conf + .getInt( + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (spsWorkMultiplier > 0), + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + + " = '" + spsWorkMultiplier + "' is invalid. " + + "It should be a positive, non-zero integer value."); + return spsWorkMultiplier; + } }
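A quick worked example of the removeOverlapBetweenStorageTypes() contract, now that it is private to the satisfier: with expected = [SSD, DISK] and existing = [DISK, DISK], one DISK cancels on each side, leaving expected = [SSD] and existing = [DISK]; neither list is empty, so the method reports that a movement is still required. The standalone sketch below reproduces just the overlap step for experimentation (the class name and main() driver are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.fs.StorageType;

    public class OverlapSketch {
      // Same cancellation loop as the patch; returns true when nothing is
      // left to move on one side or the other.
      static boolean removeOverlap(List<StorageType> expected,
          List<StorageType> existing) {
        for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
          if (expected.remove(i.next())) {
            i.remove(); // this replica already satisfies one expected type
          }
        }
        return expected.isEmpty() || existing.isEmpty();
      }

      public static void main(String[] args) {
        List<StorageType> expected = new ArrayList<>(
            Arrays.asList(StorageType.SSD, StorageType.DISK));
        List<StorageType> existing = new ArrayList<>(
            Arrays.asList(StorageType.DISK, StorageType.DISK));
        // Prints false: SSD is still wanted and an extra DISK replica remains.
        System.out.println(removeOverlap(expected, existing));
      }
    }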
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java new file mode 100644 index 0000000000000..5bdf6aeaded5c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This manages the satisfy storage policy invoked path ids and exposes methods + * to process these path ids. It maintains the sps mode (INTERNAL/EXTERNAL/NONE) + * configured by the administrator. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it will start the internal sps daemon service inside the namenode and + * process the sps invoked path ids to satisfy the storage policy. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything beyond maintaining the sps invoked path ids. The + * administrator has to start the external sps service explicitly; that service + * fetches the sps invoked path ids from the namenode, does the necessary + * computations and block movements, and thereby satisfies the storage policy. + * Please refer to the {@link ExternalStoragePolicySatisfier} class to + * understand more about the external sps service functionality. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then it + * will disable the sps feature completely by clearing all the queued up sps + * path hints. + * + * This class is instantiated by the BlockManager. + */ +public class StoragePolicySatisfyManager { + private static final Logger LOG = LoggerFactory + .getLogger(StoragePolicySatisfyManager.class); + private final StoragePolicySatisfier spsService; + private final boolean storagePolicyEnabled; + private volatile StoragePolicySatisfierMode mode; + private final Queue<Long> pathsToBeTraversed; + private final int outstandingPathsLimit; + private final Namesystem namesystem; + private final BlockManager blkMgr; + + public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem, + BlockManager blkMgr) { + // StoragePolicySatisfier(SPS) configs + storagePolicyEnabled = conf.getBoolean( + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT); + String modeVal = conf.get( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); + outstandingPathsLimit = conf.getInt( + DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, + DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT); + mode = StoragePolicySatisfierMode.fromString(modeVal); + pathsToBeTraversed = new LinkedList<>(); + // Instantiate the SPS service with just the config reference; no + // supporting threads are started here. + spsService = new StoragePolicySatisfier(conf); + this.namesystem = namesystem; + this.blkMgr = blkMgr; + } + + /** + * This method performs the following logic, based on the configured sps + * mode: + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it starts the internal daemon service inside the namenode. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything; the administrator has to start the external sps + * service explicitly. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled and won't take any action. + */ + public void start() { + if (!storagePolicyEnabled) { + LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.", + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); + return; + } + + switch (mode) { + case INTERNAL: + if (spsService.isRunning()) { + LOG.info("Storage policy satisfier is already running" + + " as internal daemon service inside namenode."); + return; + } + // starts internal daemon service inside namenode + spsService.init( + new IntraSPSNameNodeContext(namesystem, blkMgr, spsService), + new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(), + spsService), + new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null); + spsService.start(false, mode); + break; + case EXTERNAL: + LOG.info("Storage policy satisfier is configured as external, " + + "please start external sps service explicitly to satisfy policy"); + break; + case NONE: + LOG.info("Storage policy satisfier is disabled"); + break; + default: + LOG.info("Given mode: {} is invalid", mode); + break; + } + } + + /** + * This method performs the following logic, based on the configured sps + * mode: + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it stops the internal daemon service running inside the namenode. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything; the administrator has to stop the external sps + * service explicitly, if needed. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled and won't take any action. + */ + public void stop() { + if (!storagePolicyEnabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storage policy is not enabled, ignoring"); + } + return; + } + + switch (mode) { + case INTERNAL: + removeAllPathIds(); + if (!spsService.isRunning()) { + LOG.info("Internal storage policy satisfier daemon service" + + " is not running"); + return; + } + // stops internal daemon service running inside namenode + spsService.stop(false); + break; + case EXTERNAL: + removeAllPathIds(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Storage policy satisfier service is running outside namenode" + + ", ignoring"); + } + break; + case NONE: + if (LOG.isDebugEnabled()) { + LOG.debug("Storage policy satisfier is not enabled, ignoring"); + } + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid mode:{}, ignoring", mode); + } + break; + } + } + + /** + * Sets the new sps mode. If the new mode is internal, then it starts the + * internal sps service inside the namenode. If the new mode is external, + * then it stops the internal sps service running (if any) inside the + * namenode. If the new mode is none, then it disables the sps feature + * completely by clearing all the queued up sps path hints. + */ + public void changeModeEvent(StoragePolicySatisfierMode newMode) { + if (!storagePolicyEnabled) { + LOG.info("Failed to change storage policy satisfier as {} set to {}.", + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Updating SPS service status, current mode:{}, new mode:{}", + mode, newMode); + } + + switch (newMode) { + case INTERNAL: + if (spsService.isRunning()) { + LOG.info("Storage policy satisfier is already running as {} mode.", + mode); + return; + } + spsService.init( + new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService), + new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), + spsService), + new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr, + this.namesystem), + null); + spsService.start(true, newMode); + break; + case EXTERNAL: + if (mode == newMode) { + LOG.info("Storage policy satisfier is already in mode:{}," + + " so ignoring change mode event.", newMode); + return; + } + spsService.stopGracefully(); + break; + case NONE: + if (mode == newMode) { + LOG.info("Storage policy satisfier is already disabled, mode:{}" + + " so ignoring change mode event.", newMode); + return; + } + LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode); + spsService.stop(true); + removeAllPathIds(); + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Given mode: {} is invalid", newMode); + } + break; + } + + // update sps mode + mode = newMode; + } + + /** + * This method performs the following logic, based on the configured sps + * mode: + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#INTERNAL}, then + * it does a timed wait to stop the internal storage policy satisfier daemon + * threads. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then + * it won't do anything; the call is simply ignored. + * + * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the + * service is disabled; the call won't take any action and is simply ignored. + */ + public void stopGracefully() { + switch (mode) { + case INTERNAL: + spsService.stopGracefully(); + break; + case EXTERNAL: + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring, StoragePolicySatisfier feature is running" + + " outside namenode"); + } + break; + case NONE: + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled"); + } + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid mode:{}", mode); + } + break; + } + } + + /** + * @return true if the internal storage policy satisfier daemon is running, + * false otherwise. + */ + public boolean isInternalSatisfierRunning() { + return spsService.isRunning(); + } + + /** + * @return the internal SPS service instance. + */ + public SPSService getInternalSPSService() { + return this.spsService; + } + + /** + * @return the storage policy satisfy status of the path. It is supported + * only for the internal sps daemon service. + * @throws IOException + * if the satisfier is not running inside the namenode. + */ + public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( + String path) throws IOException { + if (mode != StoragePolicySatisfierMode.INTERNAL) { + LOG.debug("Satisfier is not running inside namenode, so status " + + "can't be returned."); + throw new IOException("Satisfier is not running inside namenode, " + + "so status can't be returned."); + } + return spsService.checkStoragePolicySatisfyPathStatus(path); + } + + /** + * @return the next SPS path id on which a user has invoked satisfy storage + * policy. + */ + public Long getNextPathId() { + synchronized (pathsToBeTraversed) { + return pathsToBeTraversed.poll(); + } + } + + /** + * Verify that the satisfier path queue has not exceeded the allowed + * outstanding limit. + */ + public void verifyOutstandingPathQLimit() throws IOException { + long size = pathsToBeTraversed.size(); + // Check whether the SPS call queue has reached the allowed limit. + if (outstandingPathsLimit - size <= 0) { + LOG.debug("Satisfier Q - outstanding limit:{}, current size:{}", + outstandingPathsLimit, size); + throw new IOException("Outstanding satisfier queue limit: " + + outstandingPathsLimit + " exceeded, try later!"); + } + } + + /** + * Removes the given SPS path id from the list of sps paths. + */ + public void removePathId(long trackId) { + synchronized (pathsToBeTraversed) { + pathsToBeTraversed.remove(trackId); + } + } + + /** + * Clean up all sps path ids. + */ + public void removeAllPathIds() { + synchronized (pathsToBeTraversed) { + pathsToBeTraversed.clear(); + } + } + + /** + * Adds the given sps path id to the list of paths to be traversed. + */ + public void addPathId(long id) { + synchronized (pathsToBeTraversed) { + pathsToBeTraversed.add(id); + } + } + + /** + * @return true if sps is configured as an internal or an external service, + * false otherwise. + */ + public boolean isEnabled() { + return mode == StoragePolicySatisfierMode.INTERNAL + || mode == StoragePolicySatisfierMode.EXTERNAL; + } + + /** + * @return the sps service mode. + */ + public StoragePolicySatisfierMode getMode() { + return mode; + } +}
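For orientation, here is a hedged sketch of how the namenode side might drive the new manager end to end: construct it, start it, enqueue a path id when a user invokes satisfy storage policy, and let the configured satisfier drain the queue through getNextPathId(). It compiles against the classes added in this patch, but the mode value and the 16L inode id are illustrative assumptions:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.hdfs.server.namenode.Namesystem;
    import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;

    class SpsManagerWiringSketch {
      // namesystem and blkMgr would be supplied by the NameNode in real use.
      void wire(Namesystem namesystem, BlockManager blkMgr) throws IOException {
        Configuration conf = new Configuration();
        conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, "internal");
        StoragePolicySatisfyManager spsManager =
            new StoragePolicySatisfyManager(conf, namesystem, blkMgr);
        spsManager.start(); // no-op unless the storage policy feature is enabled

        if (spsManager.isEnabled()) {
          spsManager.verifyOutstandingPathQLimit(); // throws once the queue is full
          spsManager.addPathId(16L); // hypothetical inode id of the invoked path
        }
        // The internal sps daemon (or the external service, via RPC) polls:
        Long next = spsManager.getNextPathId(); // null when the queue is drained
      }
    }

Note how the path id queue itself is mode-agnostic: in INTERNAL mode the in-namenode daemon consumes it, while in EXTERNAL mode the ids merely wait for the external satisfier to fetch them.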
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 59935b6234431..33448db6ff2a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -73,7 +73,7 @@ public static void main(String[] args) throws Exception { boolean spsRunning; spsRunning = nnc.getDistributedFileSystem().getClient() - .isStoragePolicySatisfierRunning(); + .isInternalSatisfierRunning(); if (spsRunning) { throw new RuntimeException( "Startup failed due to StoragePolicySatisfier" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java index 3a2ad4894d411..d8392fa9e822a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java @@ -374,7 +374,7 @@ public int run(Configuration conf, List<String> args) throws IOException { } final DistributedFileSystem dfs = AdminHelper.getDFS(conf); try { - if(dfs.getClient().isStoragePolicySatisfierRunning()){ + if(dfs.getClient().isInternalSatisfierRunning()){ System.out.println("yes"); }else{ System.out.println("no"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java index 85a101f4754d3..47ea39fbe4e6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java @@ -252,8 +252,8 @@ public void testReconfigureSPSWithStoragePolicyDisabled() // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
assertEquals("SPS shouldn't start as " + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false, - nameNode.getNamesystem().getBlockManager() - .isStoragePolicySatisfierRunning()); + nameNode.getNamesystem().getBlockManager().getSPSManager() + .isInternalSatisfierRunning()); verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.INTERNAL, false); @@ -280,8 +280,8 @@ public void testReconfigureStoragePolicySatisfierEnabled() fail("ReconfigurationException expected"); } catch (ReconfigurationException e) { GenericTestUtils.assertExceptionContains( - "For enabling or disabling storage policy satisfier, we must " - + "pass either none/internal/external string value only", + "For enabling or disabling storage policy satisfier, must " + + "pass either internal/external/none string value only", e.getCause()); } @@ -301,8 +301,8 @@ public void testReconfigureStoragePolicySatisfierEnabled() nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.EXTERNAL.toString()); assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value", - false, nameNode.getNamesystem().getBlockManager() - .isStoragePolicySatisfierRunning()); + false, nameNode.getNamesystem().getBlockManager().getSPSManager() + .isInternalSatisfierRunning()); assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value", StoragePolicySatisfierMode.EXTERNAL.toString(), nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, @@ -342,8 +342,8 @@ public void testSatisfyStoragePolicyAfterSatisfierDisabled() nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.INTERNAL.toString()); assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value", - true, nameNode.getNamesystem().getBlockManager() - .isStoragePolicySatisfierRunning()); + true, nameNode.getNamesystem().getBlockManager().getSPSManager() + .isInternalSatisfierRunning()); assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value", StoragePolicySatisfierMode.INTERNAL.toString(), nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, @@ -353,7 +353,8 @@ public void testSatisfyStoragePolicyAfterSatisfierDisabled() void verifySPSEnabled(final NameNode nameNode, String property, StoragePolicySatisfierMode expected, boolean isSatisfierRunning) { assertEquals(property + " has wrong value", isSatisfierRunning, nameNode - .getNamesystem().getBlockManager().isStoragePolicySatisfierRunning()); + .getNamesystem().getBlockManager().getSPSManager() + .isInternalSatisfierRunning()); String actual = nameNode.getConf().get(property, DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); assertEquals(property + " has wrong value", expected, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index b84214c26c408..9f98777233579 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -389,7 +389,8 @@ public void testDropSPS() throws Exception { fs.setStoragePolicy(testFile, ONE_SSD); fs.satisfyStoragePolicy(testFile); - cluster.getNamesystem().getBlockManager().disableSPS(); + 
cluster.getNamesystem().getBlockManager().getSPSManager() + .changeModeEvent(StoragePolicySatisfierMode.NONE); // Make sure satisfy xattr has been removed. DFSTestUtil.waitForXattrRemoved(testFileName, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java index e89cfa316e8f7..b3734d166604f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java @@ -103,7 +103,7 @@ public void testWhenNNHAStateChanges() throws IOException { dfs = cluster.getFileSystem(1); try { - dfs.getClient().isStoragePolicySatisfierRunning(); + dfs.getClient().isInternalSatisfierRunning(); Assert.fail("Call this function to Standby NN should " + "raise an exception."); } catch (RemoteException e) { @@ -115,14 +115,14 @@ public void testWhenNNHAStateChanges() throws IOException { cluster.transitionToActive(0); dfs = cluster.getFileSystem(0); - running = dfs.getClient().isStoragePolicySatisfierRunning(); + running = dfs.getClient().isInternalSatisfierRunning(); Assert.assertTrue("StoragePolicySatisfier should be active " + "when NN transits from Standby to Active mode.", running); // NN transits from Active to Standby cluster.transitionToStandby(0); try { - dfs.getClient().isStoragePolicySatisfierRunning(); + dfs.getClient().isInternalSatisfierRunning(); Assert.fail("NN in Standby again, call this function should " + "raise an exception."); } catch (RemoteException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 9e0a39fbb6491..6f7fe896902a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -445,15 +445,10 @@ public void testSatisfyWithExceptions() throws Exception { try { hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); - Assert.fail(String.format("Should failed to satisfy storage policy " - + "for %s ,since it has been " + "added to satisfy movement queue.", FILE)); - } catch (IOException e) { - GenericTestUtils.assertExceptionContains( - String.format("Cannot request to call satisfy storage policy " - + "on path %s, as this file/dir was already called for " - + "satisfying storage policy.", FILE), - e); + } catch (Exception e) { + Assert.fail(String.format("Invoking #satisfyStoragePolicy() api multiple " + + "times for a path %s should be allowed; internally it just skips " + + "adding the path to the satisfy movement queue again.", FILE)); } } finally { shutdownCluster(); @@ -563,7 +558,7 @@ public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.NONE.toString()); running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); + .getClient().isInternalSatisfierRunning(); Assert.assertFalse("SPS should be stopped as configured.", running); // Simulate
the case by creating MOVER_ID file @@ -576,7 +571,7 @@ public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() StoragePolicySatisfierMode.INTERNAL.toString()); running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); + .getClient().isInternalSatisfierRunning(); Assert.assertFalse("SPS should not be able to run as file " + HdfsServerConstants.MOVER_ID_PATH + " is being hold.", running); @@ -591,7 +586,7 @@ public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.INTERNAL.toString()); running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); + .getClient().isInternalSatisfierRunning(); Assert.assertTrue("SPS should be running as " + "Mover already exited", running); @@ -623,7 +618,7 @@ public void testWhenMoverExitsWithoutDeleteMoverIDFile() HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0); restartNamenode(); boolean running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); + .getClient().isInternalSatisfierRunning(); Assert.assertTrue("SPS should be running as " + "no Mover really running", running); } finally { @@ -1293,8 +1288,8 @@ public boolean isRunning() { sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); - hdfsCluster.getNamesystem().getBlockManager() - .addSPSPathId(rootINode.getId()); + hdfsCluster.getNamesystem().getBlockManager().getSPSManager() + .addPathId(rootINode.getId()); //Wait for thread to reach U. Thread.sleep(1000); @@ -1360,8 +1355,8 @@ public boolean isRunning() { sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); - hdfsCluster.getNamesystem().getBlockManager() - .addSPSPathId(rootINode.getId()); + hdfsCluster.getNamesystem().getBlockManager().getSPSManager() + .addPathId(rootINode.getId()); // Wait for thread to reach U. 
Thread.sleep(1000); @@ -1704,7 +1699,8 @@ private String createFileAndSimulateFavoredNodes(int favoredNodesCount) private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { @@ -1723,7 +1719,8 @@ private void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index 2257608aa7de6..ef123001e1688 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -495,7 +495,8 @@ private void waitForAttemptedItems(MiniDFSCluster cluster, long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { @@ -566,7 +567,8 @@ private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + .getSPSManager().getInternalSPSService(); Assert.assertNotNull("Failed to get SPS object reference!", sps); GenericTestUtils.waitFor(new Supplier() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 15a427113ac23..0546f39bcee5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -133,7 +133,7 @@ public MiniDFSCluster startCluster(final Configuration conf, BlockManager blkMgr = cluster.getNameNode().getNamesystem() .getBlockManager(); - SPSService spsService = 
blkMgr.getSPSService(); + SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); spsService.stopGracefully(); ExternalSPSContext context = new ExternalSPSContext(spsService, @@ -143,12 +143,12 @@ public MiniDFSCluster startCluster(final Configuration conf, new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(conf, nnc, - blkMgr.getSPSService()); + blkMgr.getSPSManager().getInternalSPSService()); externalHandler.init(); spsService.init(context, - new ExternalSPSFileIDCollector(context, blkMgr.getSPSService()), - externalHandler, - blkMoveListener); + new ExternalSPSFileIDCollector(context, + blkMgr.getSPSManager().getInternalSPSService()), + externalHandler, blkMoveListener); spsService.start(true, StoragePolicySatisfierMode.EXTERNAL); return cluster; } @@ -156,14 +156,14 @@ public MiniDFSCluster startCluster(final Configuration conf, public void restartNamenode() throws IOException{ BlockManager blkMgr = getCluster().getNameNode().getNamesystem() .getBlockManager(); - SPSService spsService = blkMgr.getSPSService(); + SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); spsService.stopGracefully(); getCluster().restartNameNodes(); getCluster().waitActive(); blkMgr = getCluster().getNameNode().getNamesystem() .getBlockManager(); - spsService = blkMgr.getSPSService(); + spsService = blkMgr.getSPSManager().getInternalSPSService(); spsService.stopGracefully(); ExternalSPSContext context = new ExternalSPSContext(spsService, @@ -172,12 +172,12 @@ public void restartNamenode() throws IOException{ new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, - blkMgr.getSPSService()); + blkMgr.getSPSManager().getInternalSPSService()); externalHandler.init(); spsService.init(context, - new ExternalSPSFileIDCollector(context, blkMgr.getSPSService()), - externalHandler, - blkMoveListener); + new ExternalSPSFileIDCollector(context, + blkMgr.getSPSManager().getInternalSPSService()), + externalHandler, blkMoveListener); spsService.start(true, StoragePolicySatisfierMode.EXTERNAL); } @@ -323,7 +323,7 @@ public void testOutstandingQueueLimitExceeds() throws Exception { DistributedFileSystem fs = getFS(); BlockManager blkMgr = getCluster().getNameNode().getNamesystem() .getBlockManager(); - SPSService spsService = blkMgr.getSPSService(); + SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); spsService.stopGracefully(); // stops SPS // Creates 4 more files. Send all of them for satisfying the storage