[Spark-14976][Streaming] make StreamingContext.textFileStream support wildcard#12752

Closed
mwws wants to merge 4 commits into apache:master from mwws:SPARK_FileStream

mwws commented Apr 28, 2016

What changes were proposed in this pull request?

Make StreamingContext.textFileStream support wildcards in the monitored path, such as /home/user/*/file

How was this patch tested?

I tested manually and added a new unit test case.

make StreamingContext.textFileStream support wildcard
add a related unit test

mwws (Author) commented Apr 28, 2016

@chenghao-intel

SparkQA commented Apr 28, 2016

Test build #57236 has finished for PR 12752 at commit ba8d1e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)

Member

This looks wrong. Now you're only looking for nested directories, right?

Author

No. fs.globStatus returns all the files that match the path pattern, and it does not descend into nested folders.

Say the folder structure looks like the following:
/root
..../Jack
......../subDir1
......../JackFile1
..../Rose
......../subDir1
......../subDir2
......../RoseFile1
..../file1

Case 1: /root/Jack/subDir1 monitors new files added directly to "/root/Jack/subDir1". If "JackFile1" (in "/root/Jack") is a newly added file, it is NOT in scope.
Case 2: /root/Jack monitors new files added to the "/root/Jack" folder. New files added to "/root/Jack/subDir1" are NOT in scope.
Case 3: /root/*/subDir1 monitors new files added to both "/root/Jack/subDir1" and "/root/Rose/subDir1".
Case 4: /root/Rose/* monitors new files added to "/root/Rose/subDir1" and "/root/Rose/subDir2".
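
The four cases above can be sketched without a Hadoop dependency. This is only an illustration, not the code in this patch: it assumes a Unix-like file system, uses the JDK's glob matcher in place of fs.globStatus, and takes patterns relative to a temporary root rather than absolute paths. The object and method names (GlobDirectoriesSketch, matchingDirs, monitoredFiles) are made up for the example.

```scala
import java.io.File
import java.nio.file.{FileSystems, Files}

object GlobDirectoriesSketch {
  // Recreates the example tree from the comment above in a fresh temp directory.
  def buildSampleTree(): File = {
    val root = Files.createTempDirectory("root").toFile
    Seq("Jack/subDir1", "Rose/subDir1", "Rose/subDir2")
      .foreach(d => new File(root, d).mkdirs())
    Seq("Jack/JackFile1", "Rose/RoseFile1", "file1")
      .foreach(f => new File(root, f).createNewFile())
    root
  }

  // Every directory at or below `dir`, with `dir` itself first.
  private def allDirs(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
    dir +: children.filter(_.isDirectory).flatMap(allDirs)
  }

  // Directories (relative to root) that match the glob pattern.
  // Plain files that match the pattern are dropped, like a directory filter would do.
  def matchingDirs(root: File, pattern: String): Seq[String] = {
    val matcher = FileSystems.getDefault.getPathMatcher("glob:" + pattern)
    allDirs(root).tail // skip root itself
      .map(d => root.toPath.relativize(d.toPath))
      .filter(p => matcher.matches(p))
      .map(_.toString)
      .sorted
  }

  // Files sitting directly inside each matched directory; nested folders are not visited.
  def monitoredFiles(root: File, pattern: String): Seq[String] =
    matchingDirs(root, pattern).flatMap { rel =>
      Option(new File(root, rel).listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
        .filter(_.isFile)
        .map(f => rel + "/" + f.getName)
    }.sorted

  def main(args: Array[String]): Unit = {
    val root = buildSampleTree()
    Seq("Jack/subDir1", "Jack", "*/subDir1", "Rose/*").foreach { p =>
      println(s"$p -> dirs: ${matchingDirs(root, p).mkString(", ")}")
    }
  }
}
```

In this sketch a single `*` matches exactly one path segment, so "*/subDir1" selects both subDir1 folders and "Rose/*" selects only the two subfolders of Rose, while "Jack" sees JackFile1 but nothing under Jack/subDir1.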

Member

Got it, right, so I can still specify a directory and it will match that directory only in the first globStatus. That makes sense. (Nit in the line below: you don't need the extra braces and will probably have to remove the space before 'dir'.)

Author

Sure, I will remove them.

SparkQA commented Apr 28, 2016

Test build #57262 has finished for PR 12752 at commit 93d9c62.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Apr 29, 2016

Test build #57294 has finished for PR 12752 at commit f1d14bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mwws (Author) commented Apr 29, 2016

The failed test is not related to my change (I think PR #12416 broke the Spark CI).

mwws (Author) commented Apr 29, 2016

Jenkins, retest this please

SparkQA commented Apr 29, 2016

Test build #57301 has finished for PR 12752 at commit f1d14bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mwws (Author) commented May 6, 2016

@srowen @zsxwing could you help merge it?

srowen (Member) commented May 6, 2016

I'd prefer someone like @tdas to sign off as it's kind of a change in what the API supports, but it looks reasonable to me as it supports existing behavior and mimics behavior of another similar API.

var testDir: File = null
try {
val batchDuration = Seconds(2)
val testDir = Utils.createTempDir()

Member

Oh I just noticed this copy-and-pasted an error from the tests above. Remove "val" on this line.

Author

nice catch, I will fix it.
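
The problem flagged in this thread is variable shadowing: a `val` inside the `try` block declares a new local and leaves the outer `var` untouched. A minimal sketch, using a String in place of the File returned by Utils.createTempDir() and assuming a cleanup step that reads the outer variable in `finally` (the object and method names are hypothetical):

```scala
object ShadowingSketch {
  // Shape of the reviewed code: the inner `val` shadows the outer `var`.
  def withShadowing(): Option[String] = {
    var testDir: String = null
    var seenByCleanup: Option[String] = None
    try {
      val testDir = "temp-dir" // new local; the outer testDir is never assigned
      assert(testDir != null)
    } finally {
      // Cleanup code here reads the OUTER variable, which is still null.
      seenByCleanup = Option(testDir)
    }
    seenByCleanup
  }

  // After dropping `val`, the assignment targets the outer variable.
  def withoutShadowing(): Option[String] = {
    var testDir: String = null
    var seenByCleanup: Option[String] = None
    try {
      testDir = "temp-dir"
    } finally {
      seenByCleanup = Option(testDir)
    }
    seenByCleanup
  }

  def main(args: Array[String]): Unit = {
    println(s"with val:    ${withShadowing()}")
    println(s"without val: ${withoutShadowing()}")
  }
}
```

With the shadowing version, any cleanup guarded by a null check on the outer variable would silently skip the directory that was actually created.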

SparkQA commented May 11, 2016

Test build #58302 has finished for PR 12752 at commit 6f95df5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen (Member) commented May 11, 2016

Merged to master/2.0

asfgit pushed a commit that referenced this pull request May 11, 2016
… wildcard

## What changes were proposed in this pull request?
make StreamingContext.textFileStream support wildcard
like /home/user/*/file

## How was this patch tested?
I did manual test and added a new unit test case

Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>

Closes #12752 from mwws/SPARK_FileStream.

(cherry picked from commit 3359781)
Signed-off-by: Sean Owen <sowen@cloudera.com>
asfgit closed this in 3359781 on May 11, 2016
zzcclp added a commit to zzcclp/spark that referenced this pull request on May 12, 2016
mwws deleted the SPARK_FileStream branch on May 23, 2016 06:54
4 participants