@@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -182,16 +183,36 @@ object StateStoreMetrics {
 
 /**
  * Name and description of custom implementation-specific metrics that a
- * state store may wish to expose.
+ * state store may wish to expose. Also provides a [[SQLMetric]] instance to
+ * show the metric in the UI and to accumulate it at the query level.
  */
 trait StateStoreCustomMetric {
   def name: String
   def desc: String
+  def withNewDesc(desc: String): StateStoreCustomMetric
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
 }
 
-case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
-case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
-case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
+case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(newDesc: String): StateStoreCustomSumMetric = copy(desc = newDesc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createMetric(sparkContext, desc)
+}
+
+case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createSizeMetric(sparkContext, desc)
+}
+
+case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(desc: String): StateStoreCustomTimingMetric = copy(desc = desc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createTimingMetric(sparkContext, desc)
+}
 
 /**
  * An exception thrown when an invalid UnsafeRow is detected in state store.
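Because `withNewDesc` and `createSQLMetric` now live on the trait, an implementation outside Spark does not have to be a case class at all. A minimal hypothetical sketch of what a third-party metric could look like (the class and metric names are invented for illustration; `SQLMetrics.createAverageMetric` is one of the existing factories alongside the three used above):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.state.StateStoreCustomMetric

// Hypothetical third-party metric: a plain class, not a case class, so there
// is no compiler-generated copy() -- the trait methods are all callers need.
class ThirdPartyAvgMetric(override val name: String, override val desc: String)
  extends StateStoreCustomMetric {

  // Return a copy with a rewritten description, as the join manager requires.
  override def withNewDesc(desc: String): StateStoreCustomMetric =
    new ThirdPartyAvgMetric(name, desc)

  // Back the custom metric with an average-valued SQLMetric for the UI.
  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
    SQLMetrics.createAverageMetric(sparkContext, desc)
}
```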
@@ -319,15 +319,7 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
-        case (s @ StateStoreCustomSumMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s, _) =>
-          throw new IllegalArgumentException(
-            s"Unknown state store custom metric is found at metrics: $s")
+        case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
       }
     )
   }

Review thread on this hunk:

Reviewer: I didn't quite get why it's worth the effort; you have to add code either here or in the case class, so it makes no difference to me. Or maybe you could save the effort by providing some default implementation.

Author (Member): The main point is to avoid making changes here (in SymmetricHashJoinStateManager) whenever a new case class is added: all of the required code goes into the case class itself. It is easy to miss adding the code here, and with this patch we also avoid exposing the subclasses of StateStoreCustomMetric at this call site.

Contributor: +1. This is easier to maintain; exhaustive enumeration of subclasses is brittle to the addition of new subclasses.

Contributor: It would be even better if some Scala magic could provide the default implementation, but it's not obvious to me how, since this calls the copy constructor, which is not available in any trait.

Contributor (@HeartSaVioR, Jun 15, 2021): If we also consider extension points from outside (like a 3rd-party state store provider) adding a new implementation of StateStoreCustomMetric, we should definitely not assume the new implementation is a case class with a desc field. (Even if it has one, we can't leverage it without resorting to reflection.) I think this is the right way to do it.
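To illustrate what the single generic case in the hunk above does, here is a small self-contained sketch; the `newDesc` helper is a stand-in for the manager's private description-prefixing logic, which is outside this diff:

```scala
import org.apache.spark.sql.execution.streaming.state.{StateStoreCustomMetric, StateStoreCustomSizeMetric, StateStoreCustomTimingMetric}

object WithNewDescSketch extends App {
  // Stand-in for the manager's helper that prefixes a join-side label.
  def newDesc(desc: String): String = s"left: $desc"

  val customMetrics: Map[StateStoreCustomMetric, Long] = Map(
    StateStoreCustomSizeMetric("memoryUsedBytes", "memory used") -> 4096L,
    StateStoreCustomTimingMetric("commitTimeMs", "commit time") -> 12L)

  // One generic case handles every subclass -- including ones added later --
  // where the old code pattern-matched each known case class by name.
  val renamed = customMetrics.map {
    case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
  }

  renamed.foreach { case (m, v) => println(s"${m.name} (${m.desc}) = $v") }
  // prints e.g.: memoryUsedBytes (left: memory used) = 4096
}
```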
@@ -126,12 +126,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
-      case StateStoreCustomSumMetric(name, desc) =>
-        name -> SQLMetrics.createMetric(sparkContext, desc)
-      case StateStoreCustomSizeMetric(name, desc) =>
-        name -> SQLMetrics.createSizeMetric(sparkContext, desc)
-      case StateStoreCustomTimingMetric(name, desc) =>
-        name -> SQLMetrics.createTimingMetric(sparkContext, desc)
+      metric => (metric.name, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
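For context, the three factory methods that the `createSQLMetric` implementations delegate to differ mainly in the metric type they assign, which controls how the SQL UI formats accumulated values. A runnable sketch (the object and variable names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.metric.SQLMetrics

object MetricFactorySketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("metrics-sketch"))

  // Each factory tags the SQLMetric with a metric type ("sum", "size",
  // "timing") that the SQL UI uses when rendering accumulated values.
  val sumMetric    = SQLMetrics.createMetric(sc, "number of rows dropped")
  val sizeMetric   = SQLMetrics.createSizeMetric(sc, "memory used")       // rendered as bytes
  val timingMetric = SQLMetrics.createTimingMetric(sc, "time to commit")  // rendered as a duration

  sizeMetric.add(4096L)  // SQLMetric extends AccumulatorV2; add() accumulates
  println(s"${sizeMetric.name.getOrElse("?")} -> ${sizeMetric.metricType}")

  sc.stop()
}
```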
@@ -1013,6 +1013,20 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(err.getMessage.contains("Cannot put a null value"))
   }
 
+  test("SPARK-35763: StateStoreCustomMetric withNewDesc and createSQLMetric") {
+    val metric = StateStoreCustomSizeMetric(name = "m1", desc = "desc1")
+    val metricNew = metric.withNewDesc("new desc")
+    assert(metricNew.desc === "new desc", "incorrect description in copied instance")
+    assert(metricNew.name === "m1", "incorrect name in copied instance")
+
+    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1)
+    withSpark(new SparkContext(conf)) { sc =>
+      val sqlMetric = metric.createSQLMetric(sc)
+      assert(sqlMetric != null)
+      assert(sqlMetric.name === Some("desc1"))
+    }
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 
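As a possible follow-up (a sketch, not part of this change), the test could also cover the timing variant and check the metric type that drives UI formatting; `metricType` is a public field on SQLMetric:

```scala
// Sketch of an additional assertion block, reusing conf and withSpark from
// the test above; "timing" matches the type assigned by createTimingMetric.
val timingMetric = StateStoreCustomTimingMetric(name = "m2", desc = "desc2")
withSpark(new SparkContext(conf)) { sc =>
  val sqlMetric = timingMetric.createSQLMetric(sc)
  assert(sqlMetric.metricType === "timing")
}
```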