[SPARK-30281][SS] Consider partitioned/recursive option while verifying archive path on FileStreamSource

### What changes were proposed in this pull request?

This patch revises the verification logic of the archive path for FileStreamSource, as the existing logic doesn't take the partitioned/recursive options into account.

Before the patch, it only requires the archive path to have a depth greater than 2 (two subdirectories from the root), leveraging the fact that FileStreamSource normally reads files whose parent directory matches the pattern, or which match the pattern themselves. Given that the 'archive' operation moves files to the base archive path while retaining their full path, the archive path tends to be safe if its depth is greater than 2, meaning FileStreamSource doesn't re-read archived files as new source files.
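As an illustration of that reasoning (an editorial sketch, not code from this commit; the paths are hypothetical), using Hadoop's `Path` API:

```scala
import org.apache.hadoop.fs.Path

// Archiving keeps the source file's full path as a suffix under the archive
// root, so /data/logs/a.txt moved under /archived/here becomes
// /archived/here/data/logs/a.txt.
val source = new Path("/data/logs/a.txt")
val archiveRoot = new Path("/archived/here")
val archived = new Path(archiveRoot, source.toUri.getPath.stripPrefix("/"))

// With a non-recursive pattern such as /data/logs/*, a file is read only if
// the file itself or its parent directory matches the pattern. The archived
// file's parent is /archived/here/data/logs, so neither condition holds;
// that is why a sufficiently deep archive root used to be considered safe.
println(archived)            // /archived/here/data/logs/a.txt
println(archived.getParent)  // /archived/here/data/logs
```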

With the partitioned/recursive options, this assumption no longer holds, as FileStreamSource can read files at any depth of subdirectories under the source pattern. To deal with this correctly, we have to revise the verification logic; the new logic may not be intuitive or simple, but it works for all cases.

The new verification logic prevents both of the following cases:

1) the archive path matches the source pattern as a "prefix" (the depth of the archive path > the depth of the source pattern)

e.g.
* source pattern: `/hello*/spar?`
* archive path: `/hello/spark/structured/streaming`

Any file under the archive path will match the source pattern when the recursive option is enabled.

2) the source pattern matches the archive path as a "prefix" (the depth of the source pattern > the depth of the archive path)

e.g.
* source pattern: `/hello*/spar?/structured/hello2*`
* archive path: `/hello/spark/structured`

Some archived files will not match the source pattern: e.g. for file path `/hello/spark/structured/hello2`, the final archived path is `/hello/spark/structured/hello/spark/structured/hello2`, which doesn't match.

But other archived files will still match the source pattern: e.g. for file path `/hello2/spark/structured/hello2`, the final archived path is `/hello/spark/structured/hello2/spark/structured/hello2`, which matches the source pattern when recursive is enabled.

Implicitly, it also prevents the archive path from matching the source pattern exactly (same depth).

We want to prevent any source file from being archived and then picked up again as a new source file, so the patch takes the most restrictive approach to rule out all the possible cases.
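Below is an editorial sketch of the new check (a simplified, self-contained rendering of the patch's `isBaseArchivePathMatchedAgainstSourcePattern`, not the exact Spark code), using Hadoop's `Path` and `GlobFilter`:

```scala
import org.apache.hadoop.fs.{GlobFilter, Path}

// Returns true when the base archive path could collide with the source
// pattern, comparing only up to the shallower of the two depths.
def archivePathCollides(sourcePattern: Path, baseArchivePath: Path): Boolean = {
  // Walk a path up until it has the requested depth.
  def ancestorAtDepth(path: Path, depth: Int): Path = {
    var p = path
    while (p.depth() > depth) p = p.getParent
    p
  }
  val minDepth = math.min(sourcePattern.depth(), baseArchivePath.depth())
  var source = ancestorAtDepth(sourcePattern, minDepth)
  var archive = ancestorAtDepth(baseArchivePath, minDepth)
  // From leaf to root, every component of the truncated archive path must
  // match the glob component at the same level for a collision to exist.
  var matched = true
  while (matched && !archive.isRoot) {
    // GlobFilter matches only against the path's last component (its name).
    if (!new GlobFilter(source.getName).accept(archive)) matched = false
    source = source.getParent
    archive = archive.getParent
  }
  matched
}

// Case 1 above: the archive path is deeper than the pattern -> rejected.
archivePathCollides(new Path("/hello*/spar?"), new Path("/hello/spark/structured/streaming"))   // true
// Case 2 above: the pattern is deeper than the archive path -> rejected.
archivePathCollides(new Path("/hello*/spar?/structured/hello2*"), new Path("/hello/spark/structured"))  // true
// A path that cannot collide passes verification.
archivePathCollides(new Path("/hello*/spar?"), new Path("/archived/here"))  // false
```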

### Why are the changes needed?

Without this patch, archived files can be picked up as new source files when the partitioned/recursive option is enabled, as the current condition doesn't take these options into account.

### Does this PR introduce any user-facing change?

Only for Spark 3.0.0-preview (only preview 1 for now, but possibly preview 2 as well): end users are required to provide an archive path satisfying slightly more complicated conditions, instead of simply one with depth greater than 2.

### How was this patch tested?

New UT.

Closes #26920 from HeartSaVioR/SPARK-30281.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
HeartSaVioR authored and Marcelo Vanzin committed Jan 8, 2020
1 parent 0a72dba commit bd7510b
Showing 3 changed files with 80 additions and 22 deletions.
docs/structured-streaming-programming-guide.md (3 changes: 2 additions & 1 deletion)
@@ -548,7 +548,8 @@ Here are the details of all the sources in Spark.
"s3a://a/b/c/dataset.txt"<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
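For context, here is a minimal usage sketch (not part of this commit) showing how the two options fit together; the paths and the `SparkSession` setup are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("archive-example").getOrCreate()

// Read files matching a glob and archive each completed file under
// /archived/here, which does not match the source pattern at any depth.
val stream = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/data/*")
```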
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -389,20 +389,63 @@ object FileStreamSource {
s"on a different file system than the source files. source path: $sourcePath" +
s" / base archive path: $baseArchivePath")

/**
* FileStreamSource reads the files which one of below conditions is met:
* 1) file itself is matched with source path
* 2) parent directory is matched with source path
*
* Checking with glob pattern is costly, so set this requirement to eliminate the cases
* where the archive path can be matched with source path. For example, when file is moved
* to archive directory, destination path will retain input file's path as suffix, so
* destination path can't be matched with source path if archive directory's depth is longer
* than 2, as neither file nor parent directory of destination path can be matched with
* source path.
*/
require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
"subdirectories from root directory. e.g. '/data/archive'")
require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" +
" the path where archived path can possibly match with source pattern. Ensure the base " +
"archive path doesn't match with source pattern in depth, where the depth is minimum of" +
" depth on both paths.")
}

+    private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = {
+      var newPath = path
+      while (newPath.depth() > depth) {
+        newPath = newPath.getParent
+      }
+      newPath
+    }

+    private def isBaseArchivePathMatchedAgainstSourcePattern: Boolean = {
+      // We should disallow end users to set base archive path which path matches against source
+      // pattern to avoid checking each source file. There're couple of cases which allow
+      // FileStreamSource to read any depth of subdirectory under the source pattern, so we should
+      // consider all three cases 1) both has same depth 2) base archive path is longer than source
+      // pattern 3) source pattern is longer than base archive path. To handle all cases, we take
+      // min of depth for both paths, and check the match.
+
+      val minDepth = math.min(sourcePath.depth(), baseArchivePath.depth())
+
+      val sourcePathMinDepth = getAncestorEnsuringDepth(sourcePath, minDepth)
+      val baseArchivePathMinDepth = getAncestorEnsuringDepth(baseArchivePath, minDepth)
+
+      val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePathMinDepth)
+
+      var matched = true
+
+      // pathToCompare should have same depth as sourceGlobFilters.length
+      var pathToCompare = baseArchivePathMinDepth
+      var index = 0
+      do {
+        // GlobFilter only matches against its name, not full path so it's safe to compare
+        if (!sourceGlobFilters(index).accept(pathToCompare)) {
+          matched = false
+        } else {
+          pathToCompare = pathToCompare.getParent
+          index += 1
+        }
+      } while (matched && !pathToCompare.isRoot)
+
+      matched
+    }

+    private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+      val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+      var currentPath = sourcePath
+      while (!currentPath.isRoot) {
+        filters += new GlobFilter(currentPath.getName)
+        currentPath = currentPath.getParent
+      }
+
+      filters.toList
+    }

    override def clean(entry: FileEntry): Unit = {
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1814,15 +1814,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
    override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
  }

test("SourceFileArchiver - base archive path depth <= 2") {
test("SourceFileArchiver - fail when base archive path matches source pattern") {
    val fakeFileSystem = new FakeFileSystem("fake")

val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
val baseArchiveDirPath = new Path("/hello")

intercept[IllegalArgumentException] {
new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath)
def assertThrowIllegalArgumentException(sourcePatttern: Path, baseArchivePath: Path): Unit = {
intercept[IllegalArgumentException] {
new SourceFileArchiver(fakeFileSystem, sourcePatttern, fakeFileSystem, baseArchivePath)
}
}

// 1) prefix of base archive path matches source pattern (baseArchiveDirPath has more depths)
val sourcePatternPath = new Path("/hello*/spar?")
val baseArchiveDirPath = new Path("/hello/spark/structured/streaming")
assertThrowIllegalArgumentException(sourcePatternPath, baseArchiveDirPath)

// 2) prefix of source pattern matches base archive path (source pattern has more depths)
val sourcePatternPath2 = new Path("/hello*/spar?/structured/streaming")
val baseArchiveDirPath2 = new Path("/hello/spark/structured")
assertThrowIllegalArgumentException(sourcePatternPath2, baseArchiveDirPath2)

// 3) source pattern matches base archive path (both have same depth)
val sourcePatternPath3 = new Path("/hello*/spar?/structured/*")
val baseArchiveDirPath3 = new Path("/hello/spark/structured/streaming")
assertThrowIllegalArgumentException(sourcePatternPath3, baseArchiveDirPath3)
  }

  test("SourceFileArchiver - different filesystems between source and archive") {
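To complement the failing cases, here is an editorial sketch (not part of this commit) of a configuration the revised verification accepts, reusing `fakeFileSystem` and `SourceFileArchiver` from the test above:

```scala
// /archived/here cannot match /hello*/spar? at any depth ("here" fails the
// "spar?" glob), so constructing the archiver succeeds.
val okSourcePattern = new Path("/hello*/spar?")
val okArchiveDir = new Path("/archived/here")
new SourceFileArchiver(fakeFileSystem, okSourcePattern, fakeFileSystem, okArchiveDir)  // no exception
```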
