[SPARK-13791][SQL]Add MetadataLog and HDFSMetadataLog #11625
Conversation
```diff
@@ -360,59 +357,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(tmp)
   }
-
-  test("fault tolerance with corrupted metadata file") {
```
Removed these tests as they were testing the old metadata file, which has been removed in this PR.
Test build #52801 has finished for PR 11625 at commit
```scala
private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()

private def tryAcquireLock(): Unit = {
```
What happens if you die while you are holding the lock? It seems like your streaming job will be unable to restart without human intervention. Is there a reason that we can't detect problems when writing a new log entry?
The semantics of `add` is "if batchId's metadata has already been stored, this method does nothing". So if writer A sees a file created by another writer B, it won't write the file; A won't fail and will simply use the metadata written by B.
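A minimal sketch of the idempotent `add` semantics described above. The class and file layout here are illustrative stand-ins, not the actual HDFSMetadataLog code:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Illustrative stand-in: one file per batchId under a log directory.
class SimpleMetadataLog(dir: Path) {
  // Returns true if this call stored the metadata; false if another
  // writer already created the file for this batchId, in which case
  // we do nothing and later reads use what that writer stored.
  def add(batchId: Long, metadata: String): Boolean = {
    val file = dir.resolve(batchId.toString)
    if (Files.exists(file)) {
      false // writer B got there first; writer A no-ops
    } else {
      Files.write(file, metadata.getBytes(StandardCharsets.UTF_8))
      true
    }
  }

  def get(batchId: Long): Option[String] = {
    val file = dir.resolve(batchId.toString)
    if (Files.exists(file)) {
      Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
    } else None
  }
}
```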
Seems like this abstraction would be more powerful if we threw a `ConcurrentUpdateException` and let the user decide whether that is okay. If all you are trying to get is idempotence (the file sink), you can ignore it. If you are trying to do mutual exclusion (stream execution trying to define the offsets in each batch id), you can terminate the stream.
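The two policies suggested here could look roughly like this. Note that `ConcurrentUpdateException` is the reviewer's proposed name, not an existing Spark class, and both helper functions are hypothetical:

```scala
// Hypothetical exception signalling that another writer committed
// the same batch first.
class ConcurrentUpdateException(msg: String) extends RuntimeException(msg)

object CommitStrategies {
  // File sink: only idempotence is needed, so a concurrent write of
  // the same batch is harmless and the exception can be swallowed.
  def idempotentCommit(write: () => Unit): Unit =
    try write() catch { case _: ConcurrentUpdateException => () }

  // Stream execution: a concurrent writer means two drivers are
  // defining offsets for the same batch id, so fail the stream.
  def exclusiveCommit(write: () => Unit): Unit =
    try write() catch {
      case e: ConcurrentUpdateException =>
        throw new IllegalStateException("concurrent batch writer detected", e)
    }
}
```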
Basically, I don't believe that stale locks will only occur in extreme circumstances: the JVM can hang due to an OOM, the container Spark is running in can be killed, or someone can `kill -9` the process. All of these leave a stale lock behind and make the stream unrecoverable without human intervention.
You could also return a boolean instead of throwing an exception.
If we don't use a global `.lock` file, there are two cases in which writing a log entry fails with `FileAlreadyExistsException`:

- There is another HDFSMetadataLog using the same path.
- The file is corrupted: we just restarted from a failure and tried to rerun a batch.

For case 1 we want to throw `ConcurrentUpdateException`; for case 2 we need to overwrite the file. To figure out which situation we are in, we would try to read the file and see if it's complete. But if we find the file is corrupted, there are again two possibilities:

- Another HDFSMetadataLog is writing the file.
- Nobody is using the path and the file is corrupted.

So how can we know which case is right?
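One common way to sidestep this ambiguity is to write the entry to a temporary file and then rename it into place, so a visible log file is either absent or complete and "corrupted" stops being a reachable state. The sketch below assumes a filesystem with atomic same-directory rename (as local filesystems and HDFS provide) and ignores the small check-then-act race for brevity; it is an illustration, not the merged HDFSMetadataLog code:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicLogWrite {
  // Returns false if the batch file already exists (case 1: another
  // writer committed it first); otherwise writes via temp file plus
  // rename, so readers never observe a partial file (eliminating
  // case 2 entirely).
  def write(dir: Path, batchId: Long, metadata: String): Boolean = {
    val target = dir.resolve(batchId.toString)
    if (Files.exists(target)) return false
    val tmp = Files.createTempFile(dir, s".$batchId-", ".tmp")
    Files.write(tmp, metadata.getBytes(StandardCharsets.UTF_8))
    // Same-directory rename is atomic: the batch file appears fully
    // written or not at all.
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    true
  }
}
```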
Test build #52835 has finished for PR 11625 at commit
```scala
  }
  try {
    output.writeInt(buffer.remaining())
    Utils.writeByteBuffer(buffer, output: DataOutput)
```
Why the type ascription?
There are two methods in `Utils`:

```scala
def writeByteBuffer(bb: ByteBuffer, out: DataOutput): Unit
def writeByteBuffer(bb: ByteBuffer, out: OutputStream): Unit
```

`FSDataOutputStream` is both a `DataOutput` and an `OutputStream`, so the compiler doesn't know which one to call. That's why I need to add the type ascription here.
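The ambiguity can be reproduced without Hadoop: `java.io.DataOutputStream` implements `DataOutput` and extends `OutputStream`, just like `FSDataOutputStream`, and neither parameter type is more specific than the other. The overloads below mirror the shape of `Utils.writeByteBuffer` but are otherwise made up for illustration:

```scala
import java.io.{ByteArrayOutputStream, DataOutput, DataOutputStream, OutputStream}

object OverloadDemo {
  // Two overloads with the same shape as Utils.writeByteBuffer:
  def target(out: DataOutput): String = "DataOutput"
  def target(out: OutputStream): String = "OutputStream"

  def main(args: Array[String]): Unit = {
    val out = new DataOutputStream(new ByteArrayOutputStream())
    // `target(out)` alone does not compile: the call is ambiguous.
    // A type ascription fixes the argument's static type and thereby
    // selects the overload explicitly:
    println(target(out: DataOutput))   // prints DataOutput
    println(target(out: OutputStream)) // prints OutputStream
  }
}
```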
Test build #53083 has finished for PR 11625 at commit
Test build #53103 has finished for PR 11625 at commit
retest this please
Test build #53112 has finished for PR 11625 at commit
```scala
    None
  }

  override def stop(): Unit = {
```
Get rid of stop? Seems like we would like to avoid relying on this method for correctness (since we need to handle abnormal termination). So I would just leave it out of the interface entirely.
Test build #53137 has finished for PR 11625 at commit
Thanks! Merging to master.
## What changes were proposed in this pull request?

- Add a MetadataLog interface for reliable metadata storage.
- Add HDFSMetadataLog as a MetadataLog implementation based on HDFS.
- Update FileStreamSource to use HDFSMetadataLog instead of managing metadata by itself.

## How was this patch tested?

unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#11625 from zsxwing/metadata-log.
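The `MetadataLog` interface described in the summary can be sketched roughly as follows. The signatures are reconstructed from the discussion in this thread, not copied from the merged code, and the in-memory implementation exists only to make the contract concrete (the PR's real implementation, HDFSMetadataLog, persists entries to HDFS):

```scala
import scala.collection.mutable

// Sketch of the MetadataLog contract: a reliable, per-batch store.
trait MetadataLog[T] {
  // Store metadata for batchId. If batchId's metadata has already
  // been stored, this method does nothing and returns false.
  def add(batchId: Long, metadata: T): Boolean
  def get(batchId: Long): Option[T]
  def getLatest(): Option[(Long, T)]
}

// Minimal in-memory implementation, for illustration only.
class InMemoryMetadataLog[T] extends MetadataLog[T] {
  private val entries = mutable.SortedMap.empty[Long, T]

  def add(batchId: Long, metadata: T): Boolean =
    if (entries.contains(batchId)) false
    else { entries(batchId) = metadata; true }

  def get(batchId: Long): Option[T] = entries.get(batchId)

  def getLatest(): Option[(Long, T)] = entries.lastOption
}
```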