RemoteDeltaLog accepts input Metadata in branch 1.0 (#423)
* RemoteDeltaLog accepts input Metadata in branch 1.0

* format
linzhou-db committed Oct 12, 2023
1 parent be81356 commit bc17599
Showing 3 changed files with 75 additions and 32 deletions.
client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala (45 changes: 31 additions & 14 deletions)
@@ -32,26 +32,36 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructField, StructType}

import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
-import io.delta.sharing.client.model.{AddFile, CDFColumnInfo, Metadata, Protocol, Table => DeltaSharingTable}
+import io.delta.sharing.client.model.{AddFile, CDFColumnInfo, DeltaTableMetadata, Metadata, Protocol, Table => DeltaSharingTable}
import io.delta.sharing.client.util.ConfUtils
import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown


-/** Used to query the current state of the transaction logs of a remote shared Delta table. */
+/**
+ * Used to query the current state of the transaction logs of a remote shared Delta table.
+ *
+ * @param initDeltaTableMetadata Used to initialize RemoteSnapshot; always preferred if set,
+ * to avoid an unnecessary call and its additional delay. The caller fetches the table
+ * metadata for a pre-check, deciding whether to use parquet or delta format sharing,
+ * before initializing RemoteDeltaLog.
+ */
private[sharing] class RemoteDeltaLog(
val table: DeltaSharingTable,
val path: Path,
-val client: DeltaSharingClient) {
+val client: DeltaSharingClient,
+initDeltaTableMetadata: Option[DeltaTableMetadata] = None) {

-@volatile private var currentSnapshot: RemoteSnapshot = new RemoteSnapshot(path, client, table)
+@volatile private var currentSnapshot: RemoteSnapshot = new RemoteSnapshot(
+path, client, table, initDeltaTableMetadata = initDeltaTableMetadata)

def snapshot(
versionAsOf: Option[Long] = None,
timestampAsOf: Option[String] = None): RemoteSnapshot = {
if (versionAsOf.isEmpty && timestampAsOf.isEmpty) {
currentSnapshot
} else {
-new RemoteSnapshot(path, client, table, versionAsOf, timestampAsOf)
+new RemoteSnapshot(path, client, table, versionAsOf, timestampAsOf, initDeltaTableMetadata)
}
}

@@ -103,15 +113,16 @@ private[sharing] object RemoteDeltaLog {
def apply(
path: String,
forStreaming: Boolean = false,
-responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET): RemoteDeltaLog = {
+responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
+initDeltaTableMetadata: Option[DeltaTableMetadata] = None): RemoteDeltaLog = {
val parsedPath = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(
name = parsedPath.table,
schema = parsedPath.schema,
share = parsedPath.share
)
-new RemoteDeltaLog(deltaSharingTable, new Path(path), client)
+new RemoteDeltaLog(deltaSharingTable, new Path(path), client, initDeltaTableMetadata)
}
}

@@ -121,11 +132,22 @@ class RemoteSnapshot(
client: DeltaSharingClient,
table: DeltaSharingTable,
versionAsOf: Option[Long] = None,
-timestampAsOf: Option[String] = None) extends Logging {
+timestampAsOf: Option[String] = None,
+initDeltaTableMetadata: Option[DeltaTableMetadata] = None) extends Logging {

protected def spark = SparkSession.active

-lazy val (metadata, protocol, version) = getTableMetadata
+// initDeltaTableMetadata comes from the initialization of RemoteDeltaLog and RemoteSnapshot.
+// If set, it is always preferred, to avoid an unnecessary call and its additional delay: the
+// caller fetches the table metadata for a pre-check, deciding whether to use parquet or delta
+// format sharing.
+private lazy val tableMetadata = initDeltaTableMetadata.getOrElse(
+client.getMetadata(table, versionAsOf, timestampAsOf)
+)
+
+lazy val (metadata, protocol, version) = {
+(tableMetadata.metadata, tableMetadata.protocol, tableMetadata.version)
+}

lazy val schema: StructType = DeltaTableUtils.toSchema(metadata.schemaString)

@@ -164,11 +186,6 @@
}
}

-private def getTableMetadata: (Metadata, Protocol, Long) = {
-val tableMetadata = client.getMetadata(table, versionAsOf, timestampAsOf)
-(tableMetadata.metadata, tableMetadata.protocol, tableMetadata.version)
-}

private def checkProtocolNotChange(newProtocol: Protocol): Unit = {
if (newProtocol != protocol) {
throw new SparkException(
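A usage sketch, not part of this commit: assuming the caller performs the format pre-check itself, it can fetch the table metadata once, decide between parquet and delta format sharing, and then hand the already-fetched metadata to RemoteDeltaLog so RemoteSnapshot never repeats the getMetadata RPC. The path string and the pre-check step below are placeholders.

// Assumes io.delta.sharing.spark package scope (for RemoteDeltaLog and DeltaSharingOptions).
import io.delta.sharing.client.DeltaSharingRestClient
import io.delta.sharing.client.model.{DeltaTableMetadata, Table => DeltaSharingTable}

val path = "profile#share.schema.table" // placeholder path
val parsed = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(
  parsed.profileFile, forStreaming = false, DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)
val table = DeltaSharingTable(name = parsed.table, schema = parsed.schema, share = parsed.share)

// Fetch the table metadata once, for the pre-check.
val tableMetadata: DeltaTableMetadata = client.getMetadata(table, None, None)
// ... inspect tableMetadata here to choose parquet or delta format sharing ...

// Reuse the fetched metadata; RemoteSnapshot then skips its own getMetadata call.
val remoteDeltaLog = RemoteDeltaLog(
  path,
  forStreaming = false,
  responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
  initDeltaTableMetadata = Some(tableMetadata))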
client/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala
@@ -18,31 +18,17 @@ package io.delta.sharing.spark

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
-import java.util.Base64

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{
-AttributeReference => SqlAttributeReference,
-EqualTo => SqlEqualTo,
-Literal => SqlLiteral
-}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference => SqlAttributeReference, EqualTo => SqlEqualTo, Literal => SqlLiteral}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{
-FloatType,
-IntegerType,
-LongType,
-StringType,
-StructField,
-StructType
-}
+import org.apache.spark.sql.types.{FloatType, IntegerType, LongType, StringType, StructField, StructType}

import io.delta.sharing.client.DeltaSharingClient
import io.delta.sharing.client.model.Table
-import io.delta.sharing.client.util.JsonUtils
import io.delta.sharing.filters.{BaseOp, OpConverter}

class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession {

@@ -170,11 +156,13 @@
test("snapshot file index test") {
val spark = SparkSession.active
val client = new TestDeltaSharingClient()
+client.clear()
val snapshot = new RemoteSnapshot(new Path("test"), client, Table("fe", "fi", "fo"))
assert(snapshot.sizeInBytes == 100)
assert(snapshot.metadata.numFiles == 2)
assert(snapshot.schema("col1").nullable)
assert(snapshot.schema("col2").nullable)
+assert(TestDeltaSharingClient.numMetadataCalled == 1)

// Create an index without limits.
val fileIndex = {
@@ -239,6 +227,7 @@
// files with additional field returned from server.
val spark = SparkSession.active
val client = new TestDeltaSharingClient()
+client.clear()
val snapshot = new RemoteSnapshot(
new Path("test"),
client,
@@ -249,6 +238,7 @@
assert(snapshot.metadata.numFiles == 2)
assert(!snapshot.schema("col1").nullable)
assert(!snapshot.schema("col2").nullable)
+assert(TestDeltaSharingClient.numMetadataCalled == 1)

// Create an index without limits.
val fileIndex = {
@@ -313,6 +303,7 @@
// files with additional field returned from server.
val spark = SparkSession.active
val client = new TestDeltaSharingClient()
+client.clear()
val snapshot = new RemoteSnapshot(
new Path("test"),
client,
@@ -324,6 +315,7 @@
assert(snapshot.metadata.numFiles == 2)
assert(snapshot.schema("col1").nullable)
assert(!snapshot.schema("col2").nullable)
+assert(TestDeltaSharingClient.numMetadataCalled == 1)

// Create an index without limits.
val fileIndex = {
@@ -386,7 +378,7 @@
val path = new Path("test")
val table = Table("fe", "fi", "fo")
val client = new TestDeltaSharingClient()
-val remoteDeltaLog = new RemoteDeltaLog(table, path, client)

val snapshot = new RemoteSnapshot(path, client, table)
val params = RemoteDeltaFileIndexParams(spark, snapshot, client.getProfileProvider)

@@ -505,4 +497,34 @@
assert(TestDeltaSharingClient.limits === Seq(2L, 3L))
}
}

test("RemoteDeltaLog Initialized with metadata") {
val path = new Path("profileFile")
val table = Table(share = "share", schema = "schema", name = "table")
val client = new TestDeltaSharingClient()
client.clear()

def checkGetMetadataCalledOnce(versionAsOf: Option[Long] = None, nullable: Boolean): Unit = {
val deltaTableMetadata = client.getMetadata(table, versionAsOf, None)
assert(TestDeltaSharingClient.numMetadataCalled == 1)

val remoteDeltaLog = new RemoteDeltaLog(table, path, client, Some(deltaTableMetadata))
val relation = remoteDeltaLog.createRelation(versionAsOf, None, Map.empty[String, String])
val hadoopFsRelation = relation.asInstanceOf[HadoopFsRelation]
val fileIndex = hadoopFsRelation.location.asInstanceOf[RemoteDeltaSnapshotFileIndex]

// nullable indicates that the metadata is fetched for the correct version.
val snapshot = fileIndex.params.snapshotAtAnalysis
assert(snapshot.sizeInBytes == 100)
assert(snapshot.metadata.numFiles == 2)
assert(snapshot.schema("col1").nullable == nullable)
assert(snapshot.schema("col2").nullable == nullable)

assert(TestDeltaSharingClient.numMetadataCalled == 1)
}

checkGetMetadataCalledOnce(None, true)
client.clear()
checkGetMetadataCalledOnce(Some(1L), false)
}
}
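For contrast, a minimal sketch of the fallback path, reusing the TestDeltaSharingClient fake from this suite: when no initDeltaTableMetadata is passed, the lazy tableMetadata in RemoteSnapshot falls back to client.getMetadata on first access, so exactly one RPC is still issued.

// Assumes io.delta.sharing.spark package scope, as in the suite above.
import org.apache.hadoop.fs.Path
import io.delta.sharing.client.model.Table

val client = new TestDeltaSharingClient()
client.clear()
val snapshot = new RemoteSnapshot(new Path("test"), client, Table("fe", "fi", "fo"))
assert(TestDeltaSharingClient.numMetadataCalled == 0) // lazy: no RPC issued yet
snapshot.metadata // first access forces the single getMetadata fetch
assert(TestDeltaSharingClient.numMetadataCalled == 1)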
client/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala
@@ -78,6 +78,7 @@ class TestDeltaSharingClient(
table: Table,
versionAsOf: Option[Long] = None,
timestampAsOf: Option[String] = None): DeltaTableMetadata = {
+TestDeltaSharingClient.numMetadataCalled += 1
// Different metadata are returned for rpcs with different parameters to test the parameters
// are set properly.
if (versionAsOf.exists(_ == 1)) {
@@ -166,6 +167,7 @@
def clear(): Unit = {
TestDeltaSharingClient.limits = Nil
TestDeltaSharingClient.jsonPredicateHints = Nil
+TestDeltaSharingClient.numMetadataCalled = 0
}
}

@@ -179,5 +181,7 @@
var limits = Seq.empty[Long]
var jsonPredicateHints = Seq.empty[String]

+var numMetadataCalled = 0

val TESTING_TIMESTAMP = "2022-01-01 00:00:00.0"
}
