
Commit

svn merge -c 1354773 from branch-1 for HDFS-3551.
szetszwo committed Jun 27, 2012
1 parent 9d7b244 commit f832988
Showing 7 changed files with 213 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -259,6 +259,9 @@ Release 1.1.0 - unreleased
HDFS-3522. If a namenode is in safemode, it should throw SafeModeException
when getBlockLocations has zero locations. (Brandon Li via szetszwo)

HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
(szetszwo)

Release 1.0.3 - 2012.05.07

NEW FEATURES
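
For orientation, HDFS-3551 changes which datanode the NameNode picks when answering the first step of a WebHDFS CREATE. Below is a minimal client-side sketch of that two-step flow; the host, port, path and user name are placeholders, and the snippet only inspects the redirect instead of writing data.

import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsCreateRedirectSketch {
  public static void main(String[] args) throws Exception {
    // Step 1: ask the NameNode where to write. No file data is sent yet.
    final URL url = new URL(
        "http://namenode.example.com:50070/webhdfs/v1/tmp/foo?op=CREATE&user.name=alice");
    final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setInstanceFollowRedirects(false); // keep the 307 visible instead of following it
    conn.connect();

    // Step 2: the NameNode answers "307 Temporary Redirect". After HDFS-3551 the
    // Location header points at a datanode chosen near the client's address
    // (when that address maps to a datanode host) rather than a random datanode.
    System.out.println(conn.getResponseCode() + " -> " + conn.getHeaderField("Location"));
    conn.disconnect();
  }
}
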
19 changes: 19 additions & 0 deletions src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3100,6 +3100,20 @@ boolean computeReplicationWorkForBlock(Block block, int priority) {
return true;
}

/** Choose a datanode near to the given address. */
public DatanodeInfo chooseDatanode(String address, long blocksize) {
final DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
address);
if (clientNode != null) {
final DatanodeDescriptor[] datanodes = replicator.chooseTarget(
1, clientNode, null, blocksize);
if (datanodes.length > 0) {
return datanodes[0];
}
}
return null;
}

/**
* Parse the data-nodes the block belongs to and choose one,
* which will be the replication source.
@@ -5929,4 +5943,9 @@ void removeDecomNodeFromDeadList(ArrayList<DatanodeDescriptor> dead) {
}
}
}

@Override
public String toString() {
return getClass().getSimpleName() + ": " + host2DataNodeMap;
}
}
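
The new FSNamesystem#chooseDatanode above hands the client's host to the block placement policy (replicator.chooseTarget) as the writer, so a local or rack-local datanode is preferred whenever the address maps to a known datanode. A small hypothetical caller, shown only to illustrate the contract (the host name and block size are made up):

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

class ChooseDatanodeSketch {
  /** Pick a datanode near the given client host, falling back to a random one. */
  static DatanodeInfo nearClient(NameNode namenode, String clientHost) throws IOException {
    final FSNamesystem ns = namenode.getNamesystem();
    final long blocksize = 64L * 1024 * 1024; // illustrative value only
    final DatanodeInfo near = ns.chooseDatanode(clientHost, blocksize);
    // chooseDatanode returns null when clientHost is not a known datanode host;
    // NamenodeWebHdfsMethods (later in this diff) falls back the same way.
    return near != null ? near : ns.getRandomDatanode();
  }
}
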
10 changes: 10 additions & 0 deletions src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java
@@ -185,4 +185,14 @@ public DatanodeDescriptor getDatanodeByName(String name) {
hostmapLock.readLock().unlock();
}
}

@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append("[");
for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue()));
}
return b.append("\n]").toString();
}
}
src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.JsonUtil;
@@ -107,6 +108,11 @@ public static String getRemoteAddress() {
return REMOTE_ADDRESS.get();
}

/** Set the remote client address. */
static void setRemoteAddress(String remoteAddress) {
REMOTE_ADDRESS.set(remoteAddress);
}

private @Context ServletContext context;
private @Context HttpServletRequest request;
private @Context HttpServletResponse response;
@@ -126,12 +132,21 @@ private void init(final UserGroupInformation ugi,
response.setContentType(null);
}

private static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset
) throws IOException {
if (op == GetOpParam.Op.OPEN
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize) throws IOException {
final FSNamesystem ns = namenode.getNamesystem();

if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeInfo dn = ns.chooseDatanode(getRemoteAddress(), blocksize);
if (dn != null) {
return dn;
}
} else if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
//choose a datanode containing a replica
final HdfsFileStatus status = namenode.getFileInfo(path);
if (status == null) {
throw new FileNotFoundException("File " + path + " not found.");
@@ -155,7 +170,7 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
}
}

return namenode.getNamesystem().getRandomDatanode();
return ns.getRandomDatanode();
}

private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -173,8 +188,10 @@ private URI redirectURI(final NameNode namenode,
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
blocksize);

final String delegationQuery;
if (!UserGroupInformation.isSecurityEnabled()) {
@@ -302,7 +319,7 @@ public Response run() throws IOException, URISyntaxException {
case CREATE:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L,
fullpath, op.getValue(), -1L, blockSize.getValue(conf),
permission, overwrite, bufferSize, replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
@@ -425,7 +442,7 @@ public Response run() throws IOException, URISyntaxException {
case APPEND:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, bufferSize);
fullpath, op.getValue(), -1L, -1L, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
@@ -507,7 +524,7 @@ public Response run() throws IOException, URISyntaxException {
case OPEN:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@@ -543,7 +560,7 @@ public Response run() throws IOException, URISyntaxException {
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L);
fullpath, op.getValue(), -1L, -1L);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
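
Pulling the scattered hunks above together: after this patch, NamenodeWebHdfsMethods.chooseDatanode prefers a client-local datanode for CREATE, a replica-holding datanode for OPEN, GETFILECHECKSUM and APPEND, and otherwise falls back to a random datanode. A compressed paraphrase of that order (not the literal method body; the replica lookup is elided):

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;

class DatanodeSelectionSketch {
  /** Paraphrased selection order: client-local for CREATE, then replica, then random. */
  static DatanodeInfo select(NameNode namenode, String clientAddress,
      HttpOpParam.Op op, long blocksize) throws IOException {
    final FSNamesystem ns = namenode.getNamesystem();
    if (op == PutOpParam.Op.CREATE) {
      final DatanodeInfo near = ns.chooseDatanode(clientAddress, blocksize);
      if (near != null) {
        return near; // new in HDFS-3551: redirect CREATE to a nearby datanode
      }
    }
    // OPEN/GETFILECHECKSUM/APPEND consult the file's block locations here (elided).
    return ns.getRandomDatanode();
  }
}
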
src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -17,9 +17,20 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

public abstract class NameNodeAdapter {
/**
* Get block locations within the specified range.
*/
public static LocatedBlocks getBlockLocations(NameNode namenode,
String src, long offset, long length) throws IOException {
return namenode.getNamesystem().getBlockLocations(
src, offset, length, false, true, true);
}

public static boolean checkFileProgress(FSNamesystem fsn, String path, boolean checkall) throws IOException {
INodeFile f = fsn.dir.getFileINode(path);
return fsn.checkFileProgress(f, checkall);
136 changes: 136 additions & 0 deletions src/test/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.web.resources;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

/**
* Test WebHDFS which provides data locality using HTTP redirection.
*/
public class TestWebHdfsDataLocality {
static final Log LOG = LogFactory.getLog(TestWebHdfsDataLocality.class);
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
}

private static final String RACK0 = "/rack0";
private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";

@Test
public void testDataLocality() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
final int nDataNodes = racks.length;
LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks));

final MiniDFSCluster cluster = new MiniDFSCluster(
conf, nDataNodes, true, racks);
try {
cluster.waitActive();

final DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
final NameNode namenode = cluster.getNameNode();
final FSNamesystem ns = namenode.getNamesystem();
LOG.info("ns=" + ns);

final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
final String f = "/foo";

{ //test CREATE
for(int i = 0; i < nDataNodes; i++) {
//set client address to a particular datanode
final DataNode dn = cluster.getDataNodes().get(i);
final String host = ns.getDatanode(dn.dnRegistration).getHost();
NamenodeWebHdfsMethods.setRemoteAddress(host);

//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
Assert.assertEquals(host, chosen.getHost());
}
}

//create a file with one replica.
final Path p = new Path(f);
final FSDataOutputStream out = dfs.create(p, (short)1);
out.write(1);
out.close();

//get replica location.
final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
namenode, f, 0, 1);
final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
Assert.assertEquals(1, lb.size());
final DatanodeInfo[] locations = lb.get(0).getLocations();
Assert.assertEquals(1, locations.length);
final DatanodeInfo expected = locations[0];

//For GETFILECHECKSUM, OPEN and APPEND,
//the chosen datanode must be the same as the replica location.

{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
Assert.assertEquals(expected, chosen);
}

{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
Assert.assertEquals(expected, chosen);
}

{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
Assert.assertEquals(expected, chosen);
}
} finally {
cluster.shutdown();
}
}
}
7 changes: 7 additions & 0 deletions src/test/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +40,12 @@
public class WebHdfsTestUtil {
public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);

public static Configuration createConf() {
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
return conf;
}

public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
) throws IOException, URISyntaxException {
final String uri = WebHdfsFileSystem.SCHEME + "://"
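
The new createConf() helper simply turns on dfs.webhdfs.enabled for tests. A hypothetical way it combines with the existing getWebHdfsFileSystem helper above (cluster size and path are arbitrary, and the sketch assumes the mini cluster publishes its HTTP address back into conf):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;

class WebHdfsTestUtilUsageSketch {
  static void example() throws Exception {
    final Configuration conf = WebHdfsTestUtil.createConf(); // WebHDFS enabled
    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    try {
      cluster.waitActive();
      // Build a webhdfs:// file system against the cluster just started and
      // create an empty file through it.
      final FileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
      webhdfs.create(new Path("/sketch")).close();
    } finally {
      cluster.shutdown();
    }
  }
}
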
