From 3f340441574f7ec54190d98bfee636333e86abf2 Mon Sep 17 00:00:00 2001 From: Angus Helm Date: Wed, 24 May 2017 13:27:57 -0500 Subject: [PATCH 1/3] STORM-2517 add interface for Writer --- .../storm/hdfs/bolt/AbstractHdfsBolt.java | 56 ++++++++++--------- .../storm/hdfs/common/AbstractHDFSWriter.java | 5 +- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 43c01d2bbfb..601a55f492b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -17,12 +17,6 @@ */ package org.apache.storm.hdfs.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,23 +24,21 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import org.apache.storm.hdfs.common.AbstractHDFSWriter; import org.apache.storm.hdfs.common.NullPartitioner; import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.apache.storm.hdfs.security.HdfsSecurityUtil; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; public abstract class AbstractHdfsBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class); @@ -57,7 +49,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; private static final Integer DEFAULT_MAX_OPEN_FILES = 50; - protected Map writers; + protected Map writers; protected Map rotationCounterMap = new HashMap<>(); protected List rotationActions = new ArrayList<>(); protected OutputCollector collector; @@ -78,7 +70,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { protected transient Configuration hdfsConfig; - protected void rotateOutputFile(AbstractHDFSWriter writer) throws IOException { + protected void rotateOutputFile(Writer writer) throws IOException { LOG.info("Rotating output file..."); long start = System.currentTimeMillis(); synchronized (this.writeLock) { @@ -136,7 +128,7 @@ public final void execute(Tuple tuple) { synchronized (this.writeLock) { boolean forceSync = false; - AbstractHDFSWriter writer = null; + Writer writer = null; String writerKey = null; if (TupleUtils.isTick(tuple)) { @@ -202,8 +194,8 @@ public final void execute(Tuple tuple) { } } - private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException { - AbstractHDFSWriter writer; + private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws IOException { + Writer writer; writer = writers.get(writerKey); if (writer == null) { @@ -229,7 +221,7 @@ private String getHashKeyForTuple(Tuple tuple) { return boltKey + "****" + partitionDir; } - void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter writer) { + void doRotationAndRemoveWriter(String writerKey, Writer writer) { try { rotateOutputFile(writer); } catch (IOException e) { @@ -253,7 +245,7 @@ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } private void syncAllWriters() throws IOException { - for (AbstractHDFSWriter writer : writers.values()) { + for (Writer writer : writers.values()) { writer.sync(); } } @@ -264,7 +256,7 @@ private void startTimedRotationPolicy() { TimerTask task = new TimerTask() { @Override public void run() { - for (final AbstractHDFSWriter writer : writers.values()) { + for (final Writer writer : writers.values()) { try { rotateOutputFile(writer); } catch (IOException e) { @@ -297,9 +289,9 @@ protected Path getBasePathForNextFile(Tuple tuple) { abstract protected String getWriterKey(Tuple tuple); - abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException; + abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException; - static class WritersMap extends LinkedHashMap { + static class WritersMap extends LinkedHashMap { final long maxWriters; public WritersMap(long maxWriters) { @@ -308,8 +300,20 @@ public WritersMap(long maxWriters) { } @Override - protected boolean removeEldestEntry(Map.Entry eldest) { + protected boolean removeEldestEntry(Map.Entry eldest) { return this.size() > this.maxWriters; } } + + public interface Writer { + long write(Tuple tuple) throws IOException; + + void sync() throws IOException; + + void close() throws IOException; + + boolean needsRotation(); + + Path getFilePath(); + } } \ No newline at end of file diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java index 4b36377a16b..8a5d326a5d4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -18,12 +18,13 @@ package org.apache.storm.hdfs.common; import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.bolt.AbstractHdfsBolt; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; import java.io.IOException; -abstract public class AbstractHDFSWriter { +abstract public class AbstractHDFSWriter implements AbstractHdfsBolt.Writer { long lastUsedTime; long offset; boolean needsRotation; @@ -36,7 +37,7 @@ abstract public class AbstractHDFSWriter { this.filePath = path; } - final public long write(Tuple tuple) throws IOException{ + final public long write(Tuple tuple) throws IOException { doWrite(tuple); this.needsRotation = rotationPolicy.mark(tuple, offset); From f379579243be1e4d3935421b9b7bb8a1835e9f58 Mon Sep 17 00:00:00 2001 From: Angus Helm Date: Wed, 24 May 2017 13:29:58 -0500 Subject: [PATCH 2/3] STORM-2517 make AbstractHDFSWriter properties protected --- .../apache/storm/hdfs/common/AbstractHDFSWriter.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java index 8a5d326a5d4..6a23f015111 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -25,13 +25,12 @@ import java.io.IOException; abstract public class AbstractHDFSWriter implements AbstractHdfsBolt.Writer { - long lastUsedTime; - long offset; - boolean needsRotation; - Path filePath; - FileRotationPolicy rotationPolicy; + protected long offset; + protected boolean needsRotation; + final protected Path filePath; + final protected FileRotationPolicy rotationPolicy; - AbstractHDFSWriter(FileRotationPolicy policy, Path path) { + public AbstractHDFSWriter(FileRotationPolicy policy, Path path) { //This must be defensively copied, because a bolt probably has only one rotation policy object this.rotationPolicy = policy.copy(); this.filePath = path; From a6275e13245b0a47c11ef1b0a7f83b3fe6392874 Mon Sep 17 00:00:00 2001 From: Angus Helm Date: Wed, 24 May 2017 13:37:09 -0500 Subject: [PATCH 3/3] STORM-2517 revert import changes --- .../storm/hdfs/bolt/AbstractHdfsBolt.java | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 601a55f492b..82e5d220f27 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -17,6 +17,12 @@ */ package org.apache.storm.hdfs.bolt; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,17 +34,18 @@ import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.apache.storm.hdfs.security.HdfsSecurityUtil; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; public abstract class AbstractHdfsBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class); @@ -291,6 +298,18 @@ protected Path getBasePathForNextFile(Tuple tuple) { abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException; + public interface Writer { + long write(Tuple tuple) throws IOException; + + void sync() throws IOException; + + void close() throws IOException; + + boolean needsRotation(); + + Path getFilePath(); + } + static class WritersMap extends LinkedHashMap { final long maxWriters; @@ -304,16 +323,4 @@ protected boolean removeEldestEntry(Map.Entry eldest) { return this.size() > this.maxWriters; } } - - public interface Writer { - long write(Tuple tuple) throws IOException; - - void sync() throws IOException; - - void close() throws IOException; - - boolean needsRotation(); - - Path getFilePath(); - } } \ No newline at end of file