From a82c1900cd82507c5b5423c2930a701ef2011c37 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Sat, 15 Oct 2022 23:10:47 -0700 Subject: [PATCH 1/8] Support startingTimestamp for getTableVersion --- .../sharing/server/DeltaSharingService.scala | 23 ++++++- .../internal/DeltaSharedTableLoader.scala | 18 +++++ .../server/DeltaSharingServiceSuite.scala | 67 +++++++++++++++++-- .../sharing/spark/DeltaSharingClient.scala | 13 +++- .../spark/DeltaSharingRestClientSuite.scala | 17 ++++- .../spark/TestDeltaSharingClient.scala | 2 +- 6 files changed, 128 insertions(+), 12 deletions(-) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 05a0e6fd0..cac0fa928 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -243,9 +243,23 @@ class DeltaSharingService(serverConfig: ServerConfig) { def getTableVersion( @Param("share") share: String, @Param("schema") schema: String, - @Param("table") table: String): HttpResponse = processRequest { + @Param("table") table: String, + @Param("startingTimestamp") @Nullable startingTimestamp: String + ): HttpResponse = processRequest { val tableConfig = sharedTableManager.getTable(share, schema, table) - val version = deltaSharedTableLoader.loadTable(tableConfig).tableVersion + if (startingTimestamp != null && !tableConfig.cdfEnabled) { + throw new DeltaSharingIllegalArgumentException("Reading table by version or timestamp is" + + " not supported because change data feed is not enabled on table: " + + s"$share.$schema.$table") + } + val version = deltaSharedTableLoader.loadTable(tableConfig).getTableVersion( + Option(startingTimestamp) + ) + if (startingTimestamp != null && version < tableConfig.startVersion) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } val headers = createHeadersBuilderForTableVersion(version).build() HttpResponse.of(headers) } @@ -310,6 +324,11 @@ class DeltaSharingService(serverConfig: ServerConfig) { request.version, request.timestamp, request.startingVersion) + if (version < tableConfig.startVersion) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " + s"and sign ${actions.length - 2} urls for table $share/$schema/$table") streamingOutput(Some(version), actions) diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala index 8dafe69e1..1a747fbc9 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala @@ -150,6 +150,24 @@ class DeltaSharedTable( } } + def getTableVersion(startingTimestamp: Option[String]): Long = withClassLoader { + if (startingTimestamp.isEmpty) { + tableVersion + } else { + val ts = DeltaSharingHistoryManager.getTimestamp("startingTimestamp", startingTimestamp.get) + // get a version at or after the provided timestamp, if the timestamp is early than version 0, + // return 0. + try { + deltaLog.getVersionAtOrAfterTimestamp(ts.getTime()) + } catch { + // Convert to DeltaSharingIllegalArgumentException to return 4xx instead of 5xx error code + // Only convert known exceptions around timestamp too late or too early + case e: IllegalArgumentException => + throw new DeltaSharingIllegalArgumentException(e.getMessage) + } + } + } + /** Return the current table version */ def tableVersion: Long = withClassLoader { val snapshot = deltaLog.snapshot diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index bd6990ccc..ce7e38401 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -307,18 +307,64 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { integrationTest("table1 - head - /shares/{share}/schemas/{schema}/tables/{table}") { - val url = requestPath("/shares/share1/schemas/default/tables/table1") - val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] + // getTableVersion succeeds without parameters + var url = requestPath("/shares/share1/schemas/default/tables/table1") + var connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] connection.setRequestMethod("HEAD") connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") - val input = connection.getInputStream() + var input = connection.getInputStream() try { IOUtils.toString(input) } finally { input.close() } - val deltaTableVersion = connection.getHeaderField("Delta-Table-Version") + var deltaTableVersion = connection.getHeaderField("Delta-Table-Version") assert(deltaTableVersion == "2") + + // getTableVersion succeeds with parameters + url = requestPath("/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=2000-01-01%2000:00:00") + connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] + connection.setRequestMethod("HEAD") + connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") + input = connection.getInputStream() + try { + IOUtils.toString(input) + } finally { + input.close() + } + deltaTableVersion = connection.getHeaderField("Delta-Table-Version") + assert(deltaTableVersion == "0") + } + + integrationTest("getTableVersion - exceptions") { + // timestamp can be any string here, it's resolved in DeltaSharedTableLoader + assertHttpError( + url = requestPath("/shares/share2/schemas/default/tables/table2?startingTimestamp=abc"), + method = "HEAD", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "Reading table by version or timestamp is not supported because " + ) + + // invalid startingTimestamp format + assertHttpError( + url = requestPath( + "/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=abc" + ), + method = "HEAD", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "Invalid startingTimestamp" + ) + + // timestamp after the latest version + assertHttpError( + url = requestPath("/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=9999-01-01%2000:00:00"), + method = "HEAD", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available" + ) } integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/metadata") { @@ -686,6 +732,17 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { expectedErrorCode = 400, expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available" ) + + // can only query table data since version 1 + // 1651614979 PST: 2022-05-03T14:56:19.000+0000, version 1 is 1 second later + val tsStr = new Timestamp(1651614979000L).toString + assertHttpError( + url = requestPath("/shares/share1/schemas/default/tables/cdf_table_with_partition/query"), + method = "POST", + data = Some(s"""{"timestamp": "$tsStr"}"""), + expectedErrorCode = 400, + expectedErrorMessage = "You can only query table data since version 1" + ) } integrationTest("cdf_table_cdf_enabled - timestamp on version 1 - /shares/{share}/schemas/{schema}/tables/{table}/query") { @@ -1051,7 +1108,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { connection.getInputStream() } assert(e.getMessage.contains(s"Server returned HTTP response code: $expectedErrorCode")) - assert(IOUtils.toString(connection.getErrorStream()).contains(expectedErrorMessage)) + assert(method == "HEAD" || IOUtils.toString(connection.getErrorStream()).contains(expectedErrorMessage)) } integrationTest("valid request json but incorrect field type") { diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index b90ba3902..bf56e3416 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -44,7 +44,7 @@ import io.delta.sharing.spark.util.{JsonUtils, RetryUtils, UnexpectedHttpStatus} private[sharing] trait DeltaSharingClient { def listAllTables(): Seq[Table] - def getTableVersion(table: Table): Long + def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long def getMetadata(table: Table): DeltaTableMetadata @@ -161,12 +161,19 @@ private[spark] class DeltaSharingRestClient( tables } - override def getTableVersion(table: Table): Long = { + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = { val encodedShareName = URLEncoder.encode(table.share, "UTF-8") val encodedSchemaName = URLEncoder.encode(table.schema, "UTF-8") val encodedTableName = URLEncoder.encode(table.name, "UTF-8") + + val encodedParam = if (startingTimestamp.isDefined) { + s"?startingTimestamp=${URLEncoder.encode(startingTimestamp.get)}" + } else { + "" + } val target = - getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName") + getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" + + s"$encodedTableName$encodedParam") val (version, _) = getResponse(new HttpHead(target)) version.getOrElse { throw new IllegalStateException("Cannot find Delta-Table-Version in the header") diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index 96c7ce409..8f3b78685 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -62,12 +62,27 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } } - integrationTest("getTableVersion") { + integrationTest("getTableVersion - success") { val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) try { assert(client.getTableVersion(Table(name = "table2", schema = "default", share = "share2")) == 2) assert(client.getTableVersion(Table(name = "table1", schema = "default", share = "share1")) == 2) assert(client.getTableVersion(Table(name = "table3", schema = "default", share = "share1")) == 4) + assert(client.getTableVersion(Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share1"), + startingTimestamp=Some("2020-01-01 00:00:00")) == 0) + } finally { + client.close() + } + } + + integrationTest("getTableVersion - exceptions") { + val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) + try { + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getTableVersion(Table(name = "table1", schema = "default", share = "share1"), + startingTimestamp=Some("2020-01-01 00:00:00")) + }.getMessage + assert(errorMessage.contains("400 Bad Request")) } finally { client.close() } diff --git a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala index 7e27040cf..546019052 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala @@ -51,7 +51,7 @@ class TestDeltaSharingClient( DeltaTableMetadata(0, Protocol(0), metadata) } - override def getTableVersion(table: Table): Long = 0 + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = 0 override def getFiles( table: Table, From b3e1e4bd166389a6318519ab8ff7fbefb40f4c4f Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Sat, 15 Oct 2022 23:27:00 -0700 Subject: [PATCH 2/8] fix scalastyle --- .../io/delta/sharing/spark/DeltaSharingRestClientSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index 8f3b78685..a6868d552 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -69,7 +69,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { assert(client.getTableVersion(Table(name = "table1", schema = "default", share = "share1")) == 2) assert(client.getTableVersion(Table(name = "table3", schema = "default", share = "share1")) == 4) assert(client.getTableVersion(Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share1"), - startingTimestamp=Some("2020-01-01 00:00:00")) == 0) + startingTimestamp = Some("2020-01-01 00:00:00")) == 0) } finally { client.close() } @@ -80,7 +80,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { try { val errorMessage = intercept[UnexpectedHttpStatus] { client.getTableVersion(Table(name = "table1", schema = "default", share = "share1"), - startingTimestamp=Some("2020-01-01 00:00:00")) + startingTimestamp = Some("2020-01-01 00:00:00")) }.getMessage assert(errorMessage.contains("400 Bad Request")) } finally { From 4455857b1d1c36e952b9dcb940fdd26b5c01b25e Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Fri, 21 Oct 2022 01:11:54 -0700 Subject: [PATCH 3/8] fix comment --- .../scala/io/delta/sharing/server/DeltaSharingService.scala | 3 ++- .../io/delta/standalone/internal/DeltaSharedTableLoader.scala | 3 +++ .../io/delta/sharing/server/DeltaSharingServiceSuite.scala | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 1a8b3c1fa..7c1eaf4d1 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -257,7 +257,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { ) if (startingTimestamp != null && version < tableConfig.startVersion) { throw new DeltaSharingIllegalArgumentException( - s"You can only query table data since version ${tableConfig.startVersion}." + s"You can only query table data since version ${tableConfig.startVersion}." + + s"The provided timestamp($startingTimestamp) corresponds to $version." ) } val headers = createHeadersBuilderForTableVersion(version).build() diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala index 57cbe539d..00fd03915 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala @@ -150,6 +150,9 @@ class DeltaSharedTable( } } + /** Get table version at or after startingTimestamp if it's provided, otherwise return + * the latest table version. + */ def getTableVersion(startingTimestamp: Option[String]): Long = withClassLoader { if (startingTimestamp.isEmpty) { tableVersion diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 84212c442..087e43a2d 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -1108,6 +1108,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { connection.getInputStream() } assert(e.getMessage.contains(s"Server returned HTTP response code: $expectedErrorCode")) + // If the http method is HEAD, error message is not returned from the server. assert(method == "HEAD" || IOUtils.toString(connection.getErrorStream()).contains(expectedErrorMessage)) } From 216a6937f910dd38295d08c24a8eaa34e31b0ccf Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 24 Oct 2022 15:02:59 -0700 Subject: [PATCH 4/8] support GET getTableVersion --- .../sharing/server/DeltaSharingService.scala | 1 + .../server/DeltaSharingServiceSuite.scala | 63 ++++++++++++++++++- .../sharing/spark/DeltaSharingClient.scala | 2 +- .../spark/DeltaSharingRestClientSuite.scala | 1 + 4 files changed, 65 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 7c1eaf4d1..3f57eebe2 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -240,6 +240,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { } @Head("/shares/{share}/schemas/{schema}/tables/{table}") + @Get("/shares/{share}/schemas/{schema}/tables/{table}") def getTableVersion( @Param("share") share: String, @Param("schema") schema: String, diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 087e43a2d..9e8aa93f9 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -336,7 +336,37 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { assert(deltaTableVersion == "0") } - integrationTest("getTableVersion - exceptions") { + integrationTest("table1 - get - /shares/{share}/schemas/{schema}/tables/{table}") { + // getTableVersion succeeds without parameters + var url = requestPath("/shares/share1/schemas/default/tables/table1") + var connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] + connection.setRequestMethod("GET") + connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") + var input = connection.getInputStream() + try { + IOUtils.toString(input) + } finally { + input.close() + } + var deltaTableVersion = connection.getHeaderField("Delta-Table-Version") + assert(deltaTableVersion == "2") + + // getTableVersion succeeds with parameters + url = requestPath("/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=2000-01-01%2000:00:00") + connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] + connection.setRequestMethod("GET") + connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") + input = connection.getInputStream() + try { + IOUtils.toString(input) + } finally { + input.close() + } + deltaTableVersion = connection.getHeaderField("Delta-Table-Version") + assert(deltaTableVersion == "0") + } + + integrationTest("getTableVersion - head exceptions") { // timestamp can be any string here, it's resolved in DeltaSharedTableLoader assertHttpError( url = requestPath("/shares/share2/schemas/default/tables/table2?startingTimestamp=abc"), @@ -367,6 +397,37 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { ) } + integrationTest("getTableVersion - get exceptions") { + // timestamp can be any string here, it's resolved in DeltaSharedTableLoader + assertHttpError( + url = requestPath("/shares/share2/schemas/default/tables/table2?startingTimestamp=abc"), + method = "GET", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "Reading table by version or timestamp is not supported because " + ) + + // invalid startingTimestamp format + assertHttpError( + url = requestPath( + "/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=abc" + ), + method = "GET", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "Invalid startingTimestamp" + ) + + // timestamp after the latest version + assertHttpError( + url = requestPath("/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=9999-01-01%2000:00:00"), + method = "GET", + data = None, + expectedErrorCode = 400, + expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available" + ) + } + integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/metadata") { val response = readNDJson(requestPath("/shares/share1/schemas/default/tables/table1/metadata"), expectedTableVersion = Some(2)) val Array(protocol, metadata) = response.split("\n") diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index c3195efcf..dd239f3fa 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -174,7 +174,7 @@ private[spark] class DeltaSharingRestClient( val target = getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" + s"$encodedTableName$encodedParam") - val (version, _) = getResponse(new HttpHead(target)) + val (version, _) = getResponse(new HttpGet(target)) version.getOrElse { throw new IllegalStateException("Cannot find Delta-Table-Version in the header") } diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index 248e62964..eaf0e13a1 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -83,6 +83,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { startingTimestamp = Some("2020-01-01 00:00:00")) }.getMessage assert(errorMessage.contains("400 Bad Request")) + assert(errorMessage.contains("Reading table by version or timestamp is not supported")) } finally { client.close() } From d458b4186c9e7cc8ed7e024e44082c5ee1f95cd3 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 24 Oct 2022 17:43:36 -0700 Subject: [PATCH 5/8] update python rest client --- python/delta_sharing/rest_client.py | 84 ++++++++++++------- .../delta_sharing/tests/test_rest_client.py | 29 +++++++ 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 21f25c254..5e1fc0e62 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -66,6 +66,7 @@ class ListAllTablesResponse: @dataclass(frozen=True) class QueryTableMetadataResponse: + delta_table_version: int protocol: Protocol metadata: Metadata @@ -77,6 +78,7 @@ class QueryTableVersionResponse: @dataclass(frozen=True) class ListFilesInTableResponse: + delta_table_version: int protocol: Protocol metadata: Metadata add_files: Sequence[AddFile] @@ -227,27 +229,42 @@ def list_all_tables( @retry_with_exponential_backoff def query_table_metadata(self, table: Table) -> QueryTableMetadataResponse: with self._get_internal( - f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}/metadata" - ) as lines: + f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}/metadata", + return_headers=True + ) as values: + headers = values[0] + # it's a bug in the server if it doesn't return delta-table-version in the header + if "delta-table-version" not in headers: + raise LookupError("Missing delta-table-version header") + lines = values[1] protocol_json = json.loads(next(lines)) metadata_json = json.loads(next(lines)) return QueryTableMetadataResponse( + delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), metadata=Metadata.from_json(metadata_json["metaData"]), ) @retry_with_exponential_backoff - def query_table_version(self, table: Table) -> QueryTableVersionResponse: - headers = self._head_internal( - f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}" - ) + def query_table_version( + self, + table: Table, + starting_timestamp: Optional[str] = None) -> QueryTableVersionResponse: + query_str = f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}" + if starting_timestamp is not None: + query_str += f"?startingTimestamp={quote(starting_timestamp)}" + with self._get_internal( + query_str, + return_headers=True + ) as values: + headers = values[0] - # it's a bug in the server if it doesn't return delta-table-version in the header - if "delta-table-version" not in headers: - raise LookupError("Missing delta-table-version header") + # it's a bug in the server if it doesn't return delta-table-version in the header + if "delta-table-version" not in headers: + raise LookupError("Missing delta-table-version header") - table_version = int(headers.get("delta-table-version")) - return QueryTableVersionResponse(delta_table_version=table_version) + table_version = int(headers.get("delta-table-version")) + return QueryTableVersionResponse(delta_table_version=table_version) @retry_with_exponential_backoff def list_files_in_table( @@ -272,10 +289,18 @@ def list_files_in_table( with self._post_internal( f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}/query", data=data, - ) as lines: + return_headers=True + ) as values: + headers = values[0] + # it's a bug in the server if it doesn't return delta-table-version in the header + if "delta-table-version" not in headers: + raise LookupError("Missing delta-table-version header") + + lines = values[1] protocol_json = json.loads(next(lines)) metadata_json = json.loads(next(lines)) return ListFilesInTableResponse( + delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), metadata=Metadata.from_json(metadata_json["metaData"]), add_files=[AddFile.from_json(json.loads(file)["file"]) for file in lines], @@ -314,31 +339,34 @@ def list_table_changes(self, table: Table, cdfOptions: CdfOptions) -> ListTableC def close(self): self._session.close() - def _get_internal(self, target: str, data: Optional[Dict[str, Any]] = None): - return self._request_internal(request=self._session.get, target=target, params=data) - - def _post_internal(self, target: str, data: Optional[Dict[str, Any]] = None): - return self._request_internal(request=self._session.post, target=target, json=data) + def _get_internal( + self, + target: str, + data: Optional[Dict[str, Any]] = None, + return_headers: bool = False): + return self._request_internal( + request=self._session.get, return_headers=return_headers, target=target, params=data) - def _head_internal(self, target: str): - assert target.startswith("/"), "Targets should start with '/'" - response = self._session.head(f"{self._profile.endpoint}{target}") - try: - response.raise_for_status() - headers = response.headers - return headers - finally: - response.close() + def _post_internal( + self, + target: str, + data: Optional[Dict[str, Any]] = None, + return_headers: bool = False): + return self._request_internal( + request=self._session.post, return_headers=return_headers, target=target, json=data) @contextmanager - def _request_internal(self, request, target: str, **kwargs) -> Generator[str, None, None]: + def _request_internal(self, request, return_headers, target: str, **kwargs) -> Generator[str, None, None]: assert target.startswith("/"), "Targets should start with '/'" response = request(f"{self._profile.endpoint}{target}", **kwargs) try: response.raise_for_status() lines = response.iter_lines(decode_unicode=True) try: - yield lines + if return_headers: + yield response.headers, lines + else: + yield lines finally: collections.deque(lines, maxlen=0) except HTTPError as e: diff --git a/python/delta_sharing/tests/test_rest_client.py b/python/delta_sharing/tests/test_rest_client.py index f5698c0e5..4128cc42b 100644 --- a/python/delta_sharing/tests/test_rest_client.py +++ b/python/delta_sharing/tests/test_rest_client.py @@ -170,6 +170,7 @@ def test_query_table_metadata_non_partitioned(rest_client: DataSharingRestClient response = rest_client.query_table_metadata( Table(name="table1", share="share1", schema="default") ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", @@ -189,6 +190,7 @@ def test_query_table_metadata_partitioned(rest_client: DataSharingRestClient): response = rest_client.query_table_metadata( Table(name="table2", share="share2", schema="default") ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", @@ -210,6 +212,7 @@ def test_query_table_metadata_partitioned_different_schemas( response = rest_client.query_table_metadata( Table(name="table3", share="share1", schema="default") ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="7ba6d727-a578-4234-a138-953f790b427c", @@ -234,6 +237,28 @@ def test_query_existed_table_version(rest_client: DataSharingRestClient): assert response.delta_table_version > 0 +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_query_table_version_with_timestamp(rest_client: DataSharingRestClient): + response = rest_client.query_table_version( + Table(name="cdf_table_cdf_enabled", share="share1", schema="default"), + starting_timestamp="2020-01-01 00:00:00.0" + ) + assert isinstance(response.delta_table_version, int) + assert response.delta_table_version == 0 + + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_query_table_version_with_timestamp_exception(rest_client: DataSharingRestClient): + try: + rest_client.query_table_version( + Table(name="table1", share="share1", schema="default"), + starting_timestamp="2020-01-1 00:00:00.0" + ) + except Exception as e: + assert isinstance(e, HTTPError) + assert "Reading table by version or timestamp is not supported" in (str(e)) + + @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) def test_query_nonexistent_table_version(rest_client: DataSharingRestClient): with pytest.raises(HTTPError): @@ -248,6 +273,7 @@ def test_list_files_in_table_non_partitioned(rest_client: DataSharingRestClient) Table(name="table1", share="share1", schema="default"), predicateHints=["date = '2021-01-31'"], ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", @@ -295,6 +321,7 @@ def test_list_files_in_table_partitioned(rest_client: DataSharingRestClient): predicateHints=["date = '2021-01-31'"], limitHint=123, ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", @@ -342,6 +369,7 @@ def test_list_files_in_table_partitioned_different_schemas( response = rest_client.list_files_in_table( Table(name="table3", share="share1", schema="default") ) + assert response.delta_table_version > 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="7ba6d727-a578-4234-a138-953f790b427c", @@ -403,6 +431,7 @@ def test_list_files_in_table_version( Table(name="cdf_table_cdf_enabled", share="share1", schema="default"), version=1 ) + assert response.delta_table_version == 1 assert response.protocol == Protocol(min_reader_version=1) assert response.metadata == Metadata( id="16736144-3306-4577-807a-d3f899b77670", From 09f0339dc916b5312b866deddaba9938b0f34401 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 24 Oct 2022 22:36:12 -0700 Subject: [PATCH 6/8] fix test --- python/delta_sharing/rest_client.py | 21 +++++++++++++++------ python/delta_sharing/tests/test_reader.py | 16 ++++++++++++---- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 5e1fc0e62..dac096207 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -249,7 +249,8 @@ def query_table_metadata(self, table: Table) -> QueryTableMetadataResponse: def query_table_version( self, table: Table, - starting_timestamp: Optional[str] = None) -> QueryTableVersionResponse: + starting_timestamp: Optional[str] = None, + ) -> QueryTableVersionResponse: query_str = f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}" if starting_timestamp is not None: query_str += f"?startingTimestamp={quote(starting_timestamp)}" @@ -343,20 +344,28 @@ def _get_internal( self, target: str, data: Optional[Dict[str, Any]] = None, - return_headers: bool = False): + return_headers: bool = False, + ): return self._request_internal( - request=self._session.get, return_headers=return_headers, target=target, params=data) + request=self._session.get, return_headers=return_headers, target=target, params=data) def _post_internal( self, target: str, data: Optional[Dict[str, Any]] = None, - return_headers: bool = False): + return_headers: bool = False, + ): return self._request_internal( - request=self._session.post, return_headers=return_headers, target=target, json=data) + request=self._session.post, return_headers=return_headers, target=target, json=data) @contextmanager - def _request_internal(self, request, return_headers, target: str, **kwargs) -> Generator[str, None, None]: + def _request_internal( + self, + request, + return_headers, + target: str, + **kwargs, + ) -> Generator[str, None, None]: assert target.startswith("/"), "Targets should start with '/'" response = request(f"{self._profile.endpoint}{target}", **kwargs) try: diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index c326a46f2..3c5f2f91e 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -73,7 +73,9 @@ def list_files_in_table( stats="", ), ] - return ListFilesInTableResponse(protocol=None, metadata=metadata, add_files=add_files) + return ListFilesInTableResponse( + delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files + ) reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() @@ -126,7 +128,9 @@ def list_files_in_table( stats="", ), ] - return ListFilesInTableResponse(protocol=None, metadata=metadata, add_files=add_files) + return ListFilesInTableResponse( + delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files + ) reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() @@ -184,7 +188,9 @@ def list_files_in_table( stats="", ), ] - return ListFilesInTableResponse(protocol=None, metadata=metadata, add_files=add_files) + return ListFilesInTableResponse( + delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files + ) reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() @@ -238,7 +244,9 @@ def list_files_in_table( ) ) add_files: Sequence[AddFile] = [] - return ListFilesInTableResponse(protocol=None, metadata=metadata, add_files=add_files) + return ListFilesInTableResponse( + delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files + ) reader = DeltaSharingReader( Table("table_name", "share_name", "schema_name"), RestClientMock() # type: ignore From c610d4f2eed5fa620efb18cc4220d93244946670 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 24 Oct 2022 23:12:54 -0700 Subject: [PATCH 7/8] fix build --- python/delta_sharing/rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index dac096207..3d9551183 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -365,7 +365,7 @@ def _request_internal( return_headers, target: str, **kwargs, - ) -> Generator[str, None, None]: + ): assert target.startswith("/"), "Targets should start with '/'" response = request(f"{self._profile.endpoint}{target}", **kwargs) try: From 7aa5b4170ab22a9dcbcfb429b702a8b89c389363 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Tue, 25 Oct 2022 14:36:10 -0700 Subject: [PATCH 8/8] fix import --- python/delta_sharing/rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 3d9551183..abaf3e098 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -17,7 +17,7 @@ from contextlib import contextmanager from dataclasses import dataclass import json -from typing import Any, ClassVar, Dict, Generator, List, Optional, Sequence +from typing import Any, ClassVar, Dict, List, Optional, Sequence from urllib.parse import quote, urlparse import time import logging