SPARK-17159 Significant speed-up for running Spark Streaming against an object store.

Based on #17745. Original work by Steve Loughran.

This is a minimal patch of changes to FileInputDStream that reduces the number of FileStatus requests issued when scanning for new files.

This is a minor optimisation when working with ordinary filesystems, but a significant one when working with object stores, where each file-status probe is a comparatively expensive remote call.

Change-Id: I269d98902f615818941c88de93a124c65453756e
steveloughran authored and ScrapCodes committed Sep 5, 2018
1 parent 2eaf4f3 commit 2fba9af
Showing 3 changed files with 118 additions and 51 deletions.
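
To make the intent of the FileInputDStream change concrete, here is a rough, self-contained sketch of the listing pattern the patch moves to. The object and method names (ListingSketch, newFiles) are illustrative only, and the filter condition is simplified: it omits the user-supplied path filter and the recently-selected-files check that the real code also applies.

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListingSketch {
  // Previously, directory and file filtering went through PathFilter callbacks whose
  // accept() methods each issued an fs.getFileStatus(path) call: one metadata request
  // per candidate path. Here the FileStatus entries already returned by globStatus and
  // listStatus are filtered directly, so no extra per-path requests are made.
  def newFiles(
      fs: FileSystem,
      globPattern: Path,
      modTimeIgnoreThreshold: Long,
      currentTime: Long): Seq[String] = {
    // globStatus may return null when nothing matches the pattern
    val directories = Option(fs.globStatus(globPattern))
      .getOrElse(Array.empty[FileStatus])
      .filter(_.isDirectory)          // status is already in hand, no extra call needed
      .map(_.getPath)
    directories.flatMap { dir =>
      fs.listStatus(dir)              // one listing call per matched directory
        .filter { status =>
          val modTime = status.getModificationTime   // read from the listing entry
          modTime > modTimeIgnoreThreshold && modTime <= currentTime
        }
        .map(_.getPath.toString)
    }.toSeq
  }
}

Against an object store such as S3, each avoided getFileStatus call is roughly one saved remote metadata request per candidate file per scan, which is where the speed-up comes from; against HDFS or a local filesystem the saving is much smaller, matching the commit message above.
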
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,19 +17,19 @@

package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.io.{FileNotFoundException, IOException, ObjectInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()

// Read-through cache of file mod times, used to speed up mod time lookups
@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L

@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. And new files may have the same modification time as the
* granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}

/**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")

val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
.filter(_.isDirectory)
.map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
fs.listStatus(dir)
.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
.map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
logInfo(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
s"Time taken to find new files $timeTaken exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
"files in the monitored directories."
)
}
newFiles
} catch {
case e: FileNotFoundException =>
logWarning(s"No directory to scan: $directoryPath: $e")
Array.empty
case e: Exception =>
logWarning("Error finding new files", e)
logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@@ -241,16 +236,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
* @param fileStatus file status
* @param currentTime time of the batch
* @param modTimeIgnoreThreshold the ignore threshold
* @return true if the file has been modified within the batch window
*/
private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
private def isNewFile(
fileStatus: FileStatus,
currentTime: Long,
modTimeIgnoreThreshold: Long): Boolean = {
val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
logDebug(s"$pathStr rejected by filter")
return false
}
// Reject file if it was created before the ignore time
val modTime = getFileModTime(path)
val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}

/** Get file mod time from cache or fetch it from the file system */
private def getFileModTime(path: Path) = {
fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}

private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}

/**
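The selection rule documented on isNewFile above can be condensed into a simplified sketch. The helper name isSelected and the surrounding object are illustrative; the real method also applies the user-supplied path filter before these checks.

import org.apache.hadoop.fs.FileStatus

object SelectionSketch {
  // Window check applied to each FileStatus returned by listStatus.
  def isSelected(
      status: FileStatus,
      currentTime: Long,
      modTimeIgnoreThreshold: Long,
      recentlySelected: Set[String]): Boolean = {
    val modTime = status.getModificationTime  // comes from the listing, no extra RPC
    if (modTime <= modTimeIgnoreThreshold) {
      false   // older than the remember window; <= rather than < avoids SPARK-4518
    } else if (modTime > currentTime) {
      false   // mod time beyond the current batch time; skipped to avoid re-selection
    } else {
      // reject files that were already selected in a remembered batch
      !recentlySelected.contains(status.getPath.toString)
    }
  }
}
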
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("binary records stream") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("file input stream - wildcard") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")

@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)

def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,85 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))

// Over time, create files in the temp directory 2
val input2 = Seq(6, 7, 8, 9, 10)
input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))

// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

test("Modified files are correctly detected.") {
withTempDir { testDir =>
val batchDuration = Seconds(2)
val durationMs = batchDuration.milliseconds
val testPath = new Path(testDir.toURI)
val streamDir = new Path(testPath, "streaming")
val streamGlobPath = new Path(streamDir, "sub*")
val generatedDir = new Path(testPath, "generated")
val generatedSubDir = new Path(generatedDir, "subdir")
val renamedSubDir = new Path(streamDir, "subdir")

withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val sparkContext = ssc.sparkContext
val hc = sparkContext.hadoopConfiguration
val fs = FileSystem.get(testPath.toUri, hc)

fs.delete(testPath, true)
fs.mkdirs(testPath)
fs.mkdirs(streamDir)
fs.mkdirs(generatedSubDir)

def write(path: Path, text: String): Unit = {
val out = fs.create(path, true)
IOUtils.write(text, out)
out.close()
}

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val existingFile = new Path(generatedSubDir, "existing")
write(existingFile, "existing\n")
val status = fs.getFileStatus(existingFile)
clock.setTime(status.getModificationTime + durationMs)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()

ssc.start()
clock.advance(durationMs)
eventually(eventuallyTimeout) {
assert(1 === batchCounter.getNumCompletedBatches)
}
// create and rename the file
// put a file into the generated directory
val textPath = new Path(generatedSubDir, "renamed.txt")
write(textPath, "renamed\n")
val now = clock.getTimeMillis()
val modTime = now + durationMs / 2
fs.setTimes(textPath, modTime, modTime)
val textFilestatus = fs.getFileStatus(existingFile)
assert(textFilestatus.getModificationTime < now + durationMs)

// rename the directory under the path being scanned
fs.rename(generatedSubDir, renamedSubDir)

// move forward one window
clock.advance(durationMs)
// await the next scan completing
eventually(eventuallyTimeout) {
assert(2 === batchCounter.getNumCompletedBatches)
}
// verify that the "renamed" file is found, but not the "existing" one which is out of
// the window
assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
}
}
}

@@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

def testFileStream(newFilesOnly: Boolean) {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
}
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@

package org.apache.spark.streaming

import java.io.{IOException, ObjectInputStream}
import java.io.{File, IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* (originally from `SqlTestUtils`.)
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

}
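
A hypothetical usage of the withTempDir helper added above; the test name and body are illustrative, not part of the commit.

// Inside a suite that mixes in TestSuiteBase: the scratch directory exists for the
// duration of the body and is deleted afterwards, replacing the previous
// try/finally + Utils.deleteRecursively boilerplate in each test.
test("uses a scratch directory") {
  withTempDir { dir =>
    val file = new java.io.File(dir, "data.txt")
    com.google.common.io.Files.write("1\n", file, java.nio.charset.StandardCharsets.UTF_8)
    assert(file.exists())
  } // dir is removed here even if the assertion above fails
}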
