
[SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework #46264

Closed
wants to merge 9 commits into from

Conversation


@panbingkun panbingkun commented Apr 28, 2024

What changes were proposed in this pull request?

This PR aims to migrate `logInfo` calls with variables in the `SQL core` module to the structured logging framework.

Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Pass GitHub Actions (GA).

Was this patch authored or co-authored using generative AI tooling?

No.

@@ -39,7 +39,7 @@ class FilePartitionReader[T](
if (currentReader == null) {
if (files.hasNext) {
val file = files.next()
-logInfo(s"Reading file $file")
+logInfo(log"Reading file ${MDC(PARTITIONED_FILE, file)}")
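The migration pattern throughout this PR replaces `s"..."` string interpolation with Spark's `log"..."` interpolator, where each variable is wrapped in an `MDC` (Mapped Diagnostic Context) tagged with a `LogKey`. As a rough, self-contained sketch of the idea (the `MDC`, `LogEntry`, and interpolator below are simplified stand-ins, not Spark's actual `org.apache.spark.internal` implementation):

```scala
object StructuredLog {
  // Simplified stand-ins for Spark's structured-logging types.
  final case class MDC(key: String, value: Any)
  final case class LogEntry(message: String, context: Map[String, String])

  // A `log` string interpolator that renders the message text and also
  // collects each MDC key/value pair into a context map, so a logging
  // backend can emit the variables as structured fields.
  implicit class LogInterpolator(private val sc: StringContext) {
    def log(args: Any*): LogEntry = {
      val rendered = new StringBuilder
      val ctx = scala.collection.mutable.LinkedHashMap.empty[String, String]
      // parts always has one more element than args; pad args with null.
      sc.parts.zipAll(args, "", null).foreach { case (part, arg) =>
        rendered ++= part
        arg match {
          case MDC(k, v) => rendered ++= v.toString; ctx(k) = v.toString
          case null      => () // trailing literal part, no argument
          case other     => rendered ++= other.toString
        }
      }
      LogEntry(rendered.toString, ctx.toMap)
    }
  }

  // Mirrors the migrated call site above, with a hypothetical file name.
  def demo: LogEntry =
    log"Reading file ${MDC("PARTITIONED_FILE", "part-00000.parquet")}"
}
```

`demo.message` renders the same text the old `s`-interpolated call produced, while `demo.context` additionally exposes the `PARTITIONED_FILE` field for structured backends.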
panbingkun (Contributor, author):

The class of `file` is `PartitionedFile`, so `LogKeys.PATH` is not used here:

override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}

gengliangwang (Member):

CURRENT_FILE

panbingkun (Contributor, author):

Done

@@ -233,7 +233,7 @@ class FileScanRDD(
if (files.hasNext) {
currentFile = files.next()
updateMetadataRow()
-logInfo(s"Reading File $currentFile")
+logInfo(log"Reading File ${MDC(PARTITIONED_FILE, currentFile)}")
panbingkun (Contributor, author):

The class of `currentFile` is `PartitionedFile`.

gengliangwang (Member):

`CURRENT_FILE` seems better.

panbingkun (Contributor, author):

Done

@@ -494,7 +495,7 @@ object DataSourceStrategy
val partitionSet = AttributeSet(partitionColumns)
val predicates = ExpressionSet(normalizedFilters
.flatMap(extractPredicatesWithinOutputSet(_, partitionSet)))
-logInfo(s"Pruning directories with: ${predicates.mkString(",")}")
+logInfo(log"Pruning directories with: ${MDC(PREDICATES, predicates.mkString(","))}")
panbingkun (Contributor, author):

I'm not sure whether `EXPRS` would be more general here.

gengliangwang (Member):

`PREDICATES` looks fine.

@@ -468,7 +467,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
} else if (filteredRules.nonEmpty) {
Some(Batch(batch.name, batch.strategy, filteredRules: _*))
} else {
-logInfo(log"Optimization batch '${MDC(RULE_BATCH_NAME, batch.name)}' " +
+logInfo(log"Optimization batch '${MDC(LogKeys.RULE_BATCH_NAME, batch.name)}' " +
panbingkun (Contributor, author):

Unified as `LogKeys.BATCH_NAME`; removed `LogKeys.RULE_BATCH_NAME`.

if (Utils.isTesting) {
-throw SparkException.internalError(errMsg)
+throw SparkException.internalError(errMsg.message)
} else {
logInfo(errMsg)
panbingkun (Contributor, author):

For it

compute,
output,
boot + init + broadcast + input + compute + output))
logInfo(log"Times: boot = ${MDC(LogKeys.BOOT, format(boot))} s, " +
panbingkun (Contributor, author):

I'm not sure whether to prefix the following keys with `R_`:

case object RMSE extends LogKey
case object ROCKS_DB_LOG_LEVEL extends LogKey
case object ROCKS_DB_LOG_MESSAGE extends LogKey
case object RPC_ENDPOINT_REF extends LogKey
case object RULE_BATCH_NAME extends LogKey
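For context, the keys above are plain case objects extending a common `LogKey` trait, whose object name doubles as the field name emitted with each MDC. A minimal sketch of that shape (a hypothetical stand-in, not the full `org.apache.spark.internal.LogKeys` file):

```scala
object LogKeysSketch {
  // Each key is a case object; its default toString is the object name,
  // which becomes the structured-log field name.
  sealed trait LogKey {
    def name: String = toString
  }
  case object BATCH_NAME extends LogKey
  case object NUM_PARTITIONS extends LogKey
  case object PARTITIONED_FILE extends LogKey
}
```

Renames discussed in this PR (e.g. `RULE_BATCH_NAME` to `BATCH_NAME`) therefore amount to replacing one case object with another at every call site.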
panbingkun (Contributor, author):

Unified as `LogKeys.BATCH_NAME`; removed `LogKeys.RULE_BATCH_NAME`.

case object PARTITIONED_FILE_READER extends LogKey
case object PARTITIONS_SIZE extends LogKey
panbingkun (Contributor, author):

Unified as `LogKeys.COUNT`; removed `LogKeys.PARTITIONS_SIZE`.

gengliangwang (Member):

`PARTITIONS_SIZE` seems useful. Shall we keep it?

panbingkun (Contributor, author):

`PARTITIONS_SIZE` -> `NUM_PARTITIONS`

It has been renamed to `NUM_PARTITIONS` as suggested below. At the same time, I unified `LogKeys.NUM_PARTITION` into `LogKeys.NUM_PARTITIONS` and removed `LogKeys.NUM_PARTITION`.

@@ -191,20 +203,23 @@ object LogKeys {
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOST_PORT extends LogKey
case object IDENTIFIER extends LogKey
panbingkun (Contributor, author):

Unified as `LogKeys.TABLE_NAME`; removed `LogKeys.IDENTIFIER`.

panbingkun (Contributor, author):

Currently, only two places use this key, and both refer to the name of a table.

gengliangwang (Member):

Thanks for unifying them!

@@ -111,7 +117,7 @@ object LogKeys {
case object DATA_FILE_NUM extends LogKey
case object DATA_SOURCE extends LogKey
case object DATA_SOURCES extends LogKey
case object DATA_SOURCE_PROVIDER extends LogKey
panbingkun (Contributor, author):

Unified as `LogKeys.DATA_SOURCE`; removed `LogKeys.DATA_SOURCE_PROVIDER`.

panbingkun (Contributor, author):

cc @gengliangwang

@panbingkun panbingkun marked this pull request as ready for review April 29, 2024 03:12
@@ -717,14 +716,15 @@ case class RepairTableCommand(
evalPool.shutdown()
}
val total = partitionSpecsAndLocs.length
-logInfo(s"Found $total partitions in $root")
+logInfo(log"Found ${MDC(LogKeys.TOTAL, total)} partitions in ${MDC(LogKeys.PATH, root)}")
gengliangwang (Member):

NUM_PARTITIONS

panbingkun (Contributor, author):

Done


val partitionStats = if (spark.sessionState.conf.gatherFastStats) {
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
} else {
Map.empty[Path, PartitionStatistics]
}
-logInfo(s"Finished to gather the fast stats for all $total partitions.")
+logInfo(log"Finished to gather the fast stats for all ${MDC(LogKeys.TOTAL, total)} " +
gengliangwang (Member):

ditto

panbingkun (Contributor, author):

Done

@@ -99,7 +99,7 @@ object FilePartition extends Logging {
val desiredSplitBytes =
(totalSizeInBytes / BigDecimal(maxPartNum.get)).setScale(0, RoundingMode.UP).longValue
val desiredPartitions = getFilePartitions(partitionedFiles, desiredSplitBytes, openCostBytes)
-logWarning(log"The number of partitions is ${MDC(PARTITIONS_SIZE, partitions.size)}, " +
+logWarning(log"The number of partitions is ${MDC(COUNT, partitions.size)}, " +
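The surrounding logic caps the partition count: when `getFilePartitions` produces more partitions than `maxPartNum`, a larger split size is computed by dividing the total bytes by the cap and rounding up. A small sketch of that arithmetic (names illustrative, not the actual `FilePartition` code):

```scala
object SplitBytes {
  import scala.math.BigDecimal.RoundingMode

  // Round the split size up so that totalSizeInBytes / splitBytes
  // never exceeds maxPartNum partitions.
  def desiredSplitBytes(totalSizeInBytes: Long, maxPartNum: Int): Long =
    (BigDecimal(totalSizeInBytes) / BigDecimal(maxPartNum))
      .setScale(0, RoundingMode.UP)
      .longValue
}
```

For example, 100 bytes capped at 3 partitions yields a 34-byte split, since rounding down to 33 would produce a fourth partition.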
gengliangwang (Member):

ditto

panbingkun (Contributor, author):

Done.

-logInfo {
-  s"Pruned ${numBuckets - numBucketsSelected} out of $numBuckets buckets."
-}
+logInfo(log"Pruned ${MDC(COUNT, numBuckets - numBucketsSelected)} " +
gengliangwang (Member):

NUM_PRUNED

panbingkun (Contributor, author):

Done

@@ -164,8 +164,10 @@ private[sql] object JDBCRelation extends Logging {
i = i + 1
}
val partitions = ans.toArray
-logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
-  partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
+logInfo(log"Number of partitions: ${MDC(COUNT, numPartitions)}, WHERE clauses of " +
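The WHERE clauses being logged come from JDBC range partitioning, which splits a numeric column's bound range into one predicate per partition. A rough sketch of the idea (a hypothetical simplification, not Spark's actual `JDBCRelation.columnPartition` logic):

```scala
object JdbcPartitionSketch {
  // Split [lower, upper) on a numeric column into numPartitions WHERE
  // clauses; the first and last clauses are left open-ended so the
  // partitions together cover the whole value space.
  def whereClauses(column: String, lower: Long, upper: Long,
      numPartitions: Int): Seq[String] =
    if (numPartitions <= 1) Seq("1=1") // single partition reads everything
    else {
      val stride = (upper - lower) / numPartitions
      (0 until numPartitions).map { i =>
        val lo = lower + i * stride
        val hi = lo + stride
        if (i == 0) s"$column < $hi"
        else if (i == numPartitions - 1) s"$column >= $lo"
        else s"$column >= $lo AND $column < $hi"
      }
    }
}
```

With four partitions over `id` in `[0, 100)`, this yields clauses from `id < 25` up to `id >= 75`, which is the kind of list the migrated log line renders.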
gengliangwang (Member):

ditto

panbingkun (Contributor, author):

Done

@@ -421,7 +424,8 @@ case class EnsureRequirements(
.mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs)
.map(v => (v, 1))

-logInfo(s"After merging, there are ${mergedPartValues.size} partitions")
+logInfo(
+  log"After merging, there are ${MDC(LogKeys.COUNT, mergedPartValues.size)} partitions")
gengliangwang (Member):

ditto

panbingkun (Contributor, author):

Done

gengliangwang (Member):

LGTM except for a few minor comments on the names.

@github-actions github-actions bot added the ML label Apr 30, 2024
panbingkun (Contributor, author):

> LGTM except for a few minor comments on the names.

All done. Thanks!

gengliangwang (Member) left a review:

Thanks for the work!

JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
…ured logging framework


Closes apache#46264 from panbingkun/SPARK-47585.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>