Skip to content

Commit

Permalink
HDFS-12574. Add CryptoInputStream to WebHdfsFileSystem read call. Con…
Browse files Browse the repository at this point in the history
…tributed by Rushabh S Shah
  • Loading branch information
kihwal committed Jan 29, 2018
1 parent 7fd287b commit fde95d4
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 90 deletions.
Expand Up @@ -38,7 +38,6 @@
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -62,8 +61,6 @@
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
Expand Down Expand Up @@ -910,46 +907,19 @@ public BlockLocation[] getBlockLocations(String src, long start,
}
}

/**
* Decrypts a EDEK by consulting the KeyProvider.
*/
private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo) throws IOException {
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
KeyProvider provider = getKeyProvider();
if (provider == null) {
throw new IOException("No KeyProvider is configured, cannot access" +
" an encrypted file");
}
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
feInfo.getEncryptedDataEncryptionKey());
try {
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(provider);
return cryptoProvider.decryptEncryptedKey(ekv);
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
}

/**
* Wraps the stream in a CryptoInputStream if the underlying file is
* encrypted.
*/
public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
throws IOException {
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
if (feInfo != null) {
// File is encrypted, wrap the stream in a crypto stream.
// Currently only one version, so no special logic based on the version #
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
final CryptoInputStream cryptoIn =
new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
feInfo.getIV());
CryptoInputStream cryptoIn;
try (TraceScope ignored = getTracer().newScope("decryptEDEK")) {
cryptoIn = HdfsKMSUtil.createWrappedInputStream(dfsis,
getKeyProvider(), feInfo, getConfiguration());
}
return new HdfsDataInputStream(cryptoIn);
} else {
// No FileEncryptionInfo so no encryption.
Expand Down Expand Up @@ -978,7 +948,11 @@ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
// Currently only one version, so no special logic based on the version #
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
KeyVersion decrypted;
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
decrypted = HdfsKMSUtil.decryptEncryptedDataEncryptionKey(feInfo,
getKeyProvider());
}
final CryptoOutputStream cryptoOut =
new CryptoOutputStream(dfsos, codec,
decrypted.getMaterial(), feInfo.getIV(), startPos);
Expand Down
Expand Up @@ -20,15 +20,21 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.security.GeneralSecurityException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
Expand Down Expand Up @@ -187,4 +193,39 @@ public static Text getKeyProviderMapKey(URI namenodeUri) {
return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
+"://" + namenodeUri.getAuthority());
}

public static CryptoInputStream createWrappedInputStream(InputStream is,
KeyProvider keyProvider, FileEncryptionInfo fileEncryptionInfo,
Configuration conf) throws IOException {
// File is encrypted, wrap the stream in a crypto stream.
// Currently only one version, so no special logic based on the version#
HdfsKMSUtil.getCryptoProtocolVersion(fileEncryptionInfo);
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(
conf, fileEncryptionInfo);
final KeyVersion decrypted =
decryptEncryptedDataEncryptionKey(fileEncryptionInfo, keyProvider);
return new CryptoInputStream(is, codec, decrypted.getMaterial(),
fileEncryptionInfo.getIV());
}

/**
* Decrypts a EDEK by consulting the KeyProvider.
*/
static KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo, KeyProvider keyProvider) throws IOException {
if (keyProvider == null) {
throw new IOException("No KeyProvider is configured, cannot access" +
" an encrypted file");
}
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
feInfo.getEncryptedDataEncryptionKey());
try {
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider.decryptEncryptedKey(ekv);
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
}
Expand Up @@ -37,8 +37,11 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
Expand Down Expand Up @@ -66,6 +69,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
Expand All @@ -92,6 +96,8 @@
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
Expand Down Expand Up @@ -133,6 +139,8 @@ public class WebHdfsFileSystem extends FileSystem
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
+ "/v" + VERSION;
public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";

/**
* Default connection factory may be overridden in tests to use smaller
Expand Down Expand Up @@ -613,12 +621,19 @@ abstract class AbstractRunner<T> {

private boolean checkRetry;
private String redirectHost;
private boolean followRedirect = true;

protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}

protected AbstractRunner(final HttpOpParam.Op op, boolean redirected,
boolean followRedirect) {
this(op, redirected);
this.followRedirect = followRedirect;
}

T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
Expand Down Expand Up @@ -685,9 +700,17 @@ protected HttpURLConnection connect(URL url) throws IOException {
// See http://tinyurl.com/java7-http-keepalive
conn.disconnect();
}
if (!followRedirect) {
return conn;
}
}
try {
return connect(op, url);
final HttpURLConnection conn = connect(op, url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return conn;
} catch (IOException ioe) {
if (redirectHost != null) {
if (excludeDatanodes.getValue() != null) {
Expand All @@ -713,6 +736,7 @@ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
// The value of the header is unimportant. Only its presence matters.
conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
}
conn.setRequestProperty(EZ_HEADER, "true");
switch (op.getType()) {
// if not sending a message body for a POST or PUT operation, need
// to ensure the server/proxy knows this
Expand Down Expand Up @@ -760,10 +784,6 @@ private T runWithRetry() throws IOException {
final URL url = getUrl();
try {
final HttpURLConnection conn = connect(url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return getResponse(conn);
} catch (AccessControlException ace) {
// no retries for auth failures
Expand Down Expand Up @@ -809,7 +829,6 @@ private void shouldRetry(final IOException ioe, final int retry
a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
boolean isFailoverAndRetry =
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;

if (isRetry || isFailoverAndRetry) {
LOG.info("Retrying connect to namenode: {}. Already retried {}"
+ " time(s); retry policy is {}, delay {}ms.",
Expand Down Expand Up @@ -990,16 +1009,16 @@ HttpURLConnection getResponse(final HttpURLConnection conn)
/**
* Used by open() which tracks the resolved url itself
*/
final class URLRunner extends AbstractRunner<HttpURLConnection> {
class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() {
protected URL getUrl() throws IOException {
return url;
}

protected URLRunner(final HttpOpParam.Op op, final URL url,
boolean redirected) {
super(op, redirected);
boolean redirected, boolean followRedirect) {
super(op, redirected, followRedirect);
this.url = url;
}

Expand Down Expand Up @@ -1412,12 +1431,20 @@ public boolean delete(Path f, boolean recursive) throws IOException {
).run();
}

@SuppressWarnings("resource")
@Override
public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN);
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
WebHdfsInputStream webfsInputStream =
new WebHdfsInputStream(f, bufferSize);
if (webfsInputStream.getFileEncryptionInfo() == null) {
return new FSDataInputStream(webfsInputStream);
} else {
return new FSDataInputStream(
webfsInputStream.createWrappedInputStream());
}
}

@Override
Expand Down Expand Up @@ -1462,7 +1489,8 @@ protected HttpURLConnection connect(final long offset,
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved,
true).run();
}
}

Expand Down Expand Up @@ -1928,6 +1956,15 @@ ReadRunner getReadRunner() {
void setReadRunner(ReadRunner rr) {
this.readRunner = rr;
}

FileEncryptionInfo getFileEncryptionInfo() {
return readRunner.getFileEncryptionInfo();
}

InputStream createWrappedInputStream() throws IOException {
return HdfsKMSUtil.createWrappedInputStream(
this, getKeyProvider(), getFileEncryptionInfo(), getConf());
}
}

enum RunnerState {
Expand Down Expand Up @@ -1964,21 +2001,52 @@ protected class ReadRunner extends AbstractFsPathRunner<Integer> {
private byte[] readBuffer;
private int readOffset;
private int readLength;
private RunnerState runnerState = RunnerState.DISCONNECTED;
private RunnerState runnerState = RunnerState.SEEK;
private URL originalUrl = null;
private URL resolvedUrl = null;

private final Path path;
private final int bufferSize;
private long pos = 0;
private long fileLength = 0;
private FileEncryptionInfo feInfo = null;

/* The following methods are WebHdfsInputStream helpers. */

ReadRunner(Path p, int bs) throws IOException {
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
this.path = p;
this.bufferSize = bs;
getRedirectedUrl();
}

private void getRedirectedUrl() throws IOException {
URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false,
false) {
@Override
protected URL getUrl() throws IOException {
return toUrl(op, path, new BufferSizeParam(bufferSize));
}
};
HttpURLConnection conn = urlRunner.run();
String feInfoStr = conn.getHeaderField(FEFINFO_HEADER);
if (feInfoStr != null) {
Decoder decoder = Base64.getDecoder();
byte[] decodedBytes = decoder.decode(
feInfoStr.getBytes(StandardCharsets.UTF_8));
feInfo = PBHelperClient
.convert(FileEncryptionInfoProto.parseFrom(decodedBytes));
}
String location = conn.getHeaderField("Location");
if (location != null) {
// This saves the location for datanode where redirect was issued.
// Need to remove offset because seek can be called after open.
resolvedUrl = removeOffsetParam(new URL(location));
} else {
// This is cached for proxies like httpfsfilesystem.
cachedConnection = conn;
}
originalUrl = super.getUrl();
}

int read(byte[] b, int off, int len) throws IOException {
Expand Down Expand Up @@ -2011,7 +2079,8 @@ int read(byte[] b, int off, int len) throws IOException {
if (runnerState == RunnerState.SEEK) {
try {
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true,
false).run();
} catch (IOException ioe) {
closeInputStream(RunnerState.DISCONNECTED);
}
Expand Down Expand Up @@ -2195,5 +2264,9 @@ void setFileLength(long len) {
long getPos() {
return pos;
}

protected FileEncryptionInfo getFileEncryptionInfo() {
return feInfo;
}
}
}

0 comments on commit fde95d4

Please sign in to comment.