[SPARK-19721][SS] Good error message for version mismatch in log files #17070
Test build #73476 has finished for PR 17070 at commit
This is changing a lot of stuff to barely improve an error, and the PR has problems. I don't think this is worthwhile
@@ -226,7 +226,15 @@ class KafkaSourceSuite extends KafkaSourceTest {
source.getOffset.get // Read initial offset
}

assert(e.getMessage.contains("Please upgrade your Spark"))
Seq(
I don't think it's useful to assert about the exact message. Assert that it has key substrings.
done; thanks!
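A minimal sketch of the "key substrings" style of assertion suggested above. The helper name `missingSubstrings` and the sample message prefix are hypothetical and are not taken from the PR; only the tail of the message follows the wording quoted later in this thread.

```scala
object KeySubstringCheck {
  // Hypothetical helper: returns the expected fragments that do NOT occur in the message.
  def missingSubstrings(message: String, expected: Seq[String]): Seq[String] =
    expected.filterNot(fragment => message.contains(fragment))
}

// Sample message; the leading words are an assumption made for illustration.
val sampleMessage =
  "Supported log version is v1, but encountered v99. The log file was produced " +
    "by a newer version of Spark and cannot be read by this version. Please upgrade."

// Assert on a few stable fragments instead of the whole sentence,
// so rewording the rest of the message does not break the test.
val missing = KeySubstringCheck.missingSubstrings(
  sampleMessage, Seq("v1", "v99", "newer version of Spark"))
assert(missing.isEmpty, s"missing substrings: $missing")
```

Collecting the missing fragments first keeps the failure message informative when more than one fragment is absent.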
@@ -100,7 +100,8 @@ private[kafka010] class KafkaSource(
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write(VERSION)
writer.write("v" + VERSION)
Write one string, or, if you're worried about efficiency, write this in 3 steps rather than 2?
done
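A rough sketch of the "write one string" option from the comment above. `VersionedWriter`, the `payload` parameter, and the newline separator are assumptions made for illustration; the hunk does not show what follows the version line in the real `serialize`.

```scala
import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

object VersionedWriter {
  val VERSION = 1 // assumed to be an Int, as the PR description says

  // Hypothetical stand-in for the serialize method in the hunk above.
  def serialize(payload: String, out: OutputStream): Unit = {
    out.write(0) // leading zero byte, as in the hunk
    val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    // Build the version line and the payload as one string and write it once.
    writer.write(s"v$VERSION\n$payload")
    writer.flush()
  }
}

// Usage: serialize into an in-memory buffer and inspect the bytes.
val buffer = new ByteArrayOutputStream()
VersionedWriter.serialize("{}", buffer)
```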
@@ -195,6 +196,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val input = fileManager.open(batchMetadataFile)
try {
Some(deserialize(input))
} catch {
case ise: IllegalStateException =>
Why not just let the exception go?
The low-level exception does not know the log file's path, so I'm catching it here to put the path into the error message and give users very explicit information.
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming

import java.io._
import java.lang.IllegalStateException
You don't need to import from java.lang
ah, what a simple mistake :-)
s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
}
else {
Put on previous line; no need to use return
done
private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
if (text.length > 0 && text(0) == 'v') {
val version =
try { text.substring(1, text.length).toInt }
Brace style is wrong
done
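For reference, a self-contained sketch of how a `parseVersion` along these lines could look once the style comments are applied (multi-line `try`, `else` on the same line as the closing brace, no `return`). This is not the merged code: the wrapper object, the `malformed` helper, the "malformed" wording, the leading words of the unsupported-version message, and the choice to reject non-positive versions are all assumptions.

```scala
object VersionParsing {
  // Sketch only; the signature mirrors the hunk above, everything else is illustrative.
  def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    // Hypothetical helper for the "cannot parse a version at all" case.
    def malformed(): Nothing = throw new IllegalStateException(
      s"Log file was malformed: failed to read correct log version from $text.")

    if (text.length > 0 && text(0) == 'v') {
      val version =
        try {
          text.substring(1).toInt
        } catch {
          case _: NumberFormatException => malformed()
        }
      if (version <= 0) {
        malformed() // assumption: versions start at 1
      } else if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"Maximum supported log version is v$maxSupportedVersion, but encountered " +
            s"v$version. The log file was produced by a newer version of Spark and " +
            "cannot be read by this version. Please upgrade.")
      } else {
        version
      }
    } else {
      malformed()
    }
  }
}

// Usage: a supported version parses to its Int value.
val parsed = VersionParsing.parseVersion("v1", maxSupportedVersion = 1)
```

Because every failing branch has type `Nothing`, the `if`/`else` chain is an expression of type `Int` and no `return` is needed.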
@srowen thanks for the comments! I was trying to tackle SPARK-19721. Sorry the summary just said "WIP" without a JIRA number; I'm adding the JIRA number back.
Test build #73487 has finished for PR 17070 at commit
/cc @zsxwing
@zsxwing would you take a look when you've got a minute? Thanks!
Jenkins retest this please
Test build #74063 has finished for PR 17070 at commit
@zsxwing would you take a look when you've got a minute? Thanks!
Looks pretty good. Just one nit.
case ise: IllegalStateException =>
// re-throw the exception with the log file path added
throw new IllegalStateException(
s"Failed to read log file $batchMetadataFile. ${ise.getMessage}")
nit: please also add ise as the cause.
done; thanks!
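A small sketch of the re-throw with the original exception attached as the cause, as requested in the nit above. The `withLogPath` helper and the sample path are hypothetical; the message format follows the hunk, and the two-argument `IllegalStateException(String, Throwable)` constructor is the standard Java one.

```scala
object LogReadErrors {
  // Hypothetical helper: evaluates `read` and, on IllegalStateException,
  // re-throws with the log file path prepended and the original kept as the cause.
  def withLogPath[T](path: String)(read: => T): T = {
    try {
      read
    } catch {
      case ise: IllegalStateException =>
        throw new IllegalStateException(
          s"Failed to read log file $path. ${ise.getMessage}", ise)
    }
  }
}

// Usage: the wrapped failure carries both the path and the low-level exception.
val attempt = scala.util.Try {
  LogReadErrors.withLogPath("/tmp/offsets/0") {
    throw new IllegalStateException("bad version")
  }
}
```

Keeping the cause preserves the low-level stack trace, which the message text alone would lose.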
Test build #74648 has finished for PR 17070 at commit
LGTM. Merging to master and
@lw-lin there are conflicts with 2.1. Could you submit a new PR for branch-2.1?
Problem
There are several places where we write out version identifiers in various logs for structured streaming (usually v1). However, in the places where we check for this, we throw a confusing error message.
What changes were proposed in this pull request?
This patch made two major changes:
1. Added a parseVersion(...) method and, based on this method, fixed version checking in the places that needed it (no other place needed to do this checking).
2. Changed FileStreamSinkLog.VERSION, FileStreamSourceLog.VERSION etc. from String to Int, so that we can identify newer versions via version > 1 instead of version != "v1". We are still writing out "v1" and reading back "v1".
Exception message with this patch
How was this patch tested?
unit tests