[SPARK-47577][CORE][PART1] Migrate logError with variables to structured logging framework #45834
Conversation
cc @panbingkun @pan3793 @itholic as well
```scala
@@ -1736,9 +1738,11 @@ private[spark] object MapOutputTracker extends Logging {

  def validateStatus(status: ShuffleOutputStatus, shuffleId: Int, partition: Int) : Unit = {
    if (status == null) {
      val errorMessage = s"Missing an output location for shuffle $shuffleId partition $partition"
      // scalastyle:off line.size.limit
```
Can we split it into multiple lines, to avoid using `// scalastyle:off line.size.limit`?
If the log is not long, one line makes it easier to read. I suggest we allow both styles.
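For illustration, here is a minimal sketch of the two styles under discussion, using the framework's `log` interpolator and `MDC` wrapper (the object name is hypothetical, and it assumes `SHUFFLE_ID` and `PARTITION_ID` keys exist in `LogKey`):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{PARTITION_ID, SHUFFLE_ID}

object LogStyleSketch extends Logging {
  def report(shuffleId: Int, partition: Int): Unit = {
    // Style 1: one line; may need the scalastyle suppression when the
    // interpolated message pushes past the line-length limit.
    // scalastyle:off line.size.limit
    logError(log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} partition ${MDC(PARTITION_ID, partition)}")
    // scalastyle:on line.size.limit

    // Style 2: split into concatenated log fragments, matching the
    // `log"..." + log"..."` pattern used elsewhere in this PR.
    logError(log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} " +
      log"partition ${MDC(PARTITION_ID, partition)}")
  }
}
```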
```scala
@@ -21,17 +21,56 @@ package org.apache.spark.internal
 * All structured logging keys should be defined here for standardization.
 */
object LogKey extends Enumeration {
  val APPLICATION_ID = Value
  val EXECUTOR_ID = Value
```
Perhaps we need to first group the keys into categories by business domain, and then sort them alphabetically within each one.
Also, we may need a naming rule: should a key be an abbreviation or the complete spelling? For example, `APPLICATION_ID` or `APP_ID`? Otherwise, I am worried that this class will become very large soon and some keys will end up with duplicate meanings.
Created #45862. BTW, what do you mean by "first category"?
I originally planned to group the keys by category first, and then sort them alphabetically within each category. Let me give an example. Suppose we have the keys:
APPLICATION_ID
K8S_ID
MESOS_ID
MAX_SIZE
MIN_SIZE
If we only sort alphabetically, we will get:
APPLICATION_ID
K8S_ID
MAX_SIZE
MESOS_ID
MIN_SIZE
It's a bit weird for me to see MESOS_ID between MAX_SIZE and MIN_SIZE.
If we first classify by category at the first level and then sort alphabetically at the second level, we will obtain:
# ID
APPLICATION_ID
K8S_ID
MESOS_ID
# SHUFFLE Value
MAX_SIZE
MIN_SIZE
Just like:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
Lines 602 to 610 in 7dec5eb
```scala
// url functions
expression[UrlEncode]("url_encode"),
expression[UrlDecode]("url_decode"),
expression[ParseUrl]("parse_url"),
// datetime functions
expression[AddMonths]("add_months"),
expression[CurrentDate]("current_date"),
expressionBuilder("curdate", CurDateExpressionBuilder, setAlias = true),
```
I think that as our log migration work progresses, this class will become larger and larger. If we only sort alphabetically, it is not certain that developers, and ultimately whoever searches the logs, can quickly find the `LogKey` they want.
@panbingkun I see. We can add a secondary category later.
During the migration, we should use generic keys and try to control the number of keys, so that the logs are easier to query.
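As a hedged sketch of the generic-keys point (the `PATH` key and the helper below are assumptions for illustration, not code from this PR): reusing one generic key instead of minting `CHECKPOINT_PATH`, `OUTPUT_PATH`, `WAREHOUSE_PATH`, and so on keeps the key set small, and a single query on `PATH` then matches every path-related log entry.

```scala
import java.io.File

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.PATH  // a generic key, assumed to exist

object GenericKeySketch extends Logging {
  def reportDeleteFailure(dir: File): Unit = {
    // One generic PATH key serves checkpoint, output, and warehouse
    // directories alike, so log queries stay uniform.
    logError(log"Failed to delete ${MDC(PATH, dir.getAbsolutePath)}")
  }
}
```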
Of course, the disadvantage of category-first grouping is that we cannot use a UT like #45857 to fully guarantee that the keys are written in alphabetical order; it requires some manual intervention.
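For reference, a minimal sketch of the kind of check a UT like #45857 can perform while the keys remain in one flat alphabetical list (the suite name is hypothetical); grouping by category at the first level would break exactly this assertion:

```scala
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.internal.LogKey

class LogKeyOrderSketch extends AnyFunSuite {
  test("LogKey entries are defined in alphabetical order") {
    // Enumeration assigns ids in declaration order, so `values` (ordered
    // by id) mirrors the order the keys appear in the source file.
    val names = LogKey.values.toSeq.map(_.toString)
    assert(names == names.sorted, "LogKey entries should be alphabetically sorted")
  }
}
```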
> @panbingkun I see. We can add a secondary category later. During the migration, we should use generic keys and try to control the number of keys, so that the logs are easier to query.
Okay.
@panbingkun @pan3793 @HyukjinKwon Thanks for the review.
…red logging framework

### What changes were proposed in this pull request?
Migrate logError with variables of core module to structured logging framework. This is part2 which transforms the logError entries of the following API
```
def logError(msg: => String, throwable: Throwable): Unit
```
to
```
def logError(entry: LogEntry, throwable: Throwable): Unit
```
migration Part1 was in #45834

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
Yes, Spark core logs will contain additional MDC

### How was this patch tested?
Compiler and scala style checks, as well as code review.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45890 from gengliangwang/coreError2.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
```scala
@@ -1033,8 +1037,8 @@ private[spark] class TaskSetManager(
           info.host, info.executorId, index, failureReason))
         numFailures(index) += 1
         if (numFailures(index) >= maxTaskFailures) {
-          logError("Task %d in stage %s failed %d times; aborting job".format(
-            index, taskSet.id, maxTaskFailures))
+          logError(log"Task ${MDC(TASK_ID, index)} in stage ${MDC(STAGE_ID, taskSet.id)} failed " +
```
task id and task index are different things. Here it's task index. @gengliangwang
You are right. I will find time to revisit all the usages of the `TASK_ID` log key.
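A hedged sketch of what the corrected call site might look like if dedicated keys are introduced (`TASK_INDEX` and `MAX_ATTEMPTS` are assumptions here; the actual follow-up is tracked in #46951):

```scala
// Hypothetical correction: log the task *index* under its own key rather
// than reusing TASK_ID, which names a different concept.
logError(log"Task ${MDC(TASK_INDEX, index)} in stage ${MDC(STAGE_ID, taskSet.id)} failed " +
  log"${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job")
```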
And `TASK_ATTEMPT_ID` should be the same as `TASK_ID`?
@cloud-fan I created #46951
@pan3793 what do you mean by the same?
`taskAttemptId = taskId,`
Yes, these two are the same.
What changes were proposed in this pull request?
Migrate logError with variables of core module to structured logging framework. This is part1, which transforms logError entries from the string-based API to the structured-logging API; a sketch of the two signatures follows.
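Inferred from the part2 commit message quoted above, which lists the throwable-taking overloads, part1 presumably covers the single-argument pair:

```
def logError(msg: => String): Unit
```

to

```
def logError(entry: LogEntry): Unit
```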
Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
Does this PR introduce any user-facing change?
Yes, Spark core logs will contain additional MDC
How was this patch tested?
Compiler and scala style checks, as well as code review.
Was this patch authored or co-authored using generative AI tooling?
No