
@igreenfield commented May 24, 2020

What changes were proposed in this pull request?

Propagates the MDC (Mapped Diagnostic Context) into all driver threads so that all driver-side logs can use it.
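For context, MDC values live in a per-thread map, so a thread pool has to carry them explicitly from the submitting thread to the worker thread. A minimal sketch of the idea, assuming org.slf4j.MDC (the wrapper name MDCAwareRunnable here is illustrative, not necessarily the PR's exact code):

import org.slf4j.MDC

class MDCAwareRunnable(delegate: Runnable) extends Runnable {
  // Capture the submitting thread's MDC at construction time.
  private val callerMdc = Option(MDC.getCopyOfContextMap)

  override def run(): Unit = {
    val workerMdc = Option(MDC.getCopyOfContextMap)
    callerMdc.foreach(MDC.setContextMap)    // install the caller's context
    try delegate.run()
    finally {
      MDC.clear()                           // drop the caller's context
      workerMdc.foreach(MDC.setContextMap)  // restore the worker's own context
    }
  }
}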

Why are the changes needed?

The use case applies in two scenarios:

  1. streaming
  2. running Spark under a job server (in this case you have a long-running SparkContext that handles many tasks from different sources)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Ran all tests.
Manual test:
code:

MDC.put("appName", "app for test")

val session = SparkSession.builder()
      .master("local")
      .config(UI_ENABLED.key, value = false)
      .config("some-config", "v2")
      .getOrCreate()

log4j.properties:

#File Appender with MDC
log4j.appender.FAMDC=org.apache.log4j.FileAppender
log4j.appender.FAMDC.append=false
log4j.appender.FAMDC.file=target/unit-tests-mdc.log
log4j.appender.FAMDC.layout=org.apache.log4j.PatternLayout
log4j.appender.FAMDC.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%X{appId}] [%X{appName}] %t %p %c{1}: %m%n

log file:

09:41:12.400 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Running Spark version 3.0.0-SNAPSHOT
09:41:12.777 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
09:41:12.974 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceUtils: No custom resources configured for spark.driver.
09:41:12.975 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Submitted application: c4b615df-6461-4568-bc32-c6afa43a777b
09:41:13.046 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
09:41:13.060 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfile: Limiting resource is cpu
09:41:13.061 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfileManager: Added ResourceProfile id: 0
09:41:13.341 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing view acls to: igreenfield
09:41:13.342 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing modify acls to: igreenfield
09:41:13.342 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing view acls groups to: 
09:41:13.343 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing modify acls groups to: 
09:41:13.343 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(igreenfield); groups with view permissions: Set(); users  with modify permissions: Set(igreenfield); groups with modify permissions: Set()
09:41:19.076 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Utils: Successfully started service 'sparkDriver' on port 55518.
09:41:19.178 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering MapOutputTracker
09:41:19.357 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering BlockManagerMaster
09:41:19.367 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
09:41:19.371 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
09:41:19.386 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering BlockManagerMasterHeartbeat
09:41:19.474 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO DiskBlockManager: Created local directory at C:\Users\igreenfield\AppData\Local\Temp\blockmgr-98ef6083-53df-4f18-9f66-b2aa0215a505
09:41:19.542 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO MemoryStore: MemoryStore started with capacity 4.3 GiB
09:41:19.610 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering OutputCommitCoordinator
09:41:20.182 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Executor: Starting executor ID driver on host IL-LP-005.global.axiomsl.com
09:41:20.311 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55528.
09:41:20.311 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO NettyBlockTransferService: Server created on IL-LP-005.global.axiomsl.com:55528
09:41:20.317 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
09:41:20.345 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.363 [] [app for test] dispatcher-BlockManagerMaster INFO BlockManagerMasterEndpoint: Registering block manager IL-LP-005.global.axiomsl.com:55528 with 4.3 GiB RAM, BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.370 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.375 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.887 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO log: Logging initialized @14628ms to org.eclipse.jetty.util.log.Slf4jLog
09:41:21.670 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: loading hive config file: file:/C:/GITHUB/igreenfield/spark/sql/core/target/scala-2.12/test-classes/hive-site.xml
09:41:21.736 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/GITHUB/igreenfield/spark/sql/core/spark-warehouse').
09:41:21.737 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: Warehouse path is 'file:/C:/GITHUB/igreenfield/spark/sql/core/spark-warehouse'.
09:41:26.157 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkSessionBuilderSuite: 
09:41:26.221 [] [app for test] dispatcher-event-loop-0 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
09:41:26.237 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO MemoryStore: MemoryStore cleared
09:41:26.237 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: BlockManager stopped
09:41:26.351 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: BlockManagerMaster stopped
09:41:26.365 [] [app for test] dispatcher-event-loop-1 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
09:41:26.370 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Successfully stopped SparkContext
09:41:26.385 [] [app for test] ScalaTest-run WARN SparkSessionBuilderSuite: 
09:41:26.398 [] [app for test] Thread-1 INFO ShutdownHookManager: Shutdown hook called
09:41:26.398 [] [app for test] Thread-1 INFO ShutdownHookManager: Deleting directory C:\Users\igreenfield\AppData\Local\Temp\spark-5d205a0f-329d-4b55-9434-cb4520fab2ff

@HyukjinKwon (Member)

cc @cloud-fan

@keypointt (Contributor)

Add a test case?

@cloud-fan (Contributor)

Shall we at least provide one default driver-side MDC property, like the app ID?

@igreenfield (Author) commented May 26, 2020

@cloud-fan Do you have an idea where in the code would be the right place to add it?

I thought of adding it in SparkSession.getOrCreate:

def getOrCreate(): SparkSession = synchronized {
  assertOnDriver()
  // Get the session from current thread's active session.
  var session = activeThreadSession.get()
  if ((session ne null) && !session.sparkContext.isStopped) {
    applyModifiableSettings(session)
    MDC.put("appName", session.conf.get("spark.app.name", ""))
    MDC.put("appId", session.conf.get("spark.app.id", ""))
    return session
  }
  // ... (rest of getOrCreate unchanged)

@igreenfield (Author)

@keypointt Added a test case.

@cloud-fan (Contributor)

I thought of adding it in SparkSession.getOrCreate

Sounds good. It's better if you can do a local test and show that we can see the app ID in the driver log.

@igreenfield (Author) commented May 27, 2020

@cloud-fan Do we have tests that read logs?
I added a test that checks that the MDC is propagated into the thread pools. Is that enough? Since the actual write to the log is done by log4j, do we need to test log4j itself?

@cloud-fan (Contributor)

I mean a manual test: just check the logs yourself.

I have a new question: how can we have multiple applications in one Spark driver? IIUC one Spark driver is dedicated to one Spark application, so it seems unnecessary to have the app ID in the driver log.

@igreenfield (Author)

@cloud-fan I think you are right, it can't happen. So maybe we don't need to add this as a default.

@igreenfield (Author)

Ping @cloud-fan, what do you think?

@cloud-fan (Contributor)

I'm OK with having this framework, but without any built-in driver MDC properties.

Please find a way to test it.

@igreenfield (Author)

@cloud-fan What exactly do you want to test?

@cloud-fan (Contributor)

E.g., you set a custom MDC property via SparkContext.setLocalProperty and change the log4j config file to use this MDC property. Then run a Spark application and check whether the driver log contains the MDC property.

@dongjoon-hyun changed the title from "[SPARK-31769] Add MDC support for driver threads" to "[SPARK-31769][CORE] Add MDC support for driver threads" on Jun 20, 2020
@igreenfield (Author) commented Jun 21, 2020

@cloud-fan

  1. Do you want an automated test for that, one that runs on each build?
  2. This feature does not work with setLocalProperty; it just takes the caller thread's MDC and passes it along.

@cloud-fan (Contributor)

We don't need an automated test; just test it manually and put the result in the PR description. And yes, it's not setLocalProperty.

@igreenfield (Author)

@cloud-fan
code:

MDC.put("appName", "app for test")

val session = SparkSession.builder()
      .master("local")
      .config(UI_ENABLED.key, value = false)
      .config("some-config", "v2")
      .getOrCreate()

log4j.properties:

#File Appender with MDC
log4j.appender.FAMDC=org.apache.log4j.FileAppender
log4j.appender.FAMDC.append=false
log4j.appender.FAMDC.file=target/unit-tests-mdc.log
log4j.appender.FAMDC.layout=org.apache.log4j.PatternLayout
log4j.appender.FAMDC.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%X{appId}] [%X{appName}] %t %p %c{1}: %m%n

log file:

09:41:12.400 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Running Spark version 3.0.0-SNAPSHOT
09:41:12.777 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
09:41:12.974 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceUtils: No custom resources configured for spark.driver.
09:41:12.975 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Submitted application: c4b615df-6461-4568-bc32-c6afa43a777b
09:41:13.046 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
09:41:13.060 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfile: Limiting resource is cpu
09:41:13.061 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO ResourceProfileManager: Added ResourceProfile id: 0
09:41:13.341 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing view acls to: igreenfield
09:41:13.342 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing modify acls to: igreenfield
09:41:13.342 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing view acls groups to: 
09:41:13.343 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: Changing modify acls groups to: 
09:41:13.343 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(igreenfield); groups with view permissions: Set(); users  with modify permissions: Set(igreenfield); groups with modify permissions: Set()
09:41:19.076 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Utils: Successfully started service 'sparkDriver' on port 55518.
09:41:19.178 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering MapOutputTracker
09:41:19.357 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering BlockManagerMaster
09:41:19.367 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
09:41:19.371 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
09:41:19.386 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering BlockManagerMasterHeartbeat
09:41:19.474 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO DiskBlockManager: Created local directory at C:\Users\igreenfield\AppData\Local\Temp\blockmgr-98ef6083-53df-4f18-9f66-b2aa0215a505
09:41:19.542 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO MemoryStore: MemoryStore started with capacity 4.3 GiB
09:41:19.610 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkEnv: Registering OutputCommitCoordinator
09:41:20.182 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Executor: Starting executor ID driver on host IL-LP-005.global.axiomsl.com
09:41:20.311 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55528.
09:41:20.311 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO NettyBlockTransferService: Server created on IL-LP-005.global.axiomsl.com:55528
09:41:20.317 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
09:41:20.345 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.363 [] [app for test] dispatcher-BlockManagerMaster INFO BlockManagerMasterEndpoint: Registering block manager IL-LP-005.global.axiomsl.com:55528 with 4.3 GiB RAM, BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.370 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.375 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, IL-LP-005.global.axiomsl.com, 55528, None)
09:41:20.887 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO log: Logging initialized @14628ms to org.eclipse.jetty.util.log.Slf4jLog
09:41:21.670 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: loading hive config file: file:/C:/GITHUB/igreenfield/spark/sql/core/target/scala-2.12/test-classes/hive-site.xml
09:41:21.736 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/GITHUB/igreenfield/spark/sql/core/spark-warehouse').
09:41:21.737 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SharedState: Warehouse path is 'file:/C:/GITHUB/igreenfield/spark/sql/core/spark-warehouse'.
09:41:26.157 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkSessionBuilderSuite: 
09:41:26.221 [] [app for test] dispatcher-event-loop-0 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
09:41:26.237 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO MemoryStore: MemoryStore cleared
09:41:26.237 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManager: BlockManager stopped
09:41:26.351 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO BlockManagerMaster: BlockManagerMaster stopped
09:41:26.365 [] [app for test] dispatcher-event-loop-1 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
09:41:26.370 [] [app for test] ScalaTest-run-running-SparkSessionBuilderSuite INFO SparkContext: Successfully stopped SparkContext
09:41:26.385 [] [app for test] ScalaTest-run WARN SparkSessionBuilderSuite: 
09:41:26.398 [] [app for test] Thread-1 INFO ShutdownHookManager: Shutdown hook called
09:41:26.398 [] [app for test] Thread-1 INFO ShutdownHookManager: Deleting directory C:\Users\igreenfield\AppData\Local\Temp\spark-5d205a0f-329d-4b55-9434-cb4520fab2ff

@cloud-fan (Contributor)

Yea, the test is useful. Now I have one question: task MDC properties are always prefixed with "mdc.". Shall we do the same for driver MDC properties?
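(For reference, a sketch of the task-side convention being referred to. It assumes sc is an active SparkContext and that the executor copies "mdc."-prefixed local properties into its logging MDC; the key name jobOwner is illustrative:)

// Driver side: a task MDC property carries the "mdc." prefix.
sc.setLocalProperty("mdc.jobOwner", "alice")
sc.parallelize(1 to 3).count()
// The executor-side log4j pattern can then reference the value,
// e.g. %X{mdc.jobOwner} (or %X{jobOwner} if the prefix is stripped).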

@igreenfield (Author)

The driver inherits the MDC from the calling thread, so I think a prefix is not convenient from the user's perspective: in most cases users will want to use their regular keys, not create dedicated ones.

@cloud-fan (Contributor)

But inconsistency is also bad. @Ngone51 do you have any ideas?

@Ngone51 (Member) commented Jun 24, 2020

Maybe we could provide an API for users to add MDC properties and restrict the format of the MDC key? For example, sc.setAppMDC / sc.setDriverMDC / sc.setTaskMDC.

@cloud-fan (Contributor)

BTW, what's wrong with reusing sc.localProperties for driver-side MDC? When you define driver-side MDC properties, you would usually also want to display them in the executor-side logs.

@igreenfield (Author)

We could add support for setting the driver MDC via sc.localProperties, but from my experience, on the driver side it is better to inherit it.

@dongjoon-hyun (Member)

Retest this please

@SparkQA commented Jun 27, 2020

Test build #124571 has finished for PR 28629 at commit cc8d522.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Retest this please.

@SparkQA commented Jun 28, 2020

Test build #124575 has finished for PR 28629 at commit cc8d522.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@igreenfield (Author)

@dongjoon-hyun the failed test does not seem to be connected to the code changes.


test("newDaemonSingleThreadExecutor with MDC") {
val appIdValue = s"appId-${System.currentTimeMillis()}"
MDC.put("appId", appIdValue)
@cloud-fan (Contributor)
I'm still worried about API inconsistency. We should use either sc.setLocalProperties or MDC.put for both driver-side and executor-side MDC properties.


@cloud-fan I think he is using MDC.put in both cases. The only difference is that for the executor we also have to call sc.setLocalProperties in order to pass the property into the executor JVM. Once the executor process knows the appId, or any other property, it is put into the MDC the same way.

@cloud-fan (Contributor)
I mean user-facing APIs. Eventually we have to call MDC.put, as this is how MDC works.

Maybe it's better to say that Spark can propagate the MDC properties to both driver thread pools and executor tasks, so end-users only need to call MDC.put.

@igreenfield (Author)

For the driver that is true. For the executor, as it is another process, it has to be called explicitly by the user.


Right, there is no way to make MDC put something in another process.

@cloud-fan (Contributor)

Let's think about the use cases. There are many users who share one Spark application (e.g. a job server), and each user can set MDC properties so that they can collect Spark logs for their own Spark jobs only. (Correct me if I misunderstand the use case.)

If I were an end-user, I'd like to use one API to set MDC properties for both driver and executor logs. I don't see a use case that needs the driver and executor to have different MDC properties (except that the executor has an extra built-in MDC property: the task ID).

I agree that it's better to just inherit the MDC properties. And that should be the story for end-users. Spark should do two things internally:

  1. update the driver side thread pools to inherit the MDC properties.
  2. use local properties to propagate the MDC properties to the executor JVM.

This PR does 1, but 2 is only partially done. We shouldn't ask users to manually set the local properties; we should do it for them: when submitting a job, read the MDC properties of the current thread and set them as local properties.

Another problem is the MDC property names. Since we just inherit them on the driver side, it's not possible to require the "mdc." prefix. Maybe we should also remove this prefix on the executor side. cc @Ngone51
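(A minimal sketch of what step 2 could look like at job submission; illustrative only, using Scala 2.12 JavaConverters, not the PR's actual code:)

import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
import org.slf4j.MDC

// On job submission, copy the caller thread's MDC into the job's local
// properties so the values travel to the executor JVM with each task.
def copyMdcToLocalProperties(sc: SparkContext): Unit = {
  Option(MDC.getCopyOfContextMap).foreach { mdc =>
    mdc.asScala.foreach { case (key, value) =>
      sc.setLocalProperty(s"mdc.$key", value) // "mdc." prefix, see below
    }
  }
}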

@Ngone51 (Member) commented Jul 3, 2020

I'm fine with removing the prefix if we want to inherit the MDC properties directly, since I agree API consistency is more important. And I think we need to document clearly which MDC class users should use, as Spark's classpath actually has both org.slf4j.MDC and org.apache.log4j.MDC (though the two are bridged by default, so they usually see the same values).

Besides, I have two questions:

I just realized that MDC is actually backed by an InheritableThreadLocal. That's why, when we set up MDC properties in the main thread, those thread pools also get the updated properties. I can also imagine how this works for the case mentioned above (job server) when jobs come in sequence: we could clean up the MDC properties after one job finishes and set up new MDC properties for the next job. However, I'm wondering how it works if jobs come in concurrently. MDC properties could get messed up in that case, right?

And it seems this PR also can't work with loop-running runnables, e.g. MessageLoop.receiveLoopRunnable. This kind of runnable can only capture the MDC properties at the time it was created, which means it can only reflect the MDC properties set before the initialization of the SparkContext. So I'm not sure it could work for jobs that are created after the SparkContext.
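(A small self-contained illustration of that second concern, assuming a binding such as log4j 1.x whose MDC is backed by an InheritableThreadLocal; the object name is made up:)

import org.slf4j.MDC

object LoopMdcPitfall extends App {
  MDC.put("appName", "before-thread-start")
  val loop = new Thread(() => {
    // The MDC map was inherited once, when this thread started; later
    // MDC.put calls in the main thread are never seen here.
    for (_ <- 1 to 3) {
      println(s"loop sees appName=${MDC.get("appName")}")
      Thread.sleep(300)
    }
  })
  loop.start()
  Thread.sleep(100)
  MDC.put("appName", "after-thread-start") // not visible in the loop thread
  loop.join()
}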

@igreenfield (Author)

@cloud-fan, can we continue by adding driver-inherited MDC passing to the executor using setLocalProperties? I totally agree with @Ngone51 as stated above, although the main reason is the fact that we have two processes, and not only because of receiveLoopRunnable.

@cloud-fan (Contributor)

can we continue by adding driver-inherited MDC passing to the executor using setLocalProperties?

SGTM

@igreenfield (Author)

@cloud-fan,

Looking at the code, I thought of adding the properties in DAGScheduler, lines 1216-1219:

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties
    addPySparkConfigsToProperties(stage, properties)

What do you think?

@cloud-fan (Contributor)

Can you add a new commit? It's easier to review on GitHub.

@igreenfield (Author)

@cloud-fan, what should we do if setLocalProperties and the inherited MDC have the same keys? Which will take effect?

@cloud-fan (Contributor)

Good question. Since we expect end-users to set MDC properties directly, I think MDC properties should take priority over local properties. The local properties are just an internal detail of how we propagate driver-side MDC properties to the executor side.

I think this should be rare, as we have the "mdc." prefix. For example, if the driver side has an MDC property called abc, we put mdc.abc in the local properties. On the executor side, we strip the "mdc." prefix and put abc in the executor MDC.
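(A sketch of the executor-side half of that scheme; it assumes the task's local properties arrive as a java.util.Properties, and the helper name installTaskMdc is illustrative:)

import java.util.{Properties => JProperties}
import scala.collection.JavaConverters._
import org.slf4j.MDC

// Before running a task, move "mdc."-prefixed local properties into the
// task thread's MDC, stripping the prefix so driver and executor logs can
// share the same %X{...} keys.
def installTaskMdc(taskProps: JProperties): Unit = {
  taskProps.asScala.foreach { case (key, value) =>
    if (key.startsWith("mdc.")) MDC.put(key.stripPrefix("mdc."), value)
  }
}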

@github-actions (bot)

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions bot added the Stale label Oct 17, 2020
@github-actions bot closed this Oct 18, 2020
@hirolee88

Why is it not merged into master? I now need to use MDC.put in a SparkListener, but get null with version 2.3.1.

@igreenfield (Author)

@l568288g That PR was intended for 3.1.x, so 2.3 is not relevant in any case.

@hirolee88

Is it OK to just modify ThreadUtils? Or does it need to be changed to MDCAwareRunnable everywhere a Runnable is used?

@hirolee88

@l568288g That PR was intended for 3.1.x, so 2.3 is not relevant in any case.

Thank you for your reply. I want to refer to this PR to improve ThreadUtils to use MDC.

