HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but
not StorageIDs. (Contributed by Milan Desai)
arp7 committed Feb 9, 2015
1 parent 241336c commit ab934e8
Showing 8 changed files with 218 additions and 21 deletions.
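Summary of the bug and the fix: LocatedBlock used to carry its replica locations, storage IDs, and storage types as three parallel arrays, and DatanodeManager.sortLocatedBlocks() reordered only the locations array in place, leaving the other two describing the old order. The sketch below reproduces that failure mode with plain strings; it is a simplified stand-in, not the actual HDFS classes.

import java.util.Arrays;

public class ParallelArraySortBug {
  public static void main(String[] args) {
    // Three parallel arrays describing the same replicas, as the old
    // LocatedBlock did with locs, storageIDs and storageTypes.
    String[] locs       = {"dn-remote", "dn-local"};
    String[] storageIDs = {"id-of-dn-remote", "id-of-dn-local"};

    // The old sort path reordered only the locations array in place...
    Arrays.sort(locs); // "dn-local" moves to index 0

    // ...so pairing by index is silently wrong from here on:
    // prints "dn-local -> id-of-dn-remote"
    System.out.println(locs[0] + " -> " + storageIDs[0]);
  }
}

The patch below makes each location object carry its own storage ID and type (the new DatanodeInfoWithStorage), so sorting can no longer desynchronize them, and rebuilds the ID/type arrays lazily from the current location order.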
@@ -859,7 +859,7 @@ protected int getWeight(Node reader, Node node) {
     // Start off by initializing to off rack
     int weight = 2;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameRack(reader, node)) {
         weight = 1;
@@ -254,7 +254,7 @@ protected int getWeight(Node reader, Node node) {
     // Start off by initializing to off rack
     int weight = 3;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameNodeGroup(reader, node)) {
         weight = 1;
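A note on the two == to equals() changes above: after this patch the locations passed to sortByDistance() are DatanodeInfoWithStorage wrappers rather than the original DatanodeInfo objects, so reference equality would no longer recognize the reader among the replicas (this reading is inferred from the diff). A self-contained sketch of the pitfall with toy types, not the real Hadoop classes:

public class ReferenceEqualityPitfall {
  static class Node {
    final String uuid;
    Node(String uuid) { this.uuid = uuid; }
    @Override public boolean equals(Object o) {
      return o instanceof Node && ((Node) o).uuid.equals(uuid);
    }
    @Override public int hashCode() { return uuid.hashCode(); }
  }

  // Analogue of DatanodeInfoWithStorage: a copy of the node plus extra fields.
  static class NodeWithStorage extends Node {
    final String storageID;
    NodeWithStorage(Node from, String storageID) {
      super(from.uuid);
      this.storageID = storageID;
    }
  }

  public static void main(String[] args) {
    Node reader = new Node("dn-1");
    Node loc = new NodeWithStorage(reader, "storage-1");
    System.out.println(reader == loc);      // false: distinct objects
    System.out.println(reader.equals(loc)); // true: same datanode identity
  }
}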
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
     HdfsDataInputStream. (yliu)
 
+    HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
+    but not StorageIDs. (Milan Desai via Arpit Agarwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -41,11 +42,13 @@ public class LocatedBlock {
 
   private final ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
-  private final DatanodeInfo[] locs;
-  /** Storage ID for each replica */
-  private final String[] storageIDs;
-  // Storage type for each replica, if reported.
-  private final StorageType[] storageTypes;
+  private final DatanodeInfoWithStorage[] locs;
+  private final boolean hasStorageIDs;
+  private final boolean hasStorageTypes;
+  /** Cached storage ID for each replica */
+  private String[] storageIDs;
+  /** Cached storage type for each replica, if reported. */
+  private StorageType[] storageTypes;
   // corrupt flag is true if all of the replicas of a block are corrupt.
   // else false. If block has few corrupt replicas, they are filtered and
   // their locations are not part of this object
@@ -57,7 +60,8 @@ public class LocatedBlock {
   private DatanodeInfo[] cachedLocs;
 
   // Used when there are no locations
-  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
+  private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+      new DatanodeInfoWithStorage[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
@@ -94,10 +98,22 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
     if (locs==null) {
       this.locs = EMPTY_LOCS;
     } else {
-      this.locs = locs;
+      this.locs = new DatanodeInfoWithStorage[locs.length];
+      for(int i = 0; i < locs.length; i++) {
+        DatanodeInfo di = locs[i];
+        DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
+            storageIDs != null ? storageIDs[i] : null,
+            storageTypes != null ? storageTypes[i] : null);
+        storage.setDependentHostNames(di.getDependentHostNames());
+        storage.setLevel(di.getLevel());
+        storage.setParent(di.getParent());
+        this.locs[i] = storage;
+      }
     }
     this.storageIDs = storageIDs;
     this.storageTypes = storageTypes;
+    this.hasStorageIDs = storageIDs != null;
+    this.hasStorageTypes = storageTypes != null;
 
     if (cachedLocs == null || cachedLocs.length == 0) {
       this.cachedLocs = EMPTY_LOCS;
@@ -118,18 +134,53 @@ public ExtendedBlock getBlock() {
     return b;
   }
 
-  public DatanodeInfo[] getLocations() {
+  /**
+   * Returns the locations associated with this block. The returned array is not
+   * expected to be modified. If it is, caller must immediately invoke
+   * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
+   * to invalidate the cached Storage ID/Type arrays.
+   */
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
   public StorageType[] getStorageTypes() {
+    if(!hasStorageTypes) {
+      return null;
+    }
+    if(storageTypes != null) {
+      return storageTypes;
+    }
+    storageTypes = new StorageType[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageTypes[i] = locs[i].getStorageType();
+    }
     return storageTypes;
   }
 
   public String[] getStorageIDs() {
+    if(!hasStorageIDs) {
+      return null;
+    }
+    if(storageIDs != null) {
+      return storageIDs;
+    }
+    storageIDs = new String[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageIDs[i] = locs[i].getStorageID();
+    }
     return storageIDs;
   }
 
+  /**
+   * Invalidates the cached StorageID and StorageType information. Must be
+   * called when the locations array is modified.
+   */
+  public void invalidateCachedStorageInfo() {
+    storageIDs = null;
+    storageTypes = null;
+  }
+
   public long getStartOffset() {
     return offset;
   }
@@ -161,9 +212,9 @@ public void addCachedLoc(DatanodeInfo loc) {
       return;
     }
     // Try to re-use a DatanodeInfo already in loc
-    for (int i=0; i<locs.length; i++) {
-      if (locs[i].equals(loc)) {
-        cachedList.add(locs[i]);
+    for (DatanodeInfoWithStorage di : locs) {
+      if (loc.equals(di)) {
+        cachedList.add(di);
         cachedLocs = cachedList.toArray(cachedLocs);
         return;
       }
@@ -187,10 +238,6 @@ public String toString() {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
-        + "; storageIDs=" +
-          (storageIDs != null ? Arrays.asList(storageIDs) : null)
-        + "; storageTypes=" +
-          (storageTypes != null ? Arrays.asList(storageTypes) : null)
         + "}";
   }
 }
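The LocatedBlock changes above replace eagerly stored parallel arrays with a cache derived from the location objects: getStorageIDs() and getStorageTypes() serve the cached arrays until invalidated, then rebuild them in the current location order, while the hasStorageIDs/hasStorageTypes flags preserve the old contract of returning null when no storage information was ever supplied. A self-contained sketch of the pattern, with a toy Replica type standing in for DatanodeInfoWithStorage:

public class LazyStorageCache {
  static class Replica {               // stand-in for DatanodeInfoWithStorage
    final String node, storageID;
    Replica(String node, String storageID) {
      this.node = node;
      this.storageID = storageID;
    }
  }

  private final Replica[] locs;        // single source of truth
  private String[] storageIDs;         // cache, rebuilt on demand

  LazyStorageCache(Replica[] locs) { this.locs = locs; }

  String[] getStorageIDs() {
    if (storageIDs == null) {          // cache miss: derive from locs,
      storageIDs = new String[locs.length]; // so order always matches
      for (int i = 0; i < locs.length; i++) {
        storageIDs[i] = locs[i].storageID;
      }
    }
    return storageIDs;
  }

  // Mirrors LocatedBlock#invalidateCachedStorageInfo: callers that reorder
  // locs in place must clear the cached view.
  void invalidateCachedStorageInfo() {
    storageIDs = null;
  }

  public static void main(String[] args) {
    LazyStorageCache block = new LazyStorageCache(new Replica[] {
        new Replica("dn-a", "id-a"), new Replica("dn-b", "id-b")});
    System.out.println(block.getStorageIDs()[0]); // prints "id-a"
  }
}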
@@ -391,6 +391,8 @@ public void sortLocatedBlocks(final String targethost,
       }
       int activeLen = lastActiveIndex + 1;
       networktopology.sortByDistance(client, b.getLocations(), activeLen);
+      // must invalidate cache since we modified locations array
+      b.invalidateCachedStorageInfo();
     }
   }
 
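The one call-site obligation this design adds: getLocations() hands out the live internal array, sortByDistance() mutates it in place, and any storage ID/type arrays handed out earlier describe the old order, hence the immediate invalidateCachedStorageInfo() above. A small runnable illustration with simplified stand-ins, not the real HDFS types:

import java.util.Arrays;
import java.util.Collections;

public class SortThenInvalidate {
  public static void main(String[] args) {
    String[] locs = {"dn-a", "dn-b", "dn-c"};
    String[] cachedIDs = {"id-a", "id-b", "id-c"}; // derived from the old order

    // Like networktopology.sortByDistance(...), mutate the array in place
    // (Arrays.asList returns an array-backed view, so reversing it reverses locs):
    Collections.reverse(Arrays.asList(locs));

    // cachedIDs is now stale: locs[0] is "dn-c" but cachedIDs[0] is "id-a",
    // which is exactly why the diff invalidates the cache right after sorting.
    System.out.println(Arrays.toString(locs) + " vs " + Arrays.toString(cachedIDs));
  }
}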
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class DatanodeInfoWithStorage extends DatanodeInfo {
private final String storageID;
private final StorageType storageType;

public DatanodeInfoWithStorage(DatanodeInfo from, String storageID,
StorageType storageType) {
super(from);
this.storageID = storageID;
this.storageType = storageType;
}

public String getStorageID() {
return storageID;
}

public StorageType getStorageType() {
return storageType;
}

@Override
public boolean equals(Object o) {
// allows this class to be used interchangeably with DatanodeInfo
return super.equals(o);
}

@Override
public int hashCode() {
// allows this class to be used interchangeably with DatanodeInfo
return super.hashCode();
}

@Override
public String toString() {
return "DatanodeInfoWithStorage[" + super.toString() + "," + storageID +
"," + storageType + "]";
}
}
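Design note on the new class: as its inline comments say, equals() and hashCode() delegate to DatanodeInfo so that a DatanodeInfoWithStorage is interchangeable with the node it wraps in hash-based collections, topology lookups, and the reader.equals(node) check patched above. The storage ID and type ride along as extra fields without becoming part of the node's identity.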
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -801,22 +802,23 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedException
 
     ArrayList<String> nodes = new ArrayList<String>();
     ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
 
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
     for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
       DatanodeInfo found = datanodeInfo;
       for (DatanodeInfo dif: dnInfos4LastBlock) {
         if (datanodeInfo.equals(dif)) {
           found = null;
         }
       }
       if (found != null) {
         nodes.add(found.getXferAddr());
-        dnInfos.add(found);
+        dnInfos.add(dm.getDatanode(found));
       }
     }
     //decommission one of the 3 nodes which have last block
     nodes.add(dnInfos4LastBlock[0].getXferAddr());
-    dnInfos.add(dnInfos4LastBlock[0]);
+    dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
 
     writeConfigFile(excludeFile, nodes);
     refreshNodes(ns, conf);
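A note on the dm.getDatanode(...) changes above: block locations reaching the client are now DatanodeInfoWithStorage copies rather than the NameNode's own descriptor objects, so the test presumably must resolve each location back to the DatanodeManager's canonical instance before driving and observing decommission state.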
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,13 +32,19 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 public class TestDatanodeManager {
@@ -210,4 +217,81 @@ public void reloadCachedMappings() {
    public void reloadCachedMappings(List<String> names) {
    }
  }

/**
* This test creates a LocatedBlock with 5 locations, sorts the locations
* based on the network topology, and ensures the locations are still aligned
* with the storage ids and storage types.
*/
@Test
public void testSortLocatedBlocks() throws IOException {
// create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, new Configuration());

// register 5 datanodes, each with different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];
String[] storageIDs = new String[5];
StorageType[] storageTypes = new StorageType[]{
StorageType.ARCHIVE,
StorageType.DEFAULT,
StorageType.DISK,
StorageType.RAM_DISK,
StorageType.SSD
};
for(int i = 0; i < 5; i++) {
// register new datanode
String uuid = "UUID-"+i;
String ip = "IP-" + i;
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
Mockito.when(dr.getIpAddr()).thenReturn(ip);
Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
Mockito.when(dr.getXferPort()).thenReturn(9000);
Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
dm.registerDatanode(dr);

// get location and storage information
locs[i] = dm.getDatanode(uuid);
storageIDs[i] = "storageID-"+i;
}

// set first 2 locations as decommissioned
locs[0].setDecommissioned();
locs[1].setDecommissioned();

// create LocatedBlock with above locations
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
List<LocatedBlock> blocks = new ArrayList<>();
blocks.add(block);

final String targetIp = locs[4].getIpAddr();

// sort block locations
dm.sortLocatedBlocks(targetIp, blocks);

// check that storage IDs/types are aligned with datanode locs
DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
storageIDs = block.getStorageIDs();
storageTypes = block.getStorageTypes();
assertThat(sortedLocs.length, is(5));
assertThat(storageIDs.length, is(5));
assertThat(storageTypes.length, is(5));
for(int i = 0; i < sortedLocs.length; i++) {
assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
}

// Ensure the local node is first.
assertThat(sortedLocs[0].getIpAddr(), is(targetIp));

// Ensure the two decommissioned DNs were moved to the end.
assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
}
}
