New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-14474][SQL]Move FileSource offset log into checkpointLocation #12247
Conversation
Test build #55270 has finished for PR 12247 at commit
|
retest this please |
Test build #55308 has finished for PR 12247 at commit
|
cc @marmbrus |
*/ | ||
def createSource( | ||
sourceId: Option[Long] = None, | ||
checkpointLocation: Option[String] = None): Source = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sourceId
and checkpointLocation
are set via DataFrameWriter. When this one is called in DataFrameReader
, we don't know them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, and we also don't really need to create a source there (we only need to know the schema). Perhaps getting the schema should be separated from getting the source (like we do in FileFormat).
Updated |
Test build #55536 has finished for PR 12247 at commit
|
Test build #55537 has finished for PR 12247 at commit
|
def createSource( | ||
sqlContext: SQLContext, | ||
sourceId: Long, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we passing the sourceId
instead of the location?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think some Source may not need a location. Instead, it just needs an id to distinguish.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought the goal was to have all the data in the same location. With this API everyone needs to duplicate the checkpoint location resolution logic.
Note that if you want a unique identifier the path also qualifies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense. I will update it.
Test build #55539 has finished for PR 12247 at commit
|
Test build #55548 has finished for PR 12247 at commit
|
def createSource( | ||
sqlContext: SQLContext, | ||
metadataPath: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called metadataPath
to avoid confusing with checkpointLocation
since they are not the same path.
Thanks, merging to master. |
What changes were proposed in this pull request?
Now that we have a single location for storing checkpointed state. This PR just propagates the checkpoint location into FileStreamSource so that we don't have one random log off on its own.
How was this patch tested?
test("metadataPath should be in checkpointLocation")