Support new parameter includeHistoricalMetadata for queryTableChange RPC #214
Any reason you need to change these ids? They should be deterministic?
Not often. It's just that during streaming development I wanted to change the table properties of a table that's already in the tests instead of creating another one, and that broke the tests. That made me wonder whether the id really "has to" be fixed, since we are mostly testing the e2e workflow and functionality.
I don't have a strong opinion... ok with what you and Ryan decide.
I agree with Ryan that the ID should be deterministic. If we need a table with a different config, we should just create a new one for testing.
I reverted the id changes and am just fixing the failed tests.
Hi all,
spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala
val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}")
if (isStreamingQuery) {
  connection.setRequestProperty("User-Agent", "SparkStructuredStreaming")
What is this for? Why does the server need to know about the purpose of the client?
It lets the server behave differently depending on the client (i.e., return metadata for queryTableChanges) without breaking old clients.
@chakankardb I had a discussion with Ryan: it's not a good idea to rely on User-Agent to change server behavior for an RPC request. I'll add a parameter such as 'returnMetadata' to queryTableChanges and update both the Databricks and OSS server/client.
User-Agent can still be used for usage tracking and to gate DFF (as it will be removed eventually).
I'll update this PR and send it later.
@zsxwing Can you provide the reasoning here (for documentation)?
User-Agent is usually informational, and it's weird to use it as a parameter to request different results. In addition, SparkStructuredStreaming is a special system, but our network protocol should be designed for all systems.
Another point Ryan mentioned is that for column mapping we may also need to pass metadata to the client so it can render the DataFrame correctly, which is not a streaming-only use case.
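To illustrate the point about an explicit request parameter, here is a minimal sketch of how a server might interpret an optional `returnMetadata` query parameter so that old clients, which never send it, keep the existing behavior. `ReturnMetadataParam` and `parse` are hypothetical names for illustration only, not the actual server code in this PR, and the strict true/false validation is an assumption.

```scala
// Hypothetical sketch: not the actual delta-sharing server code.
object ReturnMetadataParam {
  /**
   * Interprets the raw query-parameter value.
   * A null value means the client did not send the parameter (an old client),
   * so we fall back to the legacy behavior of not returning extra metadata.
   */
  def parse(raw: String): Boolean = Option(raw).map(_.trim) match {
    case None => false
    case Some(v) if v.equalsIgnoreCase("true") => true
    case Some(v) if v.equalsIgnoreCase("false") => false
    case Some(v) =>
      // Assumption: reject anything else instead of silently guessing.
      throw new IllegalArgumentException(
        s"returnMetadata must be 'true' or 'false', got: $v")
  }
}
```

With this shape, any client (streaming or not) can opt in by sending the parameter, and the server never has to inspect User-Agent to decide what to return.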
The PR is updated with what we discussed, please review again, thanks!
   @Param("share") share: String,
   @Param("schema") schema: String,
   @Param("table") table: String,
   @Param("startingVersion") @Nullable startingVersion: String,
   @Param("endingVersion") @Nullable endingVersion: String,
   @Param("startingTimestamp") @Nullable startingTimestamp: String,
-  @Param("endingTimestamp") @Nullable endingTimestamp: String): HttpResponse = processRequest {
+  @Param("endingTimestamp") @Nullable endingTimestamp: String,
+  @Param("returnMetadata") @Nullable returnMetadata: String): HttpResponse = processRequest {
Should we call this returnMetadataChanges, since queryTableChanges already returns the latest metadata?
I don't know.
I'm not fully happy with returnMetadata either. But returnMetadataChanges sounds like changes to the metadata, whereas what we are returning is just the metadata itself: the additional metadata seen in the deltaLog.
returnAdditionalMetadata? Not even as good as returnMetadata..
@@ -426,7 +442,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
   integrationTest("getTableVersion - get exceptions") {
     // timestamp can be any string here, it's resolved in DeltaSharedTableLoader
     assertHttpError(
-      url = requestPath("/shares/share2/schemas/default/tables/table2?startingTimestamp=abc"),
+      url = requestPath("/shares/share2/schemas/default/tables/table2/version?startingTimestamp=abc"),
Question: why is version needed?
This is to leave the endpoint without /version for getTable. This is one of the tests that was missed and not updated in that PR.
Now that getTable is not needed, I think it's a good idea to have the /version suffix for getTableVersion.
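For reference, a small sketch of how the two paths differ, based only on the URLs in the test above. `TablePaths`, `tablePath`, and `tableVersionPath` are hypothetical helper names, not part of the client or the test suite, and URL-encoding the path segments is an assumption.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical helpers: names are illustrative, not from the PR.
object TablePaths {
  private def enc(s: String): String =
    URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  /** Bare table path, i.e. the endpoint without the /version suffix. */
  def tablePath(share: String, schema: String, table: String): String =
    s"/shares/${enc(share)}/schemas/${enc(schema)}/tables/${enc(table)}"

  /** getTableVersion path: the table path plus /version and an optional timestamp. */
  def tableVersionPath(
      share: String,
      schema: String,
      table: String,
      startingTimestamp: Option[String] = None): String = {
    val base = tablePath(share, schema, table) + "/version"
    startingTimestamp.fold(base)(ts => s"$base?startingTimestamp=${enc(ts)}")
  }
}
```

Calling `TablePaths.tableVersionPath("share2", "default", "table2", Some("abc"))` builds the same path the updated test requests.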
   // to recognize the request for streaming, and take corresponding actions.
   private def getUserAgent(): String = {
-    DeltaSharingRestClient.USER_AGENT + (if (forStreaming) {
-      s" ${DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING}/$STREAMING_VERSION"
+    val sparkAgent = if (forStreaming) {
Why is this still needed? Is it to track the streaming requests on the provider side?
Yes: 1. tracking in the usage log; 2. gating traffic during private preview.
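A rough sketch of the informational use described here: the client appends a streaming token to its User-Agent, and the provider can detect it for logging or preview gating without changing what the RPC returns. The constant names mirror the diff above, but their values are placeholders, and `UserAgentSketch`, `userAgent`, and `isStreaming` are hypothetical names rather than the real client or server code.

```scala
// Hypothetical sketch; constant values are placeholders, not the real ones.
object UserAgentSketch {
  val USER_AGENT = "Delta-Sharing-Spark/0.0.0-sketch"
  val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming"
  val STREAMING_VERSION = "0.0.0-sketch"

  /** Client side: append a streaming token only for streaming queries. */
  def userAgent(forStreaming: Boolean): String = {
    val streamingSuffix =
      if (forStreaming) s" $SPARK_STRUCTURED_STREAMING/$STREAMING_VERSION" else ""
    USER_AGENT + streamingSuffix
  }

  /** Provider side: purely informational check for usage logs or preview gating. */
  def isStreaming(userAgent: String): Boolean =
    userAgent.split(" ").exists(_.startsWith(SPARK_STRUCTURED_STREAMING + "/"))
}
```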
A couple changes: