From 87ad03585276f4c02b54da70a0fe62ebcdcabe8c Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 28 Sep 2017 14:42:36 -0500
Subject: [PATCH] STORM-2764: Only login the user if not already done

---
 .../storm/hdfs/blobstore/HdfsBlobStore.java   | 105 ++++++++++--------
 1 file changed, 58 insertions(+), 47 deletions(-)

diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index a0aa8dc6cd3..7130153d6be 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -15,8 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.blobstore;

+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import javax.security.auth.Subject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStore;
@@ -24,28 +38,15 @@
 import org.apache.storm.blobstore.BlobStoreFile;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.security.auth.Subject;
-import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Iterator;
-import java.util.Map;
-
 import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
@@ -73,9 +74,11 @@ public class HdfsBlobStore extends BlobStore {
     public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
     private static final String DATA_PREFIX = "data_";
     private static final String META_PREFIX = "meta_";
-    private BlobStoreAclHandler _aclHandler;
-    private HdfsBlobStoreImpl _hbs;
-    private Subject _localSubject;
+    private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
+
+    private BlobStoreAclHandler aclHandler;
+    private HdfsBlobStoreImpl hbs;
+    private Subject localSubject;
     private Map<String, Object> conf;

     /**
@@ -106,7 +109,7 @@ public Subject run() {
      */
     private Subject checkAndGetSubject(Subject who) {
         if (who == null) {
-            return _localSubject;
+            return localSubject;
         }
         return who;
     }
@@ -137,7 +140,15 @@ protected void prepareInternal(Map<String, Object> conf, String overrideBase, Co
             String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);

             if (principal != null && keyTab != null) {
-                UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+                String combinedKey = principal + " from " + keyTab;
+                synchronized (alreadyLoggedInUsers) {
+                    localSubject = alreadyLoggedInUsers.get(combinedKey);
+                    if (localSubject == null) {
+                        UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+                        localSubject = getHadoopUser();
+                        alreadyLoggedInUsers.put(combinedKey, localSubject);
+                    }
+                }
             } else {
                 if (principal == null && keyTab != null) {
                     throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
@@ -147,22 +158,22 @@ protected void prepareInternal(Map<String, Object> conf, String overrideBase, Co
                         throw new RuntimeException("You must specify HDFS keytab go with the principal!");
                     }
                 }
+                localSubject = getHadoopUser();
             }
         } catch (IOException e) {
             throw new RuntimeException("Error logging in from keytab!", e);
         }
+        aclHandler = new BlobStoreAclHandler(conf);
         Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
         try {
             if (hadoopConf != null) {
-                _hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
+                hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
             } else {
-                _hbs = new HdfsBlobStoreImpl(baseDir, conf);
+                hbs = new HdfsBlobStoreImpl(baseDir, conf);
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        _localSubject = getHadoopUser();
-        _aclHandler = new BlobStoreAclHandler(conf);
     }

 @@ -173,21 +184,21 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject
         }
         who = checkAndGetSubject(who);
         validateKey(key);
-        _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+        aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
         BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
-        if (_hbs.exists(DATA_PREFIX+key)) {
+        aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+        if (hbs.exists(DATA_PREFIX + key)) {
             throw new KeyAlreadyExistsException(key);
         }
         BlobStoreFileOutputStream mOut = null;
         try {
-            BlobStoreFile metaFile = _hbs.write(META_PREFIX + key, true);
+            BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
             metaFile.setMetadata(meta);
             mOut = new BlobStoreFileOutputStream(metaFile);
             mOut.write(Utils.thriftSerialize(meta));
             mOut.close();
             mOut = null;
-            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, true);
+            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
             dataFile.setMetadata(meta);
             return new BlobStoreFileOutputStream(dataFile);
         } catch (IOException e) {
@@ -209,9 +220,9 @@ public AtomicOutputStream updateBlob(String key, Subject who)
         who = checkAndGetSubject(who);
         SettableBlobMeta meta = getStoredBlobMeta(key);
         validateKey(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
         try {
-            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, false);
+            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
             dataFile.setMetadata(meta);
             return new BlobStoreFileOutputStream(dataFile);
         } catch (IOException e) {
@@ -222,7 +233,7 @@ public AtomicOutputStream updateBlob(String key, Subject who)
     private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
         InputStream in = null;
         try {
-            BlobStoreFile pf = _hbs.read(META_PREFIX + key);
+            BlobStoreFile pf = hbs.read(META_PREFIX + key);
             try {
                 in = pf.getInputStream();
             } catch (FileNotFoundException fnf) {
@@ -256,11 +267,11 @@ public ReadableBlobMeta getBlobMeta(String key, Subject who)
         who = checkAndGetSubject(who);
         validateKey(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+        aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
         ReadableBlobMeta rbm = new ReadableBlobMeta();
         rbm.set_settable(meta);
         try {
-            BlobStoreFile pf = _hbs.read(DATA_PREFIX + key);
+            BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
             rbm.set_version(pf.getModTime());
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -276,10 +287,10 @@ public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
         }
         who = checkAndGetSubject(who);
         validateKey(key);
-        _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
+        aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
         BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
         SettableBlobMeta orig = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
+        aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
         BlobStoreFileOutputStream mOut = null;
         writeMetadata(key, meta);
     }
@@ -290,10 +301,10 @@ public void deleteBlob(String key, Subject who)
         who = checkAndGetSubject(who);
         validateKey(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
         try {
-            _hbs.deleteKey(DATA_PREFIX + key);
-            _hbs.deleteKey(META_PREFIX + key);
+            hbs.deleteKey(DATA_PREFIX + key);
+            hbs.deleteKey(META_PREFIX + key);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -305,9 +316,9 @@ public InputStreamWithMeta getBlob(String key, Subject who)
         who = checkAndGetSubject(who);
         validateKey(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+        aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
         try {
-            return new BlobStoreFileInputStream(_hbs.read(DATA_PREFIX + key));
+            return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -316,7 +327,7 @@ public InputStreamWithMeta getBlob(String key, Subject who)
     @Override
     public Iterator<String> listKeys() {
         try {
-            return new KeyTranslationIterator(_hbs.listKeys(), DATA_PREFIX);
+            return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -332,9 +343,9 @@ public int getBlobReplication(String key, Subject who) throws AuthorizationExcep
         who = checkAndGetSubject(who);
         validateKey(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+        aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
         try {
-            return _hbs.getBlobReplication(DATA_PREFIX + key);
+            return hbs.getBlobReplication(DATA_PREFIX + key);
         } catch (IOException exp) {
             throw new RuntimeException(exp);
         }
@@ -346,10 +357,10 @@ public int updateBlobReplication(String key, int replication, Subject who) throw
         validateKey(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
         meta.set_replication_factor(replication);
-        _aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
+        aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
         try {
             writeMetadata(key, meta);
-            return _hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+            return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
         } catch (IOException exp) {
             throw new RuntimeException(exp);
         }
@@ -359,7 +370,7 @@ public void writeMetadata(String key, SettableBlobMeta meta)
             throws AuthorizationException, KeyNotFoundException {
         BlobStoreFileOutputStream mOut = null;
         try {
-            BlobStoreFile hdfsFile = _hbs.write(META_PREFIX + key, false);
+            BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
             hdfsFile.setMetadata(meta);
             mOut = new BlobStoreFileOutputStream(hdfsFile);
             mOut.write(Utils.thriftSerialize(meta));
@@ -379,6 +390,6 @@ public void writeMetadata(String key, SettableBlobMeta meta)
     }

     public void fullCleanup(long age) throws IOException {
-        _hbs.fullCleanup(age);
+        hbs.fullCleanup(age);
     }
 }
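
Note (not part of the patch): the core of the change is to cache Kerberos logins per principal/keytab pair, so that constructing several HdfsBlobStore instances in one JVM does not repeat UserGroupInformation.loginUserFromKeytab for a user who is already logged in. Below is a minimal standalone sketch of that pattern, assuming only the stock Hadoop UserGroupInformation API; the class and method names are illustrative, and the real patch caches a javax.security.auth.Subject obtained through its getHadoopUser() helper rather than a UserGroupInformation handle.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;

public final class CachedKeytabLogin {
    // One cached login per "principal from keytab" key, shared across the JVM.
    private static final Map<String, UserGroupInformation> ALREADY_LOGGED_IN = new HashMap<>();

    private CachedKeytabLogin() {
    }

    public static UserGroupInformation loginIfNeeded(String principal, String keyTab) throws IOException {
        String combinedKey = principal + " from " + keyTab;
        synchronized (ALREADY_LOGGED_IN) {
            UserGroupInformation ugi = ALREADY_LOGGED_IN.get(combinedKey);
            if (ugi == null) {
                // Only the first caller for this principal/keytab pair performs the actual keytab login.
                UserGroupInformation.loginUserFromKeytab(principal, keyTab);
                ugi = UserGroupInformation.getLoginUser();
                ALREADY_LOGGED_IN.put(combinedKey, ugi);
            }
            return ugi;
        }
    }
}

Synchronizing on the map keeps the check-then-login sequence atomic, so two threads preparing blob stores concurrently cannot both hit the KDC for the same principal/keytab pair, which mirrors the synchronized block the patch adds around alreadyLoggedInUsers.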