HDFS-10800: [SPS]: Daemon thread in Namenode to find blocks placed in other storage than what the policy specifies. Contributed by Uma Maheswara Rao G
Uma Maheswara Rao G authored and umamaheswararao committed Aug 12, 2018
1 parent 5692887 commit 1438da4
Showing 10 changed files with 791 additions and 32 deletions.
@@ -53,6 +53,7 @@
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -73,6 +74,7 @@
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1716,4 +1718,43 @@ public static DelegationTokenIdentifier decodeDelegationToken(
     }
     return id;
   }
+
+  /**
+   * Remove the overlap between the expected types and the existing types.
+   *
+   * @param expected
+   *          - Expected storage types list.
+   * @param existing
+   *          - Existing storage types list.
+   * @param ignoreNonMovable
+   *          ignore non-movable storage types by removing them from both the
+   *          expected and the existing storage type lists, to prevent
+   *          non-movable storage from being moved.
+   * @return true if the existing types or the expected types list is empty
+   *         after removing the overlap.
+   */
+  public static boolean removeOverlapBetweenStorageTypes(
+      List<StorageType> expected,
+      List<StorageType> existing, boolean ignoreNonMovable) {
+    for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (expected.remove(t)) {
+        i.remove();
+      }
+    }
+    if (ignoreNonMovable) {
+      removeNonMovable(existing);
+      removeNonMovable(expected);
+    }
+    return expected.isEmpty() || existing.isEmpty();
+  }
+
+  private static void removeNonMovable(List<StorageType> types) {
+    for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (!t.isMovable()) {
+        i.remove();
+      }
+    }
+  }
 }
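As a concrete illustration of the helper's semantics, here is a small driver sketch. It assumes the hunk above lands in DFSUtil (which the decodeDelegationToken context suggests); the scenario values are invented.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil; // assumption: the hunk above is in DFSUtil

public class OverlapDemo {
  public static void main(String[] args) {
    // A 2-replica file whose policy expects ARCHIVE everywhere, but whose
    // replicas currently sit on one DISK and one ARCHIVE volume.
    // Mutable lists are required: the helper removes elements in place.
    List<StorageType> expected = new ArrayList<>(
        Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE));
    List<StorageType> existing = new ArrayList<>(
        Arrays.asList(StorageType.DISK, StorageType.ARCHIVE));

    boolean done = DFSUtil.removeOverlapBetweenStorageTypes(
        expected, existing, true /* ignoreNonMovable */);

    // The ARCHIVE replica already matches, so the overlap cancels out:
    // expected == [ARCHIVE], existing == [DISK], and done is false --
    // one replica still has to move from DISK to ARCHIVE.
    System.out.println(done + " " + expected + " " + existing);
  }
}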
@@ -89,6 +89,8 @@
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -425,6 +427,11 @@ public long getTotalECBlockGroups() {

   private final BlockIdManager blockIdManager;

+  /** For satisfying block storage policies. */
+  private final StoragePolicySatisfier sps;
+  private final BlockStorageMovementNeeded storageMovementNeeded =
+      new BlockStorageMovementNeeded();
+
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -464,6 +471,7 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);

+    sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);

     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -688,9 +696,11 @@ public void activate(Configuration conf, long blockTotal) {
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
+    sps.start();
   }

   public void close() {
+    sps.stop();
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
@@ -4980,4 +4990,14 @@ public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
   public ProvidedStorageMap getProvidedStorageMap() {
     return providedStorageMap;
   }
+
+  /**
+   * Mark the given file's block collection as needing storage movement.
+   *
+   * @param id
+   *          - file block collection id.
+   */
+  public void satisfyStoragePolicy(long id) {
+    storageMovementNeeded.add(id);
+  }
 }
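Taken together, these hunks show the whole NameNode-side lifecycle: the BlockManager constructor wires a StoragePolicySatisfier to the shared BlockStorageMovementNeeded queue, activate() starts the daemon, close() stops it, and satisfyStoragePolicy(long) feeds it inode ids. The StoragePolicySatisfier class itself is among the ten changed files but is not reproduced in this excerpt; the sketch below only illustrates the polling shape implied by this wiring, and every internal detail in it is assumed rather than taken from the commit.

import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;

// Illustrative sketch only; not the committed StoragePolicySatisfier.
class StoragePolicySatisfierSketch implements Runnable {
  private final Namesystem namesystem;
  private final BlockStorageMovementNeeded storageMovementNeeded;
  private final BlockManager blockManager; // would be used to assign moves
  private Daemon satisfierThread;

  StoragePolicySatisfierSketch(Namesystem namesystem,
      BlockStorageMovementNeeded storageMovementNeeded,
      BlockManager blockManager) {
    this.namesystem = namesystem;
    this.storageMovementNeeded = storageMovementNeeded;
    this.blockManager = blockManager;
  }

  public void start() {
    satisfierThread = new Daemon(this);
    satisfierThread.setName("StoragePolicySatisfier");
    satisfierThread.start();
  }

  @Override
  public void run() {
    while (namesystem.isRunning()) {
      Long blockCollectionID = storageMovementNeeded.get();
      if (blockCollectionID != null) {
        // Compare each block's current storage types with what the file's
        // policy expects (cf. DFSUtil#removeOverlapBetweenStorageTypes) and
        // queue mismatches via DatanodeDescriptor#addBlocksToMoveStorage.
      }
      try {
        Thread.sleep(3000); // polling interval; an assumed value
      } catch (InterruptedException e) {
        return;
      }
    }
  }
}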
@@ -43,6 +43,8 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -206,6 +208,10 @@ public Type getType() {
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();

+  /** A queue of blocks whose storage placements this datanode must move. */
+  private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+      new LinkedList<>();
+
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -1065,5 +1071,37 @@ public boolean hasStorageType(StorageType type) {
     }
     return false;
   }
+
+  /**
+   * Add the block infos whose storage locations need to be moved.
+   *
+   * @param storageMismatchedBlocks
+   *          - storage mismatched block infos.
+   */
+  public void addBlocksToMoveStorage(
+      List<BlockMovingInfo> storageMismatchedBlocks) {
+    storageMovementBlocks.offer(storageMismatchedBlocks);
+  }
+
+  /**
+   * @return block infos whose storage locations need to be moved.
+   */
+  public List<BlockMovingInfo> getBlocksToMoveStorages() {
+    return storageMovementBlocks.poll();
+  }
+
+  // TODO: remove this method once the DN-side handling is integrated; the
+  // tests can then check real block movements instead of this structure.
+  @VisibleForTesting
+  public List<BlockMovingInfo> getStorageMovementPendingItems() {
+    List<BlockMovingInfo> flatList = new ArrayList<>();
+    Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
+        .iterator();
+    while (iterator.hasNext()) {
+      List<BlockMovingInfo> next = iterator.next();
+      flatList.addAll(next);
+    }
+    return flatList;
+  }
 }
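These three methods make the descriptor a simple handoff buffer between the NameNode-side satisfier (producer) and heartbeat processing (consumer). A hypothetical fragment of that flow, with findMismatchedReplicas standing in for whatever selection logic the satisfier applies (neither the helper nor the surrounding heartbeat code is part of this excerpt):

// Producer (satisfier thread): queue one per-file batch on a chosen datanode.
List<BlockMovingInfo> batch = findMismatchedReplicas(inodeId); // hypothetical
datanodeDescriptor.addBlocksToMoveStorage(batch);

// Consumer (heartbeat path): drain at most one batch per heartbeat.
List<BlockMovingInfo> pending = datanodeDescriptor.getBlocksToMoveStorages();
if (pending != null) {
  // wrap `pending` in a BlockStorageMovementCommand for the heartbeat reply
}

// Tests: inspect everything still queued without dequeuing it.
List<BlockMovingInfo> all = datanodeDescriptor.getStorageMovementPendingItems();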


@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -46,6 +46,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -125,21 +126,19 @@ public void rejectedExecution(Runnable runnable,
     return moverThreadPool;
   }

-  public void processBlockMovingTasks(long trackID,
+  public void processBlockMovingTasks(long trackID, String blockPoolID,
       List<BlockMovingInfo> blockMovingInfos) {
     Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       assert blkMovingInfo
           .getSources().length == blkMovingInfo.getTargets().length;

       for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        BlockMovingTask blockMovingTask =
-            new BlockMovingTask(blkMovingInfo.getBlock(),
-                blkMovingInfo.getSources()[i],
-                blkMovingInfo.getTargets()[i],
-                blkMovingInfo.getTargetStorageTypes()[i]);
-        moveCallable = moverExecutorCompletionService
-            .submit(blockMovingTask);
+        BlockMovingTask blockMovingTask = new BlockMovingTask(
+            blkMovingInfo.getBlock(), blockPoolID,
+            blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
+            blkMovingInfo.getTargetStorageTypes()[i]);
+        moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
         moverTaskFutures.add(moveCallable);
       }
     }
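For orientation, a caller might hand work to the worker as in the fragment below. The BlockMovingInfo constructor shown is a guess that mirrors the getters used above (getBlock, getSources, getTargets, getTargetStorageTypes); the real signature is not visible in this excerpt.

// Hypothetical: move one replica from a DISK node to an ARCHIVE node.
// The arrays are parallel: sources[i] pairs with targets[i] and
// targetStorageTypes[i], which the assert above relies on.
BlockMovingInfo info = new BlockMovingInfo(block,   // assumed constructor
    new DatanodeInfo[] {sourceNode},
    new DatanodeInfo[] {targetNode},
    new StorageType[] {StorageType.ARCHIVE});
worker.processBlockMovingTasks(trackID, blockPoolID,
    java.util.Collections.singletonList(info));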
@@ -163,14 +162,16 @@ public void processBlockMovingTasks(long trackID,
    * given target.
    */
   private class BlockMovingTask implements Callable<Void> {
-    private final ExtendedBlock block;
+    private final Block block;
     private final DatanodeInfo source;
     private final DatanodeInfo target;
     private final StorageType targetStorageType;
+    private String blockPoolID;

-    BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+    BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
         DatanodeInfo target, StorageType targetStorageType) {
       this.block = block;
+      this.blockPoolID = blockPoolID;
       this.source = source;
       this.target = target;
       this.targetStorageType = targetStorageType;
@@ -201,12 +202,12 @@ private void moveBlock() {

         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-
+        ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
-            block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+            extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));

         DataEncryptionKeyFactory keyFactory = datanode
-            .getDataEncryptionKeyFactoryForBlock(block);
+            .getDataEncryptionKeyFactoryForBlock(extendedBlock);
         IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
             unbufOut, unbufIn, keyFactory, accessToken, target);
         unbufOut = saslStreams.out;
@@ -215,10 +216,10 @@ private void moveBlock() {
             new BufferedOutputStream(unbufOut, ioFileBufferSize));
         in = new DataInputStream(
             new BufferedInputStream(unbufIn, ioFileBufferSize));
-        sendRequest(out, block, accessToken, source, targetStorageType);
+        sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
         receiveResponse(in);

-        LOG.debug(
+        LOG.info(
             "Successfully moved block:{} from src:{} to destin:{} for"
                 + " satisfying storageType:{}",
             block, source, target, targetStorageType);
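The switch from ExtendedBlock to Block plus a separate blockPoolID means BlockMovingInfo carries the lighter Block, and the worker reattaches the pool id only where the transfer protocol needs the fully qualified block. The reconstruction is just the two-argument ExtendedBlock constructor; the values below are illustrative.

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

// A Block is (blockId, numBytes, generationStamp) within one block pool;
// an ExtendedBlock scopes it with the pool id so it is unambiguous in a
// federated cluster.
Block block = new Block(1073741825L, 134217728L, 1001L);
ExtendedBlock extendedBlock =
    new ExtendedBlock("BP-1234567-192.0.2.1-1533024000000", block);
assert extendedBlock.getLocalBlock().equals(block); // same block identity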
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * A class to track the block collection IDs for which physical storage
 * movement is needed, as determined from the namespace and the storage
 * reports from datanodes.
 */
@InterfaceAudience.Private
public class BlockStorageMovementNeeded {
  private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();

  /**
   * Add the block collection id to the list of IDs for which storage
   * movement is expected to be needed.
   *
   * @param blockCollectionID
   *          - block collection id, which is nothing but the inode id.
   */
  public synchronized void add(Long blockCollectionID) {
    storageMovementNeeded.add(blockCollectionID);
  }

  /**
   * Gets the next block collection id for which a storage movement check is
   * necessary and, if required, the movement should be performed.
   *
   * @return block collection ID, or null if none is pending.
   */
  public synchronized Long get() {
    return storageMovementNeeded.poll();
  }
}
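BlockStorageMovementNeeded is a plain synchronized FIFO handoff between callers of BlockManager#satisfyStoragePolicy and the satisfier daemon; poll semantics mean get() returns null when nothing is pending, so the daemon can poll without blocking. A minimal usage illustration (the inode ids are invented):

BlockStorageMovementNeeded needed = new BlockStorageMovementNeeded();
needed.add(16386L); // inode id, as queued by satisfyStoragePolicy(id)
needed.add(16402L);

Long first = needed.get();  // 16386 -- FIFO order
Long second = needed.get(); // 16402
Long none = needed.get();   // null: queue drained, daemon simply polls again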
