From 2fba9af597349fc023e04a845d1cfacfc3ab7d9e Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 24 Apr 2017 14:04:04 +0100
Subject: [PATCH 1/4] SPARK-17159 Significant speed-up when running Spark
 Streaming against object stores.

Based on #17745. Original work by Steve Loughran.

This is a minimal patch of changes to FileInputDStream to reduce the number
of file status requests made when querying for new files. This is a minor
optimisation when working with filesystems, but a significant one when
working with object stores.

Change-Id: I269d98902f615818941c88de93a124c65453756e
---
 .../streaming/dstream/FileInputDStream.scala | 57 +++++------
 .../spark/streaming/InputStreamsSuite.scala  | 98 +++++++++++++++----
 .../spark/streaming/TestSuiteBase.scala      | 14 ++-
 3 files changed, 118 insertions(+), 51 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 905b1c52afa69..a5f3d4e23d2bb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
 
-  // Read-through cache of file mod times, used to speed up mod time lookups
-  @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
-
   // Timestamp of the last round of finding files
   @transient private var lastNewFileFindingTime = 0L
 
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * a union RDD out of them. Note that this maintains the list of files that were processed
    * with the latest modification time in the previous call to this method. This is because the
    * modification time returned by the FileStatus API seems to return times only at the
-   * granularity of seconds. And new files may have the same modification time as the
+   * granularity of seconds in HDFS. And new files may have the same modification time as the
    * latest modification time in the previous call to this method, yet not have been reported in
    * the previous call.
   */
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug("Cleared files are:\n" +
         oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
     }
-    // Delete file mod times that weren't accessed in the last round of getting new files
-    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+          .filter(_.isDirectory)
+          .map(_.getPath)
       val newFiles = directories.flatMap(dir =>
-        fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+        fs.listStatus(dir)
+            .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+            .map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-      logInfo("Finding new files took " + timeTaken + " ms")
-      logDebug("# cached file times = " + fileToModTime.size)
+      logInfo(s"Finding new files took $timeTaken ms")
       if (timeTaken > slideDuration.milliseconds) {
         logWarning(
-          "Time taken to find new files exceeds the batch size. " +
+          s"Time taken to find new files ($timeTaken ms) exceeds the batch size. " +
           "Consider increasing the batch size or reducing the number of " +
-          "files in the monitored directory."
+          "files in the monitored directories."
        )
      }
      newFiles
    } catch {
+      case e: FileNotFoundException =>
+        logWarning(s"No directory to scan: $directoryPath: $e")
+        Array.empty
       case e: Exception =>
-        logWarning("Error finding new files", e)
+        logWarning(s"Error finding new files under $directoryPath", e)
         reset()
         Array.empty
     }
@@ -241,8 +236,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * The files with mod time T+5 are not remembered and cannot be ignored (since T+5 > T+1).
    * Hence they can get selected as new files again. To prevent this, files whose mod time is more
    * than current batch time are not considered.
+ * @param fileStatus file status + * @param currentTime time of the batch + * @param modTimeIgnoreThreshold the ignore threshold + * @return true if the file has been modified within the batch window */ - private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + private def isNewFile( + fileStatus: FileStatus, + currentTime: Long, + modTimeIgnoreThreshold: Long): Boolean = { + val path = fileStatus.getPath val pathStr = path.toString // Reject file if it does not satisfy filter if (!filter(path)) { @@ -250,7 +253,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( return false } // Reject file if it was created before the ignore time - val modTime = getFileModTime(path) + val modTime = fileStatus.getModificationTime() if (modTime <= modTimeIgnoreThreshold) { // Use <= instead of < to avoid SPARK-4518 logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold") @@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( new UnionRDD(context.sparkContext, fileRDDs) } - /** Get file mod time from cache or fetch it from the file system */ - private def getFileModTime(path: Path) = { - fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) - } - private def directoryPath: Path = { if (_path == null) _path = new Path(directory) _path @@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]() batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() - fileToModTime = new TimeStampedHashMap[String, Long](true) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b5d36a36513ab..1cf21e8a28033 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -27,7 +27,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import com.google.common.io.Files -import org.apache.hadoop.fs.Path +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.scalatest.BeforeAndAfter @@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("binary records stream") { - var testDir: File = null - try { + withTempDir { testDir => val batchDuration = Seconds(2) - testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, StandardCharsets.UTF_8) @@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(obtainedOutput(i) === input.map(b => (b + i).toByte)) } } - } finally { - if (testDir != null) Utils.deleteRecursively(testDir) } } @@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("file input stream - wildcard") { - var testDir: File = null - try { + withTempDir { testDir => val batchDuration = Seconds(2) - testDir = Utils.createTempDir() val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") @@ -221,12 +216,12 @@ class InputStreamsSuite 
extends TestSuiteBase with BeforeAndAfter {
       // not enough to trigger a batch
       clock.advance(batchDuration.milliseconds / 2)
 
-      def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
+      def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
         val file = new File(dir, data.toString)
         Files.write(data + "\n", file, StandardCharsets.UTF_8)
         assert(file.setLastModified(clock.getTimeMillis()))
         assert(file.lastModified === clock.getTimeMillis())
-        logInfo("Created file " + file)
+        logInfo(s"Created file $file")
         // Advance the clock after creating the file to avoid a race when
         // setting its modification time
         clock.advance(batchDuration.milliseconds)
         eventually(eventuallyTimeout) {
           assert(batchCounter.getNumCompletedBatches === data)
         }
       }
       // Over time, create files in the temp directory 1
       val input1 = Seq(1, 2, 3, 4, 5)
-      input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+      input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
       // Over time, create files in the temp directory 2
       val input2 = Seq(6, 7, 8, 9, 10)
-      input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+      input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
 
       // Verify that all the files have been read
       val expectedOutput = (input1 ++ input2).map(_.toString).toSet
       assert(outputQueue.asScala.flatten.toSet === expectedOutput)
     }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
+
+  test("Modified files are correctly detected.") {
+    withTempDir { testDir =>
+      val batchDuration = Seconds(2)
+      val durationMs = batchDuration.milliseconds
+      val testPath = new Path(testDir.toURI)
+      val streamDir = new Path(testPath, "streaming")
+      val streamGlobPath = new Path(streamDir, "sub*")
+      val generatedDir = new Path(testPath, "generated")
+      val generatedSubDir = new Path(generatedDir, "subdir")
+      val renamedSubDir = new Path(streamDir, "subdir")
+
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val sparkContext = ssc.sparkContext
+        val hc = sparkContext.hadoopConfiguration
+        val fs = FileSystem.get(testPath.toUri, hc)
+
+        fs.delete(testPath, true)
+        fs.mkdirs(testPath)
+        fs.mkdirs(streamDir)
+        fs.mkdirs(generatedSubDir)
+
+        def write(path: Path, text: String): Unit = {
+          val out = fs.create(path, true)
+          IOUtils.write(text, out)
+          out.close()
+        }
+
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val existingFile = new Path(generatedSubDir, "existing")
+        write(existingFile, "existing\n")
+        val status = fs.getFileStatus(existingFile)
+        clock.setTime(status.getModificationTime + durationMs)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
+        outputStream.register()
+
+        ssc.start()
+        clock.advance(durationMs)
+        eventually(eventuallyTimeout) {
+          assert(1 === batchCounter.getNumCompletedBatches)
+        }
+        // put a new file into the generated directory, with a mod time
+        // within the current batch window
+        val textPath = new Path(generatedSubDir, "renamed.txt")
+        write(textPath, "renamed\n")
+        val now = clock.getTimeMillis()
+        val modTime = now + durationMs / 2
+        fs.setTimes(textPath, modTime, modTime)
+        val textFileStatus = fs.getFileStatus(textPath)
+        assert(textFileStatus.getModificationTime < now + durationMs)
+
+        // rename the directory under the path being scanned
+        fs.rename(generatedSubDir, renamedSubDir)
+
+        // move forward one window
+        clock.advance(durationMs)
+        // wait for
the next scan to complete
+        eventually(eventuallyTimeout) {
+          assert(2 === batchCounter.getNumCompletedBatches)
+        }
+        // verify that the "renamed" file is found, but not the "existing" one, which is out of
+        // the window
+        assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
+      }
     }
   }
@@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
         assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index dbab70886102d..ada494eb897f3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{File, IOException, ObjectInputStream}
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   * (Originally from `SQLTestUtils`.)
+   * @todo This method should probably be moved to a more general place.
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
 }

From 542872cb5459fae1a66ee45aa193986e9a37fb96 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 3 Oct 2018 09:48:19 +0530
Subject: [PATCH 2/4] Fixed indents.
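
This only reindents the directory scan introduced in the first patch. For
reference, a minimal standalone sketch of the listing pattern that scan
uses; this is not part of the patch, and the s3a path and one-minute
threshold below are made-up values for illustration. The point is one
globStatus call plus one listStatus call per matched directory, reusing
the returned FileStatus entries instead of issuing a getFileStatus call
per file:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    object DirectoryScanSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical monitored pattern and ignore threshold.
        val pattern = new Path("s3a://bucket/streaming/sub*")
        val modTimeIgnoreThreshold = System.currentTimeMillis() - 60000L
        val fs = pattern.getFileSystem(new Configuration())

        // One globStatus round trip; it already returns FileStatus
        // entries, so directory checks need no extra getFileStatus calls.
        val directories = Option(fs.globStatus(pattern))
          .getOrElse(Array.empty[FileStatus])
          .filter(_.isDirectory)
          .map(_.getPath)

        // One listStatus round trip per directory; each FileStatus
        // already carries the modification time, so no per-file
        // getFileStatus is needed.
        val newFiles = directories.flatMap { dir =>
          fs.listStatus(dir)
            .filter(_.getModificationTime > modTimeIgnoreThreshold)
            .map(_.getPath.toString)
        }
        newFiles.foreach(println)
      }
    }

Against an object store this replaces the per-file HEAD requests of the
old PathFilter approach with one LIST per directory, which is where the
speed-up comes from.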
---
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index a5f3d4e23d2bb..a1c57aea163a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -192,12 +192,12 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         s"ignoring files older than $modTimeIgnoreThreshold")
 
       val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
-          .filter(_.isDirectory)
-          .map(_.getPath)
+        .filter(_.isDirectory)
+        .map(_.getPath)
       val newFiles = directories.flatMap(dir =>
         fs.listStatus(dir)
-            .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
-            .map(_.getPath.toString))
+          .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+          .map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
       logInfo(s"Finding new files took $timeTaken ms")
       if (timeTaken > slideDuration.milliseconds) {

From dab9bf3771994989e5de2857f91d117dc8b74623 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 3 Oct 2018 09:54:32 +0530
Subject: [PATCH 3/4] more code feedback.

---
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index a1c57aea163a3..28fd1c2ae1639 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -191,15 +191,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+      val directories = Option(fs.globStatus(directoryPath))
         .filter(_.isDirectory)
         .map(_.getPath)
+        .getOrElse(Array.empty[FileStatus])
       val newFiles = directories.flatMap(dir =>
         fs.listStatus(dir)
           .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
           .map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-      logInfo(s"Finding new files took $timeTaken ms")
+      logDebug(s"Finding new files took $timeTaken ms")
       if (timeTaken > slideDuration.milliseconds) {
         logWarning(
           s"Time taken to find new files ($timeTaken ms) exceeds the batch size. " +

From d91c815774bff070bdb3cb149678ff080bc06b45 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 3 Oct 2018 10:11:03 +0530
Subject: [PATCH 4/4] fixed compile errors.
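
The previous commit moved .getOrElse after .filter and .map, which cannot
compile: in that position .filter is Option.filter, so its predicate takes
the whole Array[FileStatus] and _.isDirectory does not resolve. A sketch of
the shape this commit restores (illustrative only; GlobSketch and
globDirectories are made-up names, while the Hadoop calls are the same ones
the patch uses):

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    object GlobSketch {
      // Unwrap the Option first: globStatus returns null when nothing
      // matches, and getOrElse turns that into an empty Array. The
      // subsequent filter and map then run element-wise over the Array.
      def globDirectories(fs: FileSystem, pattern: Path): Array[Path] = {
        Option(fs.globStatus(pattern))
          .getOrElse(Array.empty[FileStatus])
          .filter(_.isDirectory)
          .map(_.getPath)
      }
    }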
--- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 28fd1c2ae1639..1dcc941fca9ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -191,10 +191,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val directories = Option(fs.globStatus(directoryPath)) + val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus]) .filter(_.isDirectory) .map(_.getPath) - .getOrElse(Array.empty[FileStatus]) val newFiles = directories.flatMap(dir => fs.listStatus(dir) .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
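
For completeness, a condensed restatement of the window check that
isNewFile applies to each FileStatus; this is a sketch rather than the
patch code (WindowSketch and inBatchWindow are made-up names, and the real
method also rejects paths failing the user-supplied filter and files
already selected in recent batches):

    import org.apache.hadoop.fs.FileStatus

    object WindowSketch {
      // Accept only files whose mod time lies in the half-open window
      // (modTimeIgnoreThreshold, currentTime]. Rejecting
      // modTime <= modTimeIgnoreThreshold (rather than <) avoids
      // re-selecting boundary files (SPARK-4518); mod times beyond the
      // current batch time are left for a later batch.
      def inBatchWindow(
          status: FileStatus,
          currentTime: Long,
          modTimeIgnoreThreshold: Long): Boolean = {
        val modTime = status.getModificationTime
        modTime > modTimeIgnoreThreshold && modTime <= currentTime
      }
    }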