Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support startingTimestamp and GET method for getTableVersion #199

Merged
merged 10 commits into from Oct 26, 2022
Expand Up @@ -243,9 +243,23 @@ class DeltaSharingService(serverConfig: ServerConfig) {
def getTableVersion(
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String): HttpResponse = processRequest {
@Param("table") table: String,
@Param("startingTimestamp") @Nullable startingTimestamp: String
): HttpResponse = processRequest {
val tableConfig = sharedTableManager.getTable(share, schema, table)
val version = deltaSharedTableLoader.loadTable(tableConfig).tableVersion
if (startingTimestamp != null && !tableConfig.cdfEnabled) {
throw new DeltaSharingIllegalArgumentException("Reading table by version or timestamp is" +
" not supported because change data feed is not enabled on table: " +
s"$share.$schema.$table")
}
val version = deltaSharedTableLoader.loadTable(tableConfig).getTableVersion(
Option(startingTimestamp)
)
if (startingTimestamp != null && version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
s"You can only query table data since version ${tableConfig.startVersion}."
linzhou-db marked this conversation as resolved.
Show resolved Hide resolved
)
}
val headers = createHeadersBuilderForTableVersion(version).build()
HttpResponse.of(headers)
}
Expand Down Expand Up @@ -310,6 +324,11 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.version,
request.timestamp,
request.startingVersion)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
s"You can only query table data since version ${tableConfig.startVersion}."
)
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(version), actions)
Expand Down
Expand Up @@ -150,6 +150,24 @@ class DeltaSharedTable(
}
}

/**
 * Resolve the table version to report for a getTableVersion request.
 *
 * @param startingTimestamp optional timestamp string; when absent the current table version is
 *                          returned, otherwise the string is parsed by
 *                          `DeltaSharingHistoryManager.getTimestamp` and mapped to a version.
 * @return the current version, or the version at or after `startingTimestamp`.
 * @throws DeltaSharingIllegalArgumentException when the version lookup rejects the timestamp
 *                                              with an `IllegalArgumentException`.
 */
def getTableVersion(startingTimestamp: Option[String]): Long = withClassLoader {
  startingTimestamp match {
    case None =>
      tableVersion
    case Some(rawTimestamp) =>
      val ts = DeltaSharingHistoryManager.getTimestamp("startingTimestamp", rawTimestamp)
      // Get a version at or after the provided timestamp; if the timestamp is earlier than
      // version 0, version 0 is returned.
      try {
        deltaLog.getVersionAtOrAfterTimestamp(ts.getTime())
      } catch {
        // Re-throw as DeltaSharingIllegalArgumentException so the server answers with a 4xx
        // instead of a 5xx. Only IllegalArgumentException (timestamp too late or too early)
        // is converted; anything else propagates unchanged.
        case e: IllegalArgumentException =>
          throw new DeltaSharingIllegalArgumentException(e.getMessage)
      }
  }
}

/** Return the current table version */
def tableVersion: Long = withClassLoader {
val snapshot = deltaLog.snapshot
Expand Down
Expand Up @@ -307,18 +307,64 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {


integrationTest("table1 - head - /shares/{share}/schemas/{schema}/tables/{table}") {
val url = requestPath("/shares/share1/schemas/default/tables/table1")
val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
// getTableVersion succeeds without parameters
var url = requestPath("/shares/share1/schemas/default/tables/table1")
var connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestMethod("HEAD")
connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}")
val input = connection.getInputStream()
var input = connection.getInputStream()
try {
IOUtils.toString(input)
} finally {
input.close()
}
val deltaTableVersion = connection.getHeaderField("Delta-Table-Version")
var deltaTableVersion = connection.getHeaderField("Delta-Table-Version")
assert(deltaTableVersion == "2")

// getTableVersion succeeds with parameters
url = requestPath("/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=2000-01-01%2000:00:00")
connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestMethod("HEAD")
connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}")
input = connection.getInputStream()
try {
IOUtils.toString(input)
} finally {
input.close()
}
deltaTableVersion = connection.getHeaderField("Delta-Table-Version")
assert(deltaTableVersion == "0")
}

integrationTest("getTableVersion - exceptions") {
  // Each entry pairs a request path with the expected fragment of the error message.
  // Every case is issued as a HEAD request and must fail with HTTP 400.
  val failingRequests = Seq(
    // The expected error is the "change data feed is not enabled" rejection, so the timestamp
    // value itself can be any string here.
    "/shares/share2/schemas/default/tables/table2?startingTimestamp=abc" ->
      "Reading table by version or timestamp is not supported because ",
    // invalid startingTimestamp format
    "/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=abc" ->
      "Invalid startingTimestamp",
    // timestamp after the latest version
    "/shares/share1/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=9999-01-01%2000:00:00" ->
      "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available"
  )

  failingRequests.foreach { case (path, expectedMessage) =>
    assertHttpError(
      url = requestPath(path),
      method = "HEAD",
      data = None,
      expectedErrorCode = 400,
      expectedErrorMessage = expectedMessage
    )
  }
}

integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/metadata") {
Expand Down Expand Up @@ -686,6 +732,17 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available"
)

// can only query table data since version 1
// 1651614979 (epoch seconds) is 2022-05-03T21:56:19Z, i.e. 14:56:19 PDT; version 1 is 1 second later
val tsStr = new Timestamp(1651614979000L).toString
assertHttpError(
url = requestPath("/shares/share1/schemas/default/tables/cdf_table_with_partition/query"),
method = "POST",
data = Some(s"""{"timestamp": "$tsStr"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "You can only query table data since version 1"
)
}

integrationTest("cdf_table_cdf_enabled - timestamp on version 1 - /shares/{share}/schemas/{schema}/tables/{table}/query") {
Expand Down Expand Up @@ -1051,7 +1108,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
connection.getInputStream()
}
assert(e.getMessage.contains(s"Server returned HTTP response code: $expectedErrorCode"))
assert(IOUtils.toString(connection.getErrorStream()).contains(expectedErrorMessage))
assert(method == "HEAD" || IOUtils.toString(connection.getErrorStream()).contains(expectedErrorMessage))
linzhou-db marked this conversation as resolved.
Show resolved Hide resolved
}

integrationTest("valid request json but incorrect field type") {
Expand Down
Expand Up @@ -44,7 +44,7 @@ import io.delta.sharing.spark.util.{JsonUtils, RetryUtils, UnexpectedHttpStatus}
private[sharing] trait DeltaSharingClient {
def listAllTables(): Seq[Table]

def getTableVersion(table: Table): Long
def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long

def getMetadata(table: Table): DeltaTableMetadata

Expand Down Expand Up @@ -161,12 +161,19 @@ private[spark] class DeltaSharingRestClient(
tables
}

override def getTableVersion(table: Table): Long = {
override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = {
val encodedShareName = URLEncoder.encode(table.share, "UTF-8")
val encodedSchemaName = URLEncoder.encode(table.schema, "UTF-8")
val encodedTableName = URLEncoder.encode(table.name, "UTF-8")

// Optional query string carrying the starting timestamp; empty when no timestamp was supplied.
// The value is percent-encoded explicitly as UTF-8, like the share/schema/table names above:
// the single-argument URLEncoder.encode overload is deprecated and uses the platform default
// charset, which can produce different URLs on different JVM configurations.
val encodedParam = startingTimestamp
  .map(ts => s"?startingTimestamp=${URLEncoder.encode(ts, "UTF-8")}")
  .getOrElse("")
val target =
getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName")
getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" +
s"$encodedTableName$encodedParam")
val (version, _) = getResponse(new HttpHead(target))
version.getOrElse {
throw new IllegalStateException("Cannot find Delta-Table-Version in the header")
Expand Down
Expand Up @@ -62,12 +62,27 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
}
}

integrationTest("getTableVersion") {
integrationTest("getTableVersion - success") {
  val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true)
  try {
    // Without a starting timestamp the client reports each table's latest version.
    val latestVersions = Seq(
      Table(name = "table2", schema = "default", share = "share2") -> 2L,
      Table(name = "table1", schema = "default", share = "share1") -> 2L,
      Table(name = "table3", schema = "default", share = "share1") -> 4L
    )
    latestVersions.foreach { case (sharedTable, expectedVersion) =>
      assert(client.getTableVersion(sharedTable) == expectedVersion)
    }

    // With a starting timestamp the expected version for this table is 0.
    val cdfTable = Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share1")
    assert(client.getTableVersion(cdfTable, startingTimestamp = Some("2020-01-01 00:00:00")) == 0)
  } finally {
    client.close()
  }
}

integrationTest("getTableVersion - exceptions") {
val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true)
try {
val errorMessage = intercept[UnexpectedHttpStatus] {
client.getTableVersion(Table(name = "table1", schema = "default", share = "share1"),
startingTimestamp = Some("2020-01-01 00:00:00"))
}.getMessage
assert(errorMessage.contains("400 Bad Request"))
} finally {
client.close()
}
Expand Down
Expand Up @@ -51,7 +51,7 @@ class TestDeltaSharingClient(
DeltaTableMetadata(0, Protocol(0), metadata)
}

override def getTableVersion(table: Table): Long = 0
override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = 0

override def getFiles(
table: Table,
Expand Down