[SPARK-13149][SQL]Add FileStreamSource #11034
Conversation
Test build #50604 has finished for PR 11034 at commit
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

class FileStreamSourceSuite extends StreamTest with SharedSQLContext {
Could we write these tests with the `MemorySink` instead, using `testStream`? I'd like to be able to test things like dropped batches and restarting as well. It would also be good to have them less coupled.
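For reference, a `testStream`-style test might look roughly like this (a sketch; `AddTextFileData` is a hypothetical action that writes a new file into `src` between batches, and the exact `StreamTest` action names may differ):

```scala
test("recover from restarts") {
  val src = Utils.createTempDir("streaming.src")
  val textStream = sqlContext.read.format("text").stream(src.getCanonicalPath)

  testStream(textStream)(
    AddTextFileData(src, "a\nb"),   // hypothetical action: drop a new file into src
    CheckAnswer("a", "b"),          // results are checked against a MemorySink
    StopStream,                     // simulate a failure...
    StartStream,                    // ...and a restart
    AddTextFileData(src, "c"),
    CheckAnswer("a", "b", "c")      // earlier batches recovered, new file picked up
  )
}
```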
…out startId; address other comments
Test build #50678 has finished for PR 11034 at commit
retest this please
Test build #50684 has finished for PR 11034 at commit
Test build #50687 has finished for PR 11034 at commit
import sqlContext.implicits._

/** Returns the schema of the data from this source */
override def schema: StructType = dataSchema.getOrElse(new StructType().add("value", StringType))
This `getOrElse` is only going to work for the text file data source. I think that for things like JSON we should probably try to initialize the source using `dataFrameBuilder` and extract the schema from there.
We should also add tests that would catch a problem here.
Updated the logic here. Now, if there are any existing files, it will use them to infer the schema. Also added a test for it.
dataSchema.getOrElse {
  val filesPresent = fetchAllFiles()
  if (filesPresent.isEmpty) {
    new StructType().add("value", StringType)
Even if there are no files present, we should probably still defer to the source. Those that can support that will work and those that don't will throw the correct error message.
> Those that can support that will work and those that don't will throw the correct error message.

But we need to return some `StructType` here. Any magic to defer that?
Oh I see, even `sqlContext.read.format("text").load()` doesn't work? I would rather fix that than hardcode this here.
For sources like parquet/json it doesn't really make sense to point them at an empty directory, so I would rather throw an error.
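Piecing this thread together, the resolved schema logic presumably ends up along these lines (a sketch; the `providerName` check and the exact error message are assumptions based on the discussion above):

```scala
override lazy val schema: StructType = dataSchema.getOrElse {
  val filesPresent = fetchAllFiles()
  if (filesPresent.isEmpty) {
    if (providerName == "text") {
      // Only the text source has a well-known default schema.
      new StructType().add("value", StringType)
    } else {
      // Pointing e.g. parquet/json at an empty directory is an error.
      throw new IllegalArgumentException("No schema specified")
    }
  } else {
    // Let the underlying source infer the schema from the existing files.
    dataFrameBuilder(filesPresent.toArray).schema
  }
}
```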
Test build #50779 has finished for PR 11034 at commit
Test build #50780 has finished for PR 11034 at commit
Test build #50830 has finished for PR 11034 at commit
Test build #50838 has finished for PR 11034 at commit
 * an empty `Seq`.
 */
def readBatch(input: InputStream): Seq[String] = {
  val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
We should probably validate the version too?
Done
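For illustration, the version check might end up roughly like this (a sketch; the `VERSION` constant, the metadata-file layout, and the error type are assumptions):

```scala
import java.io.InputStream
import scala.io.{Codec, Source}

private val VERSION = "1" // assumed format-version marker written as the first line

def readBatch(input: InputStream): Seq[String] = {
  val lines = Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
  if (lines.isEmpty) {
    Seq.empty
  } else {
    // Reject metadata files written by an unknown/incompatible version
    // rather than silently misreading their contents.
    if (lines.head != VERSION) {
      throw new IllegalStateException(
        s"Unknown metadata file version ${lines.head}, expected $VERSION")
    }
    lines.tail.toSeq
  }
}
```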
Small comments, otherwise LGTM
Test build #50848 has finished for PR 11034 at commit
val src = Utils.createTempDir("streaming.src")

// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
Can we also make sure we throw a better error for this case?
scala> sqlContext.read.format("text").stream()
java.util.NoSuchElementException: key not found: path
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.sql.sources.HadoopFsRelationProvider$class.createSource(interfaces.scala:206)
at org.apache.spark.sql.execution.datasources.text.DefaultSource.createSource(DefaultSource.scala:42)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.createSource(ResolvedDataSource.scala:107)
at org.apache.spark.sql.DataFrameReader.stream(DataFrameReader.scala:167)
... 40 elided
I would like to fix it in a separate PR since `load` throws the same error:
scala> sqlContext.read.format("text").load()
java.util.NoSuchElementException: key not found: path
at scala.collection.MapLike$class.default(MapLike.scala:228)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.default(ddl.scala:159)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.apply(ddl.scala:159)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$10.apply(ResolvedDataSource.scala:200)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$10.apply(ResolvedDataSource.scala:200)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:200)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:129)
... 49 elided
Fixed the path error for `stream`.
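For illustration, the fix presumably replaces the raw map lookup with something along these lines (a sketch; the exact message and location are assumptions):

```scala
// Instead of parameters("path") throwing NoSuchElementException,
// fail early with an actionable message when no path was given.
val path = parameters.getOrElse("path",
  throw new IllegalArgumentException("'path' is not specified"))
```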
retest this please
Test build #50937 has finished for PR 11034 at commit
private val batchToMetadata = new HashMap[Long, Seq[String]]

{
  // Restore statues from the metadata files
nit: statuses, not statues. :) Also, what is status? Isn't it just file names?
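For illustration, the restore step presumably replays each committed metadata file back into `batchToMetadata`, roughly like this (a sketch; `fs` as a Hadoop `FileSystem` handle and the file-per-batch naming are assumptions):

```scala
{
  // Rebuild batchToMetadata (batch id -> file names read in that batch)
  // from metadata files written before a restart.
  for (status <- fs.listStatus(metadataPath)) {
    val batchId = status.getPath.getName.toLong // file name encodes the batch id
    val input = fs.open(status.getPath)
    try {
      batchToMetadata(batchId) = readBatch(input)
    } finally {
      input.close()
    }
  }
}
```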
Offline discussion:
Test build #51009 has finished for PR 11034 at commit
Improved the error message as per discussion in #11034 (comment). Also made `path` and `metadataPath` in FileStreamSource case insensitive.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11154 from zsxwing/path.
`FileStreamSource` is an implementation of `org.apache.spark.sql.execution.streaming.Source`. It takes advantage of the existing `HadoopFsRelationProvider` to support various file formats. It remembers the files in each batch and stores them into metadata files so as to recover them when restarting. The metadata files are stored in the file system. There will be a further PR to clean up the metadata files periodically.

This is based on the initial work from @marmbrus.
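For a sense of the end-to-end API, usage presumably looks roughly like this (a sketch; the path is a placeholder, and `stream(path)` follows the `DataFrameReader.stream` method visible in the stack traces above):

```scala
// Continuously treat new text files dropped into a directory as a streaming DataFrame.
val lines = sqlContext.read
  .format("text")
  .stream("/data/incoming") // placeholder directory
```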