
[SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source #44375

Closed
wants to merge 2 commits into master from HyukjinKwon:SPARK-46424

Conversation


@HyukjinKwon HyukjinKwon commented Dec 15, 2023

What changes were proposed in this pull request?

This PR proposes to support Python metrics in Python Data Source so that the metrics are reported the same way as for other Python execution paths and APIs.

Why are the changes needed?

The same metrics (#33559) should be shown for Python Data Source reads. This is the last missing piece compared to other Python execution paths and APIs.

Does this PR introduce any user-facing change?

Python Data Source has not been released yet, so there is no end-user-facing change.
It shows some new metrics in the UI.

Example:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class TestDataSourceReader(DataSourceReader):
    def __init__(self, options):
        self.options = options
    def partitions(self):
        return [InputPartition(i) for i in range(3)]
    def read(self, partition):
        yield partition.value, str(partition.value)

class TestDataSource(DataSource):
    @classmethod
    def name(cls):
        return "test"
    def schema(self):
        return "x INT, y STRING"
    def reader(self, schema) -> "DataSourceReader":
        return TestDataSourceReader(self.options)


spark.dataSource.register(TestDataSource)
sql("CREATE TABLE tblA USING test")
sql("SELECT * from tblA").show()
[Screenshot, 2023-12-15: the new Python metrics shown in the Spark UI]

This is the same as other Python nodes, UDFs, etc.

How was this patch tested?

Unit tests were added, and it was manually tested via the UI.

Was this patch authored or co-authored using generative AI tooling?

No.

    jobArtifactUUID: Option[String])
  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets, jobArtifactUUID)
  with PythonArrowInput[InType]
  with PythonArrowOutput[OutType] {

  override val pythonMetrics: Option[Map[String, SQLMetric]] = Some(pyMetrics)
HyukjinKwon (Member, Author):

All these changes effectively revert #44305: now that we support the metrics, there is no need to keep them optional anymore.
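
For context, a one-line sketch of the shape this takes once the Option wrapper is dropped (an assumption about the final code, not a quote from the diff):

  // Assumed final shape: metrics are always present, so no Option wrapper.
  override val pythonMetrics: Map[String, SQLMetric] = pyMetrics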

@HyukjinKwon HyukjinKwon marked this pull request as ready for review December 16, 2023 01:55
@HyukjinKwon (Member, Author):

cc @cloud-fan and @allisonwang-db

cc @viirya too who actually added this IIRC :-).

@viirya (Member) left a comment:

Looks good to me.

@HyukjinKwon (Member, Author):

Merged to master

Comment on lines +131 to +137
// Dummy SQLMetrics. The result is manually reported via DSv2 interface
// via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
// is not used when it is reported. It is to reuse existing Python runner.
// See also `UserDefinedPythonDataSource.createPythonMetrics`.
private[this] val metrics: Map[String, SQLMetric] = {
  PythonSQLMetrics.pythonSizeMetricsDesc.keys
    .map(_ -> new SQLMetric("size", -1)).toMap ++
Contributor:

Just to make sure I understand this part. Are these size metrics automatically updated by the DSv2 framework?

Also, is it possible to support user-defined Python metrics in the future?

Contributor:

The code is a little tricky. In the DS v2 framework, the reader (which runs on the executor side) needs to update and report the current values of its metrics. To reuse existing code, we use SQLMetrics here, and their values are updated within createMapInBatchEvaluatorFactory (which calls MapInBatchEvaluatorFactory). We then take the values from the SQLMetrics and report them via the DS v2 framework in currentMetricsValues.
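
To illustrate that flow, here is a minimal, self-contained sketch (not the PR's actual code; the MetricReportingReader wrapper and its field names are hypothetical) of how a DS v2 PartitionReader can snapshot executor-side SQLMetric values in currentMetricsValues:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical wrapper: delegates reading, reports SQLMetric snapshots.
class MetricReportingReader(
    underlying: PartitionReader[InternalRow],
    metrics: Map[String, SQLMetric]) extends PartitionReader[InternalRow] {

  override def next(): Boolean = underlying.next()
  override def get(): InternalRow = underlying.get()
  override def close(): Unit = underlying.close()

  // Called by the DS v2 framework when it collects task-level metric values;
  // each call snapshots the current value of every SQLMetric.
  override def currentMetricsValues(): Array[CustomTaskMetric] = {
    metrics.map { case (metricName, metric) =>
      new CustomTaskMetric {
        override def name(): String = metricName
        override def value(): Long = metric.value
      }: CustomTaskMetric
    }.toArray
  }
}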

@@ -179,11 +240,11 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
   /**
    * (Executor-side) Create an iterator that reads the input partitions.
    */
-  def createPartitionReadIteratorInPython(
-      partition: PythonInputPartition,
+  def createMapInBatchEvaluatorFactory(
Contributor:

Do we plan to reuse this method further? If not, we can make it return Iterator[InternalRow] to simplify the caller code.

HyukjinKwon (Member, Author):

👍

class PythonCustomMetric extends CustomMetric {
  private var initName: String = _
  private var initDescription: String = _
  def initialize(n: String, d: String): Unit = {
Contributor:

The caller side always calls initialize right after instantiating PythonCustomMetric; shall we just add a constructor with parameters?

HyukjinKwon (Member, Author):

I couldn't, because it requires a 0-argument constructor:

logWarning(s"Unable to load custom metric object for class `$className`. " +
  "Please make sure that the custom metric class is in the classpath and " +
  "it has 0-arg constructor.", e)

HyukjinKwon (Member, Author):

Oh, actually I think I can just provide multiple constructors.
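
A minimal sketch of the multiple-constructors idea: keep the 0-arg constructor required by the reflective metric loader, and add an auxiliary constructor for direct use (field and method names mirror the snippet quoted above; the aggregation logic is an assumption, not quoted from the PR):

import org.apache.spark.sql.connector.metric.CustomMetric

class PythonCustomMetric extends CustomMetric {
  private var initName: String = _
  private var initDescription: String = _

  // Auxiliary constructor for our own code paths; the no-arg primary
  // constructor stays available for reflective instantiation.
  def this(name: String, description: String) = {
    this()
    this.initName = name
    this.initDescription = description
  }

  override def name(): String = initName
  override def description(): String = initDescription

  // Assumed aggregation: sum the per-task values for display in the UI.
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    taskMetrics.sum.toString
}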

class PythonCustomTaskMetric extends CustomTaskMetric {
  private var initName: String = _
  private var initValue: Long = -1L
  def initialize(n: String, v: Long): Unit = {
Contributor:

ditto


class PythonCustomTaskMetric extends CustomTaskMetric {
  private var initName: String = _
  private var initValue: Long = -1L
Contributor:

I think it's currentValue? We always create new instances of PythonCustomTaskMetric when reporting the DS v2 metrics. We could optimize this part, though.

HyukjinKwon (Member, Author):

Yeah, I can rename it. Actually we should just say metricValue to make it less confusing.


HyukjinKwon commented Dec 19, 2023

Made a PR to clean up (#44406).

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Dec 19, 2023
…tric implementation

### What changes were proposed in this pull request?

This PR is a follow-up of apache#44375 that refactors and cleans up the code (as pointed out by review comments).

### Why are the changes needed?

For better readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test cases should cover this. I also manually tested as described in apache#44375.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44406 from HyukjinKwon/SPARK-46424-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the SPARK-46424 branch January 15, 2024 00:47