From c83af37fa0258f6b32676c7f5f909143cc5c6caa Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 24 Apr 2017 13:26:26 +0100 Subject: [PATCH 1/2] SPARK-20448 Document how FileInputDStream works with object storage Change-Id: I88c272444ca734dc2cbc2592607c11287b90a383 --- docs/streaming-programming-guide.md | 121 +++++++++++++++++++++++----- 1 file changed, 100 insertions(+), 21 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index abd4ac9653606..e29c2d1aab409 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -615,35 +615,114 @@ which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources. -- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as: +#### File Streams +{:.no_toc} + +For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created +via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`. + +File streams do not require running a receiver, hence does not require allocating cores. + +For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`. +
+
+ +{% highlight scala %} +streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) +{% endhighlight %} +For text files + +{% highlight scala %} +streamingContext.textFileStream(dataDirectory) +{% endhighlight %} +
+ +
+{% highlight java %} +streamingContext.fileStream(dataDirectory); +{% endhighlight %} +For text files + +{% highlight java %} +streamingContext.textFileStream(dataDirectory); +{% endhighlight %} +
-
-
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) -
-
- streamingContext.fileStream(dataDirectory); -
-
- streamingContext.textFileStream(dataDirectory) -
-
+
+`fileStream` is not available in the Python API; only `textFileStream` is available. +{% highlight python %} +streamingContext.textFileStream(dataDirectory) +{% endhighlight %} +
- Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that +
- + The files must have the same data format. - + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into - the data directory. - + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read. +##### How Directories are Monitored +{:.no_toc} - For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. +Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. + + * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`. + All files directly under such a path will be processed as they are discovered. + + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as + `"hdfs://namenode:8040/logs/2017/*"`. + Here, the DStream will consist of all files in the directories + matching the pattern. + That is: it is a pattern of directories, not of files in directories. + + All files must be in the same data format. + * A file is considered part of a time period based on its modification time, + not its creation time. + + Once processed, changes to a file within the current window will not cause the file to be reread. + That is: *updates are ignored*. + + The more files under a directory, the longer it will take to + scan for changes — even if no files have been modified. + * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`, + renaming an entire directory to match the path will add the directory to the list of + monitored directories. Only the files in the directory whose modification time is + within the current window will be included in the stream. 
+ + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-) + to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed. + + +##### Streaming to FileSystems vs Object stores +{:.no_toc} - Python API `fileStream` is not available in the Python API, only `textFileStream` is available. +"Full" Filesystems such as HDFS tend to set the modification time on their files as soon +as the output stream is created. +When a file is opened, even before data has been completely written, +it may be included in the `DStream` - after which updates to the file within the same window +will be ignored. That is: changes may be missed, and data omitted from the stream. -- **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver +To guarantee that changes are picked up in a window, write the file +to an unmonitored directory, then, immediately after the output stream is closed, +rename it into the destination directory. +Provided the renamed file appears in the scanned destination directory during the window +of its creation, the new data will be picked up. + +In contrast, Object Stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the +data is actually copied. +Furthermore, the renamed object may have the time of the `rename()` operation as its modification time, so +may not be considered part of the window which its original creation time implied it was. + +Careful testing is needed against the target object store to verify that the timestamp behavior +of the store is consistent with that expected by Spark Streaming. It may be +that writing directly into a destination directory is the appropriate strategy for +streaming data via the chosen object store.
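+The window-membership and atomic-rename rules above can be sketched without Spark at all. The following is a minimal, Spark-free illustration against a local filesystem; the helper names (`files_in_window`, `publish_atomically`) are invented for this example and are not Spark or Hadoop APIs:

```python
import os
import tempfile
import time

def files_in_window(directory, window_start, window_end):
    """Select files whose modification time falls inside the batch window.
    A simplified model of the mod-time rule described above, not Spark's
    actual FileInputDStream code."""
    selected = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if window_start <= os.path.getmtime(path) < window_end:
            selected.append(name)
    return selected

def publish_atomically(data, staging_dir, monitored_dir, name):
    """Write to an unmonitored staging directory, close the stream, then
    rename into the monitored directory, as recommended above."""
    staged = os.path.join(staging_dir, name)
    with open(staged, "w") as f:
        f.write(data)
    # os.replace is an atomic rename on POSIX filesystems
    os.replace(staged, os.path.join(monitored_dir, name))

# Demo: one freshly published file, one backdated file.
staging = tempfile.mkdtemp()
monitored = tempfile.mkdtemp()
publish_atomically("log line\n", staging, monitored, "part-0000")

stale = os.path.join(monitored, "stale")
with open(stale, "w") as f:
    f.write("old\n")
# Backdate the stale file so it falls outside the current window,
# similar in spirit to FileSystem.setTimes() on HDFS.
os.utime(stale, (time.time() - 3600, time.time() - 3600))

now = time.time()
print(files_in_window(monitored, now - 60, now + 1))  # ['part-0000']
```

+Note that the backdated file would still be picked up by a window covering its (older) modification time, which is exactly why renames that reset timestamps on object stores can shift a file into an unexpected window.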
+ +For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html). + + +#### Streams based on Custom Receivers +{:.no_toc} + +DStreams can be created with data streams received through custom receivers. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. -- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. +#### Queue of RDDs as a Stream +{:.no_toc} + +For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. For more details on streams from sockets and files, see the API documentations of the relevant functions in [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for From 1e620ceb7b5eb0df6df83525366ebc1074f8e8ce Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 24 Apr 2017 13:45:22 +0100 Subject: [PATCH 2/2] SPARK-20448 Document how FileInputDStream works with object storage Change-Id: Icef71513c228fd8d61e23a03f16b8effc89fe8eb --- docs/streaming-programming-guide.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index e29c2d1aab409..f820f9cc76b59 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -621,7 +621,7 @@ methods for creating DStreams from files as input sources. 
For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`. -File streams do not require running a receiver, hence does not require allocating cores. +File streams do not require running a receiver so there is no need to allocate any cores for receiving file data. For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`. @@ -685,7 +685,7 @@ Spark Streaming will monitor the directory `dataDirectory` and process any files to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed. -##### Streaming to FileSystems vs Object stores +##### Using Object Stores as a source of data {:.no_toc} "Full" Filesystems such as HDFS tend to set the modification time on their files as soon @@ -712,7 +712,6 @@ streaming data via the chosen object store. For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html). - #### Streams based on Custom Receivers {:.no_toc}
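
The "careful testing" the patch calls for can be prototyped locally before pointing it at a real store. This probe is an illustrative sketch only: it checks whether a rename preserves a file's modification time on a locally mounted path, whereas a real check against S3 or Azure would go through the Hadoop `FileSystem` API and the store's own paths.

```python
import os
import tempfile
import time

def probe_rename_mtime(workdir):
    """Check whether rename preserves a file's modification time, which
    Spark Streaming's window assignment depends on. POSIX filesystems
    preserve mtime across rename; an object store that implements rename
    as copy-and-delete may reset it to the rename time."""
    src = os.path.join(workdir, "probe.src")
    dst = os.path.join(workdir, "probe.dst")
    with open(src, "w") as f:
        f.write("probe\n")
    # Backdate the source so any timestamp reset is easy to detect.
    created = time.time() - 300
    os.utime(src, (created, created))
    os.replace(src, dst)
    return abs(os.path.getmtime(dst) - created) < 1.0

print(probe_rename_mtime(tempfile.mkdtemp()))  # True on a local POSIX filesystem
```

If the probe reports that timestamps are not preserved, writing directly into the destination directory (as the patch suggests) is likely the safer strategy for that store.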