
[SPARK-42721][CONNECT] RPC logging interceptor #40342

Closed
wants to merge 8 commits

Conversation

@rangadi (Contributor) commented Mar 8, 2023

What changes were proposed in this pull request?

This adds a gRPC interceptor to the Spark Connect server. It logs all incoming RPC requests and responses (a minimal sketch of such an interceptor follows at the end of this description).

  • How to enable: set the interceptor config, e.g.

    ./sbin/start-connect-server.sh --conf spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor  --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar 
    
  • Sample output:

     23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC Request spark.connect.SparkConnectService/ExecutePlan (id 1868663481):   
     {
       "client_id": "6844bc44-4411-4481-8109-a10e3a836f97",
       "user_context": {
         "user_id": "raghu"
       },
       "plan": {
         "root": {
           "common": {
             "plan_id": "37"
           },
           "show_string": {
             "input": {
               "common": {
                 "plan_id": "36"
               },
               "read": {
                 "data_source": {
                   "format": "csv",
                   "schema": "",
                   "paths": ["file:///tmp/x-in"]
                 }
               }
             },
             "num_rows": 20,
             "truncate": 20
           }
         }
       },
       "client_type": "_SPARK_CONNECT_PYTHON"
     }
    

Why are the changes needed?

This is useful during development. It might also help debug problems in production.

Does this PR introduce any user-facing change?

no

How was this patch tested?

  • Manually in development
  • Unit test
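For readers new to gRPC server interceptors, the following is a minimal sketch of the shape such a request-logging interceptor can take. The class name and log wording are illustrative and not necessarily the exact code merged in this PR; the gRPC, protobuf JsonFormat, and Spark Logging APIs it uses are real.

    import com.google.protobuf.Message
    import com.google.protobuf.util.JsonFormat
    import io.grpc.{ForwardingServerCallListener, Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
    import org.apache.spark.internal.Logging

    import scala.util.Random

    // Sketch only: logs each incoming request as JSON, tagged with a short random id.
    class LoggingInterceptorSketch extends ServerInterceptor with Logging {

      private val jsonPrinter = JsonFormat.printer()

      override def interceptCall[ReqT, RespT](
          call: ServerCall[ReqT, RespT],
          headers: Metadata,
          next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {

        val id = Random.nextInt(Int.MaxValue) // Random id to correlate log lines for one RPC.
        val method = call.getMethodDescriptor.getFullMethodName

        // Responses could be logged the same way by wrapping `call` in a
        // ForwardingServerCall and overriding sendMessage; omitted for brevity.
        val listener = next.startCall(call, headers)
        new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {
          override def onMessage(message: ReqT): Unit = {
            message match {
              case m: Message =>
                logInfo(s"Received RPC Request $method (id $id):\n${jsonPrinter.print(m)}")
              case other =>
                logInfo(s"Received RPC Request $method (id $id): $other")
            }
            super.onMessage(message)
          }
        }
      }
    }

Such a class is then activated through the spark.connect.grpc.interceptor.classes config shown in the start-connect-server.sh command above.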

@LuciferYang (Contributor)
You need to run ./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect/common -pl connector/connect/server -pl connector/connect/client/jvm to format the code in the connect modules.

@rangadi (Contributor, Author) commented Mar 9, 2023

@LuciferYang thanks. Fixed.

@@ -155,6 +155,12 @@
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
Contributor:
Without this PR, the server module depended on protobuf-java-util:3.19.2. Should protobuf-java-util and protobuf-java always use the same version?

Contributor (Author):
Oh, that should be fine. I didn't realize it already included protobuf-java-util. Removed this change.
Could you point me to where this dependency comes from?

Contributor (Author):
I removed the dependency and the Maven build fails, but SBT is OK. I'm not sure where SBT gets the dependency from. What shall we do?

[screenshot of the failing Maven build]

Contributor:
You can check it through

build/mvn dependency:tree -pl connector/connect/server

protobuf-java-util is a transitive dependency of grpc-services, but it has runtime scope:

[INFO] +- io.grpc:grpc-services:jar:1.47.0:compile
[INFO] |  \- com.google.protobuf:protobuf-java-util:jar:3.19.2:runtime

So I think we should explicitly add this dependency for compilation.
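For illustration, the explicit declaration in the server module's pom.xml would look roughly like the fragment below; the version property and compile scope simply mirror the neighbouring protobuf entries visible in the diff above, so treat this as a sketch rather than the exact change:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>${protobuf.version}</version>
      <scope>compile</scope>
    </dependency>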

Contributor (Author):
Thanks, done. Updated the PR.

headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {

val id = Random.nextInt(Int.MaxValue) // Assign a random id for this RPC.
Contributor:
Should we change this to use a UUID? I think it has a lower collision probability than Random.nextInt.

Contributor (Author):
I don't think so. This is just for debugging. A UUID is quite long and makes the logs harder to read; I intentionally even avoided negative numbers :).
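For context, the trade-off being discussed here, as a tiny sketch:

    import java.util.UUID
    import scala.util.Random

    // Short id: non-negative, easy to read and grep in logs, small but nonzero collision chance.
    val shortId: Int = Random.nextInt(Int.MaxValue)

    // UUID: effectively collision-free, but 36 characters on every log line.
    val uuidId: String = UUID.randomUUID().toString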

@github-actions github-actions bot removed the BUILD label Mar 9, 2023
@LuciferYang
Copy link
Contributor

Fine with me, cc @zhenlineo @amaliujia @hvanhovell FYI

@github-actions github-actions bot added the BUILD label Mar 10, 2023
@zhenlineo (Contributor) left a comment
LGTM.

@hvanhovell (Contributor) left a comment
LGTM

hvanhovell pushed a commit that referenced this pull request Mar 10, 2023
Closes #40342 from rangadi/logging-interceptor.

Authored-by: Raghu Angadi <raghu.angadi@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 19cb8d7)
Signed-off-by: Herman van Hovell <herman@databricks.com>
@dongjoon-hyun (Member)
Hi, @rangadi and @hvanhovell. There is a scalafmt error. Here is a follow-up to recover the CI.

@rangadi (Contributor, Author) commented Mar 11, 2023

@dongjoon-hyun thank you! I should have checked; missed it.

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
Closes apache#40342 from rangadi/logging-interceptor.

Authored-by: Raghu Angadi <raghu.angadi@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 19cb8d7)
Signed-off-by: Herman van Hovell <herman@databricks.com>