-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21596][SS]Ensure places calling HDFSMetadataLog.get check the return value #18799
Conversation
Test build #80129 has finished for PR 18799 at commit
|
Test build #80138 has finished for PR 18799 at commit
|
@@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: | |||
serialize(metadata, output) | |||
return Some(tempPath) | |||
} finally { | |||
IOUtils.closeQuietly(output) | |||
output.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The output stream may fail to close (e.g., fail to flush the internal buffer), if it happens, we should fail the query rather than ignoring it.
val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs | ||
val allLogs = validBatches.map { batchId => | ||
super.get(batchId).getOrElse { | ||
throw new IllegalStateException(s"batch $batchId doesn't exist") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does it mean when this get return None? somehow the batch file is missing? is there anyway to improve the message any more?
getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten | ||
getAllValidBatches(latestId, compactInterval).map { id => | ||
super.get(id).getOrElse { | ||
throw new IllegalStateException(s"batch $id doesn't exist") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good give more context information. like latestId, etc.
@@ -396,6 +400,7 @@ object HDFSMetadataLog { | |||
|
|||
/** | |||
* Rename a path. Note that this implementation is not atomic. | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this needed?
Test build #80370 has finished for PR 18799 at commit
|
// Verify that we can get all batches between `startId` and `endId`. | ||
if (startId.isDefined || endId.isDefined) { | ||
if (batchIds.isEmpty) { | ||
throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to print the range that was asked for. otherwise its hard to see what was expected while debugging.
test("verifyBatchIds") { | ||
import HDFSMetadataLog.verifyBatchIds | ||
verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L)) | ||
verifyBatchIds(Seq(1L), Some(1L), Some(1L)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you didnt test the valid cases when one of the start or end is None.
@@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { | |||
val metadataLog = | |||
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) | |||
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0)))) | |||
assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what was this change for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newSource.getBatch(None, FileStreamSourceOffset(1))
will fail without this because batch 1
doesn't exist.
Test build #80418 has finished for PR 18799 at commit
|
@tdas addressed your comments |
LGTM. |
Test build #80429 has finished for PR 18799 at commit
|
Merged to master. But there were conflicts with 2.2. Can you make another PR for 2.2. |
… return value When I was investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug. This PR ensures that places calling HDFSMetadataLog.get always check the return value. Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes apache#18799 from zsxwing/SPARK-21596.
… return value Same PR as #18799 but for branch 2.2. Main discussion the other PR. -------- When I was investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug. This PR ensures that places calling HDFSMetadataLog.get always check the return value. Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #18890 from tdas/SPARK-21596-2.2.
… return value Same PR as apache#18799 but for branch 2.2. Main discussion the other PR. -------- When I was investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug. This PR ensures that places calling HDFSMetadataLog.get always check the return value. Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes apache#18890 from tdas/SPARK-21596-2.2.
What changes were proposed in this pull request?
When I was investigating a flaky test, I realized that many places don't check the return value of
HDFSMetadataLog.get(batchId: Long): Option[T]
. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug.This PR ensures that places calling HDFSMetadataLog.get always check the return value.
How was this patch tested?
Jenkins