From 96903303ccd9087fad27cf4bc4081be1332344f6 Mon Sep 17 00:00:00 2001
From: Priyank
Date: Thu, 20 Aug 2015 00:21:16 -0700
Subject: [PATCH 1/3] STORM-997: Add proxy user functionality for storm hdfs
 connector.

---
 external/storm-hdfs/README.md                  | 16 ++++++
 .../storm/hdfs/bolt/AbstractHdfsBolt.java      | 36 +++++++++++--
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java   | 53 ++++++++++++-------
 3 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index e1f2b2861a7..21f6562f30c 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -236,6 +236,22 @@ If you are using Trident and sequence files you can do something like this:
 ```
 
+### Proxy User for HDFS interaction
+The HDFS bolt implementation now allows you to interact with HDFS as a user
+other than the one running the worker process, even on a non-secured cluster.
+The bolt looks up the key `hdfs.proxyuser` in the map registered under the
+bolt's config key (the key set via the withConfigKey method); that map is
+passed to the topology in the configuration object when the topology is
+created. For HDFS to accept such a proxy user, the HDFS configuration file
+core-site.xml must be modified as well. Details on how to modify the file,
+and the limitations of this feature, can be found at
+
+http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/Superusers.html
+
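+A minimal sketch of the topology-side wiring (assuming the bolt was built with
+`withConfigKey("hdfs.config")`; the key name and the user "alice" are
+illustrative):
+
+```java
+Map<String, Object> map = new HashMap<String, Object>();
+map.put("hdfs.proxyuser", "alice"); // HDFS operations run as this user
+
+Config conf = new Config();
+conf.put("hdfs.config", map); // same key that was passed to withConfigKey()
+
+StormSubmitter.submitTopology("hdfs-topology", conf, builder.createTopology());
+```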
+
+
 ## Support for HDFS Sequence Files
 The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write
 storm data to HDFS sequence files:
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f260598f08d..9ba099ad6c8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
@@ -34,6 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Timer;
@@ -54,6 +56,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     protected String configKey;
     protected transient Object writeLock;
     protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+    protected transient UserGroupInformation userGroupInformation;
 
     protected transient Configuration hdfsConfig;
 
@@ -81,7 +84,8 @@ protected void rotateOutputFile() throws IOException {
      * @param topologyContext
      * @param collector
      */
-    public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
+    public final void prepare(final Map conf, final TopologyContext topologyContext,
+                              final OutputCollector collector) {
         this.writeLock = new Object();
         if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
         if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
@@ -99,11 +103,23 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
             }
         }
 
         try{
             HdfsSecurityUtil.login(conf, hdfsConfig);
-            doPrepare(conf, topologyContext, collector);
-            this.currentFile = createOutputFile();
+            String ugiUser = this.hdfsConfig.get("hdfs.proxyuser");
+            if (ugiUser == null) {
+                this.userGroupInformation = UserGroupInformation.getLoginUser();
+            } else {
+                this.userGroupInformation = UserGroupInformation.createProxyUser(
+                        ugiUser, UserGroupInformation.getLoginUser());
+            }
+            this.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    doPrepare(conf, topologyContext, collector);
+                    currentFile = createOutputFile();
+                    return null;
+                }
+            });
         } catch (Exception e){
             throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
@@ -116,9 +132,19 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
                 @Override
                 public void run() {
                     try {
-                        rotateOutputFile();
+                        userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                            public Void run() throws Exception {
+                                rotateOutputFile();
+                                return null;
+                            }
+                        });
                     } catch(IOException e){
                         LOG.warn("IOException during scheduled file rotation.", e);
+                    } catch (InterruptedException ie) {
+                        LOG.warn("InterruptedException during scheduled file rotation.", ie);
                     }
                 }
             };
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index dcb09e7d666..d7e6c098bab 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -35,6 +35,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -87,33 +88,45 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    public void execute(final Tuple tuple) {
         try {
-            byte[] bytes = this.format.format(tuple);
-            synchronized (this.writeLock) {
-                out.write(bytes);
-                this.offset += bytes.length;
-
-                if (this.syncPolicy.mark(tuple, this.offset)) {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
+            this.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    byte[] bytes = format.format(tuple);
+                    synchronized (writeLock) {
+                        out.write(bytes);
+                        offset += bytes.length;
+
+                        if (syncPolicy.mark(tuple, offset)) {
+                            if (out instanceof HdfsDataOutputStream) {
+                                ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+                            } else {
+                                out.hsync();
+                            }
+                            syncPolicy.reset();
+                        }
                     }
-                    this.syncPolicy.reset();
+
+                    collector.ack(tuple);
+
+                    if (rotationPolicy.mark(tuple, offset)) {
+                        rotateOutputFile(); // synchronized
+                        offset = 0;
+                        rotationPolicy.reset();
+                    }
+                    return null;
                 }
-            }
-
-            this.collector.ack(tuple);
-
-            if(this.rotationPolicy.mark(tuple, this.offset)){
-                rotateOutputFile(); // synchronized
-                this.offset = 0;
-                this.rotationPolicy.reset();
-            }
+            });
         } catch (IOException e) {
             this.collector.reportError(e);
             this.collector.fail(tuple);
+        } catch (InterruptedException ie) {
+            this.collector.reportError(ie);
+            this.collector.fail(tuple);
         }
     }
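The pattern the series applies throughout is the standard Hadoop proxy-user
idiom: resolve a UserGroupInformation once (the login user, or a proxy UGI
wrapping it) and run every HDFS operation inside doAs(). A minimal standalone
sketch of that idiom, assuming only the Hadoop UserGroupInformation API (the
helper name `resolveUgi`, the user "alice", and the path are illustrative and
not part of the patch):

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUgiSketch {

    // Mirrors AbstractHdfsBolt.prepare(): use the login user unless a proxy
    // user was configured, in which case wrap the login user in a proxy UGI.
    static UserGroupInformation resolveUgi(String proxyUser) throws IOException {
        UserGroupInformation login = UserGroupInformation.getLoginUser();
        return proxyUser == null
                ? login
                : UserGroupInformation.createProxyUser(proxyUser, login);
    }

    public static void main(String[] args) throws Exception {
        final UserGroupInformation ugi = resolveUgi("alice"); // illustrative user

        // Every filesystem call runs inside doAs(), so HDFS sees the proxy user.
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                fs.mkdirs(new Path("/tmp/proxy-user-demo")); // illustrative path
                return null;
            }
        });
    }
}
```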
From 2ee03d7fca8d4e1f3b41d8b137bac32abf1a94a4 Mon Sep 17 00:00:00 2001
From: Priyank
Date: Tue, 8 Sep 2015 15:55:57 -0700
Subject: [PATCH 2/3] STORM-997: Proxy UGI for sequence files.

---
 .../storm/hdfs/bolt/SequenceFileBolt.java | 46 +++++++++++--------
 1 file changed, 28 insertions(+), 18 deletions(-)

diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index baf4df030ae..71adeac41c0 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -34,6 +34,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 
 public class SequenceFileBolt extends AbstractHdfsBolt {
@@ -104,29 +105,38 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    public void execute(final Tuple tuple) {
         try {
-            long offset;
-            synchronized (this.writeLock) {
-                this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                offset = this.writer.getLength();
-
-                if (this.syncPolicy.mark(tuple, offset)) {
-                    this.writer.hsync();
-                    this.syncPolicy.reset();
-                }
-            }
-
-            this.collector.ack(tuple);
-            if (this.rotationPolicy.mark(tuple, offset)) {
-                rotateOutputFile(); // synchronized
-                this.rotationPolicy.reset();
-            }
+            this.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    long offset;
+                    synchronized (writeLock) {
+                        writer.append(format.key(tuple), format.value(tuple));
+                        offset = writer.getLength();
+
+                        if (syncPolicy.mark(tuple, offset)) {
+                            writer.hsync();
+                            syncPolicy.reset();
+                        }
+                    }
+
+                    collector.ack(tuple);
+                    if (rotationPolicy.mark(tuple, offset)) {
+                        rotateOutputFile(); // synchronized
+                        rotationPolicy.reset();
+                    }
+                    return null;
+                }
+            });
         } catch (IOException e) {
             this.collector.reportError(e);
             this.collector.fail(tuple);
+        } catch (InterruptedException ie) {
+            this.collector.reportError(ie);
+            this.collector.fail(tuple);
         }
-
     }
 
     Path createOutputFile() throws IOException {
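As the README change notes, the NameNode must also be configured to accept the
worker's login user as a proxy. A minimal core-site.xml sketch, assuming the
worker process runs as user `storm` (illustrative; the wildcard values are
permissive and should be narrowed per the Superusers documentation linked in
the README change above):

```xml
<!-- Allow the superuser "storm" to impersonate other users. -->
<property>
  <name>hadoop.proxyuser.storm.hosts</name>
  <value>*</value> <!-- hosts from which impersonation is allowed -->
</property>
<property>
  <name>hadoop.proxyuser.storm.groups</name>
  <value>*</value> <!-- groups whose members may be impersonated -->
</property>
```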
From 078c23aebd7a9e8b2a679fdfa8e551823c188754 Mon Sep 17 00:00:00 2001
From: Priyank
Date: Thu, 24 Sep 2015 18:54:34 -0700
Subject: [PATCH 3/3] STORM-997: Proxy user feature change for hdfs trident.

---
 .../apache/storm/hdfs/trident/HdfsState.java | 102 ++++++++++++++----
 1 file changed, 80 insertions(+), 22 deletions(-)

diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 444886866b2..a9b0b3185cd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.apache.storm.hdfs.trident.format.FileNameFormat;
@@ -50,6 +51,7 @@
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -68,6 +70,7 @@ public static abstract class Options implements Serializable {
         protected int rotation = 0;
         protected transient Configuration hdfsConfig;
         protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
+        protected transient UserGroupInformation userGroupInformation;
 
         abstract void closeOutputFile() throws IOException;
 
@@ -106,7 +109,7 @@ protected void rotateOutputFile() throws IOException {
         }
 
-        void prepare(Map conf, int partitionIndex, int numPartitions) {
+        void prepare(final Map conf, final int partitionIndex, final int numPartitions) {
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
             } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
@@ -132,8 +135,21 @@ void prepare(Map conf, int partitionIndex, int numPartitions) {
             }
             try {
                 HdfsSecurityUtil.login(conf, hdfsConfig);
-                doPrepare(conf, partitionIndex, numPartitions);
-                this.currentFile = createOutputFile();
+                String ugiUser = this.hdfsConfig.get("hdfs.proxyuser");
+                if (ugiUser == null) {
+                    this.userGroupInformation = UserGroupInformation.getLoginUser();
+                } else {
+                    this.userGroupInformation = UserGroupInformation.createProxyUser(
+                            ugiUser, UserGroupInformation.getLoginUser());
+                }
+                this.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                    public Void run() throws Exception {
+                        doPrepare(conf, partitionIndex, numPartitions);
+                        currentFile = createOutputFile();
+                        return null;
+                    }
+                });
             } catch (Exception e) {
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
@@ -419,7 +435,6 @@ public String toString() {
         }
     }
 
-
     public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
     private Options options;
     private volatile TxnRecord lastSeenTxn;
@@ -429,9 +444,20 @@ public String toString() {
         this.options = options;
     }
 
-    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    void prepare(final Map conf, IMetricsContext metrics, final int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
-        initLastTxn(conf, partitionIndex);
+        try {
+            this.options.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    initLastTxn(conf, partitionIndex);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
+        }
     }
     private TxnRecord readTxnRecord(Path path) throws IOException {
@@ -524,31 +550,53 @@ private void updateIndex(long txId) {
     }
 
     @Override
-    public void beginCommit(Long txId) {
-        if (txId <= lastSeenTxn.txnid) {
-            LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
-            long start = System.currentTimeMillis();
-            options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
-            LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
+    public void beginCommit(final Long txId) {
+        try {
+            this.options.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    if (txId <= lastSeenTxn.txnid) {
+                        LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+                        long start = System.currentTimeMillis();
+                        options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+                        LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
+                    }
+                    updateIndex(txId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Error in beginCommit HdfsState: " + e.getMessage(), e);
         }
-        updateIndex(txId);
     }
 
     @Override
-    public void commit(Long txId) {
+    public void commit(final Long txId) {
         try {
-            options.doCommit(txId);
-        } catch (IOException e) {
-            LOG.warn("Commit failed due to IOException. Failing the batch.", e);
+            this.options.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    options.doCommit(txId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            LOG.warn("Commit failed due to Exception. Failing the batch.", e);
             throw new FailedException(e);
         }
     }
 
-    public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
+    public void updateState(final List<TridentTuple> tuples, TridentCollector tridentCollector) {
         try {
-            this.options.execute(tuples);
-        } catch (IOException e) {
-            LOG.warn("Failing batch due to IOException.", e);
+            this.options.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    options.execute(tuples);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            LOG.warn("Failing batch due to Exception.", e);
             throw new FailedException(e);
         }
     }
@@ -557,6 +605,16 @@ public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
      * for unit tests
      */
     void close() throws IOException {
-        this.options.closeOutputFile();
+        try {
+            this.options.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    options.closeOutputFile();
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Error while closing HdfsState: " + e.getMessage(), e);
+        }
     }
 }