Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework #46079

Closed
wants to merge 2 commits into from

Conversation

panbingkun
Copy link
Contributor

@panbingkun panbingkun commented Apr 16, 2024

What changes were proposed in this pull request?

The pr aims to migrate logWarning in module Streaming with variables to structured logging framework.

Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Pass GA.

Was this patch authored or co-authored using generative AI tooling?

No.

@panbingkun panbingkun changed the title [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework [WIP][SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework Apr 16, 2024
logWarning(s"isTimeValid called with $time whereas the last valid time " +
s"is $lastValidTime")
logWarning(log"isTimeValid called with ${MDC(TIME, time)} whereas the last valid time " +
log"is ${MDC(LAST_VALID_TIME, lastValidTime)}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Their types are Time, so the output time units are milliseconds

@@ -247,7 +247,8 @@ private[streaming] class ReceivedBlockTracker(
true
} catch {
case NonFatal(e) =>
logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
logWarning(log"Exception thrown while writing record: " +
log"${MDC(RECEIVED_BLOCK_TRACKER_LOG_EVENT, record)} to the WriteAheadLog.", e)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's called RECORD, maybe it's more general.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the class name RECEIVED_BLOCK_TRACKER_LOG_EVENT is also fine

@@ -884,14 +885,16 @@ abstract class DStream[T: ClassTag] (
val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
toTime
} else {
logWarning(s"toTime ($toTime) is not a multiple of slideDuration ($slideDuration)")
logWarning(log"toTime (${MDC(TO_TIME, toTime)}) is not a multiple of slideDuration " +
log"(${MDC(SLIDE_DURATION, slideDuration)})")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output time units are milliseconds.

@panbingkun panbingkun changed the title [WIP][SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework Apr 18, 2024
@panbingkun panbingkun marked this pull request as ready for review April 18, 2024 11:19
@panbingkun
Copy link
Contributor Author

cc @gengliangwang

@gengliangwang
Copy link
Member

@panbingkun Thanks again for the works! Merging to master.

@smileyboy2019
Copy link

Structured Streaming supports writing Spark SQL and using SQL to write stream processing logic. Is this possible, similar to the syntax of Flink. SQL can satisfy the flow processing process.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants