From fde95d463c3123b315b3d07cb5b7b7dc19f7cb73 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Mon, 29 Jan 2018 17:22:29 -0600 Subject: [PATCH] HDFS-12574. Add CryptoInputStream to WebHdfsFileSystem read call. Contributed by Rushabh S Shah --- .../org/apache/hadoop/hdfs/DFSClient.java | 48 +---- .../org/apache/hadoop/hdfs/HdfsKMSUtil.java | 41 ++++ .../hadoop/hdfs/web/WebHdfsFileSystem.java | 101 ++++++++-- .../hdfs/web/TestWebHdfsContentLength.java | 2 + .../web/resources/NamenodeWebHdfsMethods.java | 85 +++++--- .../hadoop/hdfs/TestEncryptionZones.java | 188 ++++++++++++++++++ .../resources/TestWebHdfsDataLocality.java | 23 ++- .../apache/hadoop/hdfs/web/TestWebHDFS.java | 1 - .../hadoop/hdfs/web/TestWebHdfsTokens.java | 4 +- 9 files changed, 403 insertions(+), 90 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 92bb99ee9d7a3..2497c40106de7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -38,7 +38,6 @@ import java.net.SocketAddress; import java.net.URI; import java.net.UnknownHostException; -import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -62,8 +61,6 @@ import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; -import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; -import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@ -910,46 +907,19 @@ public BlockLocation[] getBlockLocations(String src, long start, } } - /** - * Decrypts a EDEK by consulting the KeyProvider. - */ - private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo - feInfo) throws IOException { - try (TraceScope ignored = tracer.newScope("decryptEDEK")) { - KeyProvider provider = getKeyProvider(); - if (provider == null) { - throw new IOException("No KeyProvider is configured, cannot access" + - " an encrypted file"); - } - EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( - feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), - feInfo.getEncryptedDataEncryptionKey()); - try { - KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension - .createKeyProviderCryptoExtension(provider); - return cryptoProvider.decryptEncryptedKey(ekv); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - } - } - /** * Wraps the stream in a CryptoInputStream if the underlying file is * encrypted. */ public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) throws IOException { - final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); + FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); if (feInfo != null) { - // File is encrypted, wrap the stream in a crypto stream. 
- // Currently only one version, so no special logic based on the version # - HdfsKMSUtil.getCryptoProtocolVersion(feInfo); - final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo); - final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); - final CryptoInputStream cryptoIn = - new CryptoInputStream(dfsis, codec, decrypted.getMaterial(), - feInfo.getIV()); + CryptoInputStream cryptoIn; + try (TraceScope ignored = getTracer().newScope("decryptEDEK")) { + cryptoIn = HdfsKMSUtil.createWrappedInputStream(dfsis, + getKeyProvider(), feInfo, getConfiguration()); + } return new HdfsDataInputStream(cryptoIn); } else { // No FileEncryptionInfo so no encryption. @@ -978,7 +948,11 @@ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, // Currently only one version, so no special logic based on the version # HdfsKMSUtil.getCryptoProtocolVersion(feInfo); final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo); - KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); + KeyVersion decrypted; + try (TraceScope ignored = tracer.newScope("decryptEDEK")) { + decrypted = HdfsKMSUtil.decryptEncryptedDataEncryptionKey(feInfo, + getKeyProvider()); + } final CryptoOutputStream cryptoOut = new CryptoOutputStream(dfsos, codec, decrypted.getMaterial(), feInfo.getIV(), startPos); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java index 71d297265510d..de27f7e0edeb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java @@ -20,15 +20,21 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.security.GeneralSecurityException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -187,4 +193,39 @@ public static Text getKeyProviderMapKey(URI namenodeUri) { return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme() +"://" + namenodeUri.getAuthority()); } + + public static CryptoInputStream createWrappedInputStream(InputStream is, + KeyProvider keyProvider, FileEncryptionInfo fileEncryptionInfo, + Configuration conf) throws IOException { + // File is encrypted, wrap the stream in a crypto stream. 
+ // Currently only one version, so no special logic based on the version# + HdfsKMSUtil.getCryptoProtocolVersion(fileEncryptionInfo); + final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec( + conf, fileEncryptionInfo); + final KeyVersion decrypted = + decryptEncryptedDataEncryptionKey(fileEncryptionInfo, keyProvider); + return new CryptoInputStream(is, codec, decrypted.getMaterial(), + fileEncryptionInfo.getIV()); + } + + /** + * Decrypts a EDEK by consulting the KeyProvider. + */ + static KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo + feInfo, KeyProvider keyProvider) throws IOException { + if (keyProvider == null) { + throw new IOException("No KeyProvider is configured, cannot access" + + " an encrypted file"); + } + EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( + feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), + feInfo.getEncryptedDataEncryptionKey()); + try { + KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension + .createKeyProviderCryptoExtension(keyProvider); + return cryptoProvider.decryptEncryptedKey(ekv); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 2ab7a83770176..b0064957a7f78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -37,8 +37,11 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Base64; +import java.util.Base64.Decoder; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; @@ -66,6 +69,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -92,6 +96,8 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.web.resources.*; import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op; @@ -133,6 +139,8 @@ public class WebHdfsFileSystem extends FileSystem /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME + "/v" + VERSION; + public static final String EZ_HEADER = "X-Hadoop-Accept-EZ"; + public static final String FEFINFO_HEADER = "X-Hadoop-feInfo"; /** * Default connection factory may be overridden in tests to use smaller @@ -613,12 +621,19 @@ abstract class AbstractRunner { private boolean checkRetry; private String redirectHost; + private boolean followRedirect = true; protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) { this.op = op; 
this.redirected = redirected; } + protected AbstractRunner(final HttpOpParam.Op op, boolean redirected, + boolean followRedirect) { + this(op, redirected); + this.followRedirect = followRedirect; + } + T run() throws IOException { UserGroupInformation connectUgi = ugi.getRealUser(); if (connectUgi == null) { @@ -685,9 +700,17 @@ protected HttpURLConnection connect(URL url) throws IOException { // See http://tinyurl.com/java7-http-keepalive conn.disconnect(); } + if (!followRedirect) { + return conn; + } } try { - return connect(op, url); + final HttpURLConnection conn = connect(op, url); + // output streams will validate on close + if (!op.getDoOutput()) { + validateResponse(op, conn, false); + } + return conn; } catch (IOException ioe) { if (redirectHost != null) { if (excludeDatanodes.getValue() != null) { @@ -713,6 +736,7 @@ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url) // The value of the header is unimportant. Only its presence matters. conn.setRequestProperty(restCsrfCustomHeader, "\"\""); } + conn.setRequestProperty(EZ_HEADER, "true"); switch (op.getType()) { // if not sending a message body for a POST or PUT operation, need // to ensure the server/proxy knows this @@ -760,10 +784,6 @@ private T runWithRetry() throws IOException { final URL url = getUrl(); try { final HttpURLConnection conn = connect(url); - // output streams will validate on close - if (!op.getDoOutput()) { - validateResponse(op, conn, false); - } return getResponse(conn); } catch (AccessControlException ace) { // no retries for auth failures @@ -809,7 +829,6 @@ private void shouldRetry(final IOException ioe, final int retry a.action == RetryPolicy.RetryAction.RetryDecision.RETRY; boolean isFailoverAndRetry = a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY; - if (isRetry || isFailoverAndRetry) { LOG.info("Retrying connect to namenode: {}. Already retried {}" + " time(s); retry policy is {}, delay {}ms.", @@ -990,16 +1009,16 @@ HttpURLConnection getResponse(final HttpURLConnection conn) /** * Used by open() which tracks the resolved url itself */ - final class URLRunner extends AbstractRunner { + class URLRunner extends AbstractRunner { private final URL url; @Override - protected URL getUrl() { + protected URL getUrl() throws IOException { return url; } protected URLRunner(final HttpOpParam.Op op, final URL url, - boolean redirected) { - super(op, redirected); + boolean redirected, boolean followRedirect) { + super(op, redirected, followRedirect); this.url = url; } @@ -1412,12 +1431,20 @@ public boolean delete(Path f, boolean recursive) throws IOException { ).run(); } + @SuppressWarnings("resource") @Override public FSDataInputStream open(final Path f, final int bufferSize ) throws IOException { statistics.incrementReadOps(1); storageStatistics.incrementOpCounter(OpType.OPEN); - return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize)); + WebHdfsInputStream webfsInputStream = + new WebHdfsInputStream(f, bufferSize); + if (webfsInputStream.getFileEncryptionInfo() == null) { + return new FSDataInputStream(webfsInputStream); + } else { + return new FSDataInputStream( + webfsInputStream.createWrappedInputStream()); + } } @Override @@ -1462,7 +1489,8 @@ protected HttpURLConnection connect(final long offset, final boolean resolved) throws IOException { final URL offsetUrl = offset == 0L? 
url : new URL(url + "&" + new OffsetParam(offset)); - return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run(); + return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved, + true).run(); } } @@ -1928,6 +1956,15 @@ ReadRunner getReadRunner() { void setReadRunner(ReadRunner rr) { this.readRunner = rr; } + + FileEncryptionInfo getFileEncryptionInfo() { + return readRunner.getFileEncryptionInfo(); + } + + InputStream createWrappedInputStream() throws IOException { + return HdfsKMSUtil.createWrappedInputStream( + this, getKeyProvider(), getFileEncryptionInfo(), getConf()); + } } enum RunnerState { @@ -1964,7 +2001,7 @@ protected class ReadRunner extends AbstractFsPathRunner { private byte[] readBuffer; private int readOffset; private int readLength; - private RunnerState runnerState = RunnerState.DISCONNECTED; + private RunnerState runnerState = RunnerState.SEEK; private URL originalUrl = null; private URL resolvedUrl = null; @@ -1972,6 +2009,7 @@ protected class ReadRunner extends AbstractFsPathRunner { private final int bufferSize; private long pos = 0; private long fileLength = 0; + private FileEncryptionInfo feInfo = null; /* The following methods are WebHdfsInputStream helpers. */ @@ -1979,6 +2017,36 @@ protected class ReadRunner extends AbstractFsPathRunner { super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs)); this.path = p; this.bufferSize = bs; + getRedirectedUrl(); + } + + private void getRedirectedUrl() throws IOException { + URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false, + false) { + @Override + protected URL getUrl() throws IOException { + return toUrl(op, path, new BufferSizeParam(bufferSize)); + } + }; + HttpURLConnection conn = urlRunner.run(); + String feInfoStr = conn.getHeaderField(FEFINFO_HEADER); + if (feInfoStr != null) { + Decoder decoder = Base64.getDecoder(); + byte[] decodedBytes = decoder.decode( + feInfoStr.getBytes(StandardCharsets.UTF_8)); + feInfo = PBHelperClient + .convert(FileEncryptionInfoProto.parseFrom(decodedBytes)); + } + String location = conn.getHeaderField("Location"); + if (location != null) { + // This saves the location for datanode where redirect was issued. + // Need to remove offset because seek can be called after open. + resolvedUrl = removeOffsetParam(new URL(location)); + } else { + // This is cached for proxies like httpfsfilesystem. 
+ cachedConnection = conn; + } + originalUrl = super.getUrl(); } int read(byte[] b, int off, int len) throws IOException { @@ -2011,7 +2079,8 @@ int read(byte[] b, int off, int len) throws IOException { if (runnerState == RunnerState.SEEK) { try { final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos)); - cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run(); + cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true, + false).run(); } catch (IOException ioe) { closeInputStream(RunnerState.DISCONNECTED); } @@ -2195,5 +2264,9 @@ void setFileLength(long len) { long getPos() { return pos; } + + protected FileEncryptionInfo getFileEncryptionInfo() { + return feInfo; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java index 19f18b0a8dabd..6ee8858df991e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java @@ -102,12 +102,14 @@ public void testGetOp() throws Exception { public void testGetOpWithRedirect() { Future future1 = contentLengthFuture(redirectResponse); Future future2 = contentLengthFuture(errResponse); + Future future3 = contentLengthFuture(errResponse); try { fs.open(p).read(); Assert.fail(); } catch (IOException ioe) {} // expected Assert.assertEquals(null, getContentLength(future1)); Assert.assertEquals(null, getContentLength(future2)); + Assert.assertEquals(null, getContentLength(future3)); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index e2ba510ec8825..d1f16a32b6b2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -28,6 +28,8 @@ import java.net.UnknownHostException; import java.security.Principal; import java.security.PrivilegedExceptionAction; +import java.util.Base64; +import java.util.Base64.Encoder; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -50,11 +52,14 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.Response.Status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -73,6 +78,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -117,9 +123,9 @@ public class NamenodeWebHdfsMethods { private Principal userPrincipal; private String remoteAddr; - private static volatile String serverDefaultsResponse = null; private @Context ServletContext context; private @Context HttpServletResponse response; + private boolean supportEZ; public NamenodeWebHdfsMethods(@Context HttpServletRequest request) { // the request object is a proxy to thread-locals so we have to extract @@ -130,6 +136,8 @@ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) { // get the remote address, if coming in via a trusted proxy server then // the address with be that of the proxied client remoteAddr = JspHelper.getRemoteAddr(request); + supportEZ = + Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER)); } private void init(final UserGroupInformation ugi, @@ -228,7 +236,7 @@ public InetAddress getHostInetAddress() { static DatanodeInfo chooseDatanode(final NameNode namenode, final String path, final HttpOpParam.Op op, final long openOffset, final long blocksize, final String excludeDatanodes, - final String remoteAddr) throws IOException { + final String remoteAddr, final HdfsFileStatus status) throws IOException { FSNamesystem fsn = namenode.getNamesystem(); if (fsn == null) { throw new IOException("Namesystem has not been intialized yet."); @@ -265,7 +273,6 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, || op == PostOpParam.Op.APPEND) { //choose a datanode containing a replica final NamenodeProtocols np = getRPCServer(namenode); - final HdfsFileStatus status = np.getFileInfo(path); if (status == null) { throw new FileNotFoundException("File " + path + " not found."); } @@ -285,7 +292,7 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, return bestNode(locations.get(0).getLocations(), excludes); } } - } + } return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology( ).chooseRandom(NodeBase.ROOT, excludes); @@ -322,15 +329,22 @@ private Token generateDelegationToken( return t; } - private URI redirectURI(final NameNode namenode, + private URI redirectURI(ResponseBuilder rb, final NameNode namenode, final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, final long blocksize, final String excludeDatanodes, final Param... parameters) throws URISyntaxException, IOException { final DatanodeInfo dn; + final NamenodeProtocols np = getRPCServer(namenode); + HdfsFileStatus status = null; + if (op == GetOpParam.Op.OPEN + || op == GetOpParam.Op.GETFILECHECKSUM + || op == PostOpParam.Op.APPEND) { + status = np.getFileInfo(path); + } dn = chooseDatanode(namenode, path, op, openOffset, blocksize, - excludeDatanodes, remoteAddr); + excludeDatanodes, remoteAddr, status); if (dn == null) { throw new IOException("Failed to find datanode, suggest to check cluster" + " health. 
excludeDatanodes=" + excludeDatanodes); @@ -349,15 +363,27 @@ private URI redirectURI(final NameNode namenode, namenode, ugi, null); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } - final String query = op.toQueryString() + delegationQuery - + "&" + new NamenodeAddressParam(namenode) - + Param.toSortedString("&", parameters); - final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; + + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append(op.toQueryString()); + queryBuilder.append(delegationQuery); + queryBuilder.append("&").append(new NamenodeAddressParam(namenode)); + queryBuilder.append(Param.toSortedString("&", parameters)); + + boolean prependReservedRawPath = false; + if (op == GetOpParam.Op.OPEN && supportEZ + && status.getFileEncryptionInfo() != null) { + prependReservedRawPath = true; + rb.header(WebHdfsFileSystem.FEFINFO_HEADER, + encodeFeInfo(status.getFileEncryptionInfo())); + } + final String uripath = WebHdfsFileSystem.PATH_PREFIX + + (prependReservedRawPath ? "/.reserved/raw" + path : path); int port = "http".equals(scheme) ? dn.getInfoPort() : dn .getInfoSecurePort(); final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath, - query, null); + queryBuilder.toString(), null); if (LOG.isTraceEnabled()) { LOG.trace("redirectURI=" + uri); @@ -581,7 +607,7 @@ private Response put( switch(op.getValue()) { case CREATE: { - final URI uri = redirectURI(namenode, ugi, delegation, username, + final URI uri = redirectURI(null, namenode, ugi, delegation, username, doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf), exclDatanodes.getValue(), permission, unmaskedPermission, overwrite, bufferSize, replication, blockSize, createParent, @@ -830,7 +856,7 @@ private Response post( case APPEND: { final NameNode namenode = (NameNode)context.getAttribute("name.node"); - final URI uri = redirectURI(namenode, ugi, delegation, username, + final URI uri = redirectURI(null, namenode, ugi, delegation, username, doAsUser, fullpath, op.getValue(), -1L, -1L, excludeDatanodes.getValue(), bufferSize); if(!noredirectParam.getValue()) { @@ -967,6 +993,13 @@ public Response run() throws IOException, URISyntaxException { }); } + private static String encodeFeInfo(FileEncryptionInfo feInfo) { + Encoder encoder = Base64.getEncoder(); + String encodedValue = encoder + .encodeToString(PBHelperClient.convert(feInfo).toByteArray()); + return encodedValue; + } + private Response get( final UserGroupInformation ugi, final DelegationParam delegation, @@ -995,15 +1028,17 @@ private Response get( case OPEN: { final NameNode namenode = (NameNode)context.getAttribute("name.node"); - final URI uri = redirectURI(namenode, ugi, delegation, username, + ResponseBuilder rb = Response.noContent(); + final URI uri = redirectURI(rb, namenode, ugi, delegation, username, doAsUser, fullpath, op.getValue(), offset.getValue(), -1L, excludeDatanodes.getValue(), offset, length, bufferSize); if(!noredirectParam.getValue()) { - return Response.temporaryRedirect(uri) - .type(MediaType.APPLICATION_OCTET_STREAM).build(); + return rb.status(Status.TEMPORARY_REDIRECT).location(uri) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); } else { final String js = JsonUtil.toJsonString("Location", uri); - return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + return rb.status(Status.OK).entity(js).type(MediaType.APPLICATION_JSON) + .build(); } } case GET_BLOCK_LOCATIONS: @@ -1039,8 +1074,8 @@ private Response get( case GETFILECHECKSUM: { final NameNode namenode = 
(NameNode)context.getAttribute("name.node"); - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, -1L, null); + final URI uri = redirectURI(null, namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, -1L, null); if(!noredirectParam.getValue()) { return Response.temporaryRedirect(uri) .type(MediaType.APPLICATION_OCTET_STREAM).build(); @@ -1140,9 +1175,12 @@ private Response get( case GETSERVERDEFAULTS: { // Since none of the server defaults values are hot reloaded, we can // cache the output of serverDefaults. + String serverDefaultsResponse = + (String) context.getAttribute("serverDefaults"); if (serverDefaultsResponse == null) { FsServerDefaults serverDefaults = cp.getServerDefaults(); serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults); + context.setAttribute("serverDefaults", serverDefaultsResponse); } return Response.ok(serverDefaultsResponse) .type(MediaType.APPLICATION_JSON).build(); @@ -1152,15 +1190,6 @@ private Response get( } } - /* - * This is used only and only for testing. - * Please don't use it otherwise. - */ - @VisibleForTesting - public static void resetServerDefaultsResponse() { - serverDefaultsResponse = null; - } - private static String getTrashRoot(String fullPath, Configuration conf) throws IOException { FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index ea867a815e6cb..cf13057b5006c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -20,10 +20,14 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.io.RandomAccessFile; import java.io.StringReader; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.net.URI; +import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -41,12 +45,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.CryptoInputStream; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSTestWrapper; import org.apache.hadoop.fs.FileContext; @@ -80,6 +86,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; @@ -1985,4 +1992,185 @@ public void addMockKmsToken() throws Exception { Assert.assertEquals(tokens[1], testToken); Assert.assertEquals(1, creds.numberOfTokens()); } + + /** + * Creates a file with stable {@link DistributedFileSystem}. 
+ * Tests the following 2 scenarios. + * 1. The decrypted data using {@link WebHdfsFileSystem} should be same as + * input data. + * 2. Gets the underlying raw encrypted stream and verifies that the + * encrypted data is different than input data. + * @throws Exception + */ + @Test + public void testWebhdfsRead() throws Exception { + Path zonePath = new Path("/TestEncryptionZone"); + fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); + dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH); + final Path encryptedFilePath = + new Path("/TestEncryptionZone/encryptedFile.txt"); + final Path rawPath = + new Path("/.reserved/raw/TestEncryptionZone/encryptedFile.txt"); + final String content = "hello world"; + + // Create a file using DistributedFileSystem. + DFSTestUtil.writeFile(fs, encryptedFilePath, content); + final FileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + // Verify whether decrypted input stream data is same as content. + InputStream decryptedIputStream = webhdfs.open(encryptedFilePath); + verifyStreamsSame(content, decryptedIputStream); + + // Get the underlying stream from CryptoInputStream which should be + // raw encrypted bytes. + InputStream cryptoStream = + webhdfs.open(encryptedFilePath).getWrappedStream(); + Assert.assertTrue("cryptoStream should be an instance of " + + "CryptoInputStream", (cryptoStream instanceof CryptoInputStream)); + InputStream encryptedStream = + ((CryptoInputStream)cryptoStream).getWrappedStream(); + // Verify that the data read from the raw input stream is different + // from the original content. Also check it is identical to the raw + // encrypted data from dfs. + verifyRaw(content, encryptedStream, fs.open(rawPath)); + } + + private void verifyStreamsSame(String content, InputStream is) + throws IOException { + byte[] streamBytes; + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + IOUtils.copyBytes(is, os, 1024, true); + streamBytes = os.toByteArray(); + } + Assert.assertArrayEquals(content.getBytes(), streamBytes); + } + + private void verifyRaw(String content, InputStream is, InputStream rawIs) + throws IOException { + byte[] streamBytes, rawBytes; + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + IOUtils.copyBytes(is, os, 1024, true); + streamBytes = os.toByteArray(); + } + Assert.assertFalse(Arrays.equals(content.getBytes(), streamBytes)); + + // webhdfs raw bytes should match the raw bytes from dfs. + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + IOUtils.copyBytes(rawIs, os, 1024, true); + rawBytes = os.toByteArray(); + } + Assert.assertArrayEquals(rawBytes, streamBytes); + } + + /* Tests that if client is old and namenode is new then the + * data will be decrypted by datanode. + * @throws Exception + */ + @Test + public void testWebhdfsReadOldBehavior() throws Exception { + Path zonePath = new Path("/TestEncryptionZone"); + fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); + dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH); + final Path encryptedFilePath = new Path("/TestEncryptionZone/foo"); + final String content = "hello world"; + // Create a file using DistributedFileSystem. 
+ DFSTestUtil.writeFile(fs, encryptedFilePath, content); + + InetSocketAddress addr = cluster.getNameNode().getHttpAddress(); + URL url = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString() + + "?op=OPEN"); + // Return a connection with client not supporting EZ. + HttpURLConnection namenodeConnection = returnConnection(url, "GET", false); + String location = namenodeConnection.getHeaderField("Location"); + URL datanodeURL = new URL(location); + String path = datanodeURL.getPath(); + Assert.assertEquals( + WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path); + HttpURLConnection datanodeConnection = returnConnection(datanodeURL, + "GET", false); + InputStream in = datanodeConnection.getInputStream(); + // Comparing with the original contents + // and making sure they are decrypted. + verifyStreamsSame(content, in); + } + + /* Tests namenode returns path starting with /.reserved/raw if client + * supports EZ and not if otherwise + * @throws Exception + */ + @Test + public void testWebhfsEZRedirectLocation() + throws Exception { + Path zonePath = new Path("/TestEncryptionZone"); + fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); + dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH); + final Path encryptedFilePath = + new Path("/TestEncryptionZone/foo"); + final String content = "hello world"; + // Create a file using DistributedFileSystem. + DFSTestUtil.writeFile(fs, encryptedFilePath, content); + + InetSocketAddress addr = cluster.getNameNode().getHttpAddress(); + URL url = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString() + + "?op=OPEN"); + // Return a connection with client not supporting EZ. + HttpURLConnection namenodeConnection = + returnConnection(url, "GET", false); + Assert.assertNotNull(namenodeConnection.getHeaderField("Location")); + URL datanodeUrl = new URL(namenodeConnection.getHeaderField("Location")); + Assert.assertNotNull(datanodeUrl); + String path = datanodeUrl.getPath(); + Assert.assertEquals( + WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path); + + url = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString() + + "?op=OPEN"); + // Return a connection with client supporting EZ. + namenodeConnection = returnConnection(url, "GET", true); + Assert.assertNotNull(namenodeConnection.getHeaderField("Location")); + datanodeUrl = new URL(namenodeConnection.getHeaderField("Location")); + Assert.assertNotNull(datanodeUrl); + path = datanodeUrl.getPath(); + Assert.assertEquals(WebHdfsFileSystem.PATH_PREFIX + + "/.reserved/raw" + encryptedFilePath.toString(), path); + } + + private static HttpURLConnection returnConnection(URL url, + String httpRequestType, boolean supportEZ) throws Exception { + HttpURLConnection conn = null; + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod(httpRequestType); + conn.setDoOutput(true); + conn.setInstanceFollowRedirects(false); + if (supportEZ) { + conn.setRequestProperty(WebHdfsFileSystem.EZ_HEADER, "true"); + } + return conn; + } + + /* + * Test seek behavior of the webhdfs input stream which reads data from + * encryption zone. 
+ */ + @Test + public void testPread() throws Exception { + Path zonePath = new Path("/TestEncryptionZone"); + fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); + dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH); + final Path encryptedFilePath = + new Path("/TestEncryptionZone/foo"); + // Create a file using DistributedFileSystem. + WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + DFSTestUtil.createFile(webfs, encryptedFilePath, 1024, (short)1, 0xFEED); + byte[] data = DFSTestUtil.readFileAsBytes(fs, encryptedFilePath); + FSDataInputStream in = webfs.open(encryptedFilePath); + for (int i = 0; i < 1024; i++) { + in.seek(i); + Assert.assertEquals((data[i] & 0XFF), in.read()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java index 604bf791d5922..759719d6c33df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -101,7 +102,7 @@ public void testDataLocality() throws Exception { //The chosen datanode must be the same as the client address final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null, - LOCALHOST); + LOCALHOST, null); Assert.assertEquals(ipAddr, chosen.getIpAddr()); } } @@ -125,23 +126,26 @@ public void testDataLocality() throws Exception { //the chosen datanode must be the same as the replica location. 
{ //test GETFILECHECKSUM + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null, - LOCALHOST); + LOCALHOST, status); Assert.assertEquals(expected, chosen); } { //test OPEN + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null, - LOCALHOST); + LOCALHOST, status); Assert.assertEquals(expected, chosen); } { //test APPEND + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null, - LOCALHOST); + LOCALHOST, status); Assert.assertEquals(expected, chosen); } } finally { @@ -195,9 +199,10 @@ public void testExcludeDataNodes() throws Exception { for (int i = 0; i < 2; i++) { sb.append(locations[i].getXferAddr()); { // test GETFILECHECKSUM + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, - sb.toString(), LOCALHOST); + sb.toString(), LOCALHOST, status); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -205,9 +210,10 @@ public void testExcludeDataNodes() throws Exception { } { // test OPEN + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(), - LOCALHOST); + LOCALHOST, status); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -215,9 +221,10 @@ public void testExcludeDataNodes() throws Exception { } { // test APPEND + final HdfsFileStatus status = dfs.getClient().getFileInfo(f); final DatanodeInfo chosen = NamenodeWebHdfsMethods .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L, - blocksize, sb.toString(), LOCALHOST); + blocksize, sb.toString(), LOCALHOST, status); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -238,6 +245,6 @@ public void testChooseDatanodeBeforeNamesystemInit() throws Exception { exception.expect(IOException.class); exception.expectMessage("Namesystem has not been intialized yet."); NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0, - DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST); + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST, null); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java index 500ec0acf4659..9a8c9fcf1ca65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java @@ -1435,7 +1435,6 @@ public void testFsserverDefaultsBackwardsCompatible() throws Exception { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem( conf, WebHdfsConstants.WEBHDFS_SCHEME); - NamenodeWebHdfsMethods.resetServerDefaultsResponse(); FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(cluster.getNameNode()); 
Mockito.when(fsnSpy.getServerDefaults()). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java index 1862f76a0c704..153bd474c0037 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java @@ -385,7 +385,7 @@ public WebHdfsFileSystem run() throws IOException { InputStream is = fs.open(p); is.read(); is.close(); - verify(fs, times(2)).getDelegationToken(); // first bad, then good + verify(fs, times(3)).getDelegationToken(); // first bad, then good verify(fs, times(1)).replaceExpiredDelegationToken(); verify(fs, times(1)).getDelegationToken(null); verify(fs, times(1)).setDelegationToken(any()); @@ -402,7 +402,7 @@ public WebHdfsFileSystem run() throws IOException { is = fs.open(p); is.read(); is.close(); - verify(fs, times(2)).getDelegationToken(); // first bad, then good + verify(fs, times(3)).getDelegationToken(); // first bad, then good verify(fs, times(1)).replaceExpiredDelegationToken(); verify(fs, times(1)).getDelegationToken(null); verify(fs, times(1)).setDelegationToken(any());
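
A minimal usage sketch of the new read path (illustrative only, not part of the patch): with this change, WebHdfsFileSystem#open wraps the HTTP stream in a CryptoInputStream whenever the namenode returns FileEncryptionInfo for the file, so a client that can reach the KMS reads plaintext transparently. The namenode address, KMS URI, and file path below are hypothetical placeholders.

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class WebHdfsEzReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The client must be able to reach the KMS to decrypt the EDEK;
    // otherwise open() on an encrypted file fails with an IOException.
    conf.set("hadoop.security.key.provider.path",
        "kms://http@kms.example.com:9600/kms");                    // hypothetical

    FileSystem webhdfs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:9870"), conf);  // hypothetical

    // open() now returns an FSDataInputStream backed by a CryptoInputStream
    // for files inside an encryption zone.
    Path encrypted = new Path("/TestEncryptionZone/encryptedFile.txt");
    try (FSDataInputStream in = webhdfs.open(encrypted)) {
      InputStream wrapped = in.getWrappedStream();
      System.out.println("decrypting client-side: "
          + (wrapped instanceof CryptoInputStream));
      // Bytes copied here are already decrypted on the client.
      IOUtils.copyBytes(in, System.out, 1024, false);
    }
  }
}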