From 659376581e20d39f35edd2dfb3138c12c951750e Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Thu, 8 Dec 2022 15:08:44 -0800 Subject: [PATCH 1/2] Refresh presigned urls for delta sharing streaming --- .../sharing/spark/DeltaSharingSource.scala | 38 ++++++++++++------- .../sharing/spark/RemoteDeltaFileIndex.scala | 13 ------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index b8ee1e0e3..64dfba542 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -17,29 +17,20 @@ package io.delta.sharing.spark // scalastyle:off import.ordering.noEmptyLine +import java.lang.ref.WeakReference + import scala.collection.mutable.ArrayBuffer +import org.apache.spark.delta.sharing.CachedTableManager import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, DeltaSharingScanUtils, SparkSession} import org.apache.spark.sql.connector.read.streaming -import org.apache.spark.sql.connector.read.streaming.{ - ReadAllAvailable, - ReadLimit, - ReadMaxFiles, - SupportsAdmissionControl -} +import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.types.StructType -import io.delta.sharing.spark.model.{ - AddCDCFile, - AddFile, - AddFileForCDF, - DeltaTableFiles, - FileAction, - RemoveFile -} +import io.delta.sharing.spark.model.{AddCDCFile, AddFile, AddFileForCDF, DeltaTableFiles, FileAction, RemoveFile} import io.delta.sharing.spark.util.SchemaUtils /** @@ -145,6 +136,8 @@ case class DeltaSharingSource( private var lastQueriedTableVersion: Long = -1 private val QUERY_TABLE_VERSION_INTERVAL_MILLIS = 30000 // 30 seconds + private var latestRefreshFunc = () => { Map.empty[String, String] } + // Check the latest table version from the delta sharing server through the client.getTableVersion // RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive // rpcs to avoid traffic jam on the delta sharing server. @@ -230,6 +223,13 @@ case class DeltaSharingSource( // If isStartingVersion is true, it means to fetch the snapshot at the fromVersion, which may // include table changes from previous versions. val tableFiles = deltaLog.client.getFiles(deltaLog.table, Nil, None, Some(fromVersion), None) + latestRefreshFunc = () => { + deltaLog.client.getFiles( + deltaLog.table, Nil, None, Some(fromVersion), None + ).files.map { f => + f.id -> f.url + }.toMap + } val numFiles = tableFiles.files.size tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach { @@ -255,6 +255,11 @@ case class DeltaSharingSource( // If isStartingVersion is false, it means to fetch table changes since fromVersion, not // including files from previous versions. val tableFiles = deltaLog.client.getFiles(deltaLog.table, fromVersion) + latestRefreshFunc = () => { + deltaLog.client.getFiles(deltaLog.table, fromVersion).addFiles.map { a => + a.id -> a.url + }.toMap + } val allAddFiles = verifyStreamHygieneAndFilterAddFiles(tableFiles).groupBy(a => a.version) for (v <- fromVersion to currentLatestVersion) { @@ -469,9 +474,14 @@ case class DeltaSharingSource( val add = indexedFile.add AddFile(add.url, add.id, add.partitionValues, add.size, add.stats) } + val idToUrl = addFilesList.map { add => + add.id -> add.url + }.toMap val params = new RemoteDeltaFileIndexParams(spark, initSnapshot) val fileIndex = new RemoteDeltaBatchFileIndex(params, addFilesList) + CachedTableManager.INSTANCE.register( + params.path.toString, idToUrl, new WeakReference(fileIndex), latestRefreshFunc) val relation = HadoopFsRelation( fileIndex, diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala index 8e5877d55..a8893e69c 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala @@ -191,21 +191,8 @@ private[sharing] case class RemoteDeltaBatchFileIndex( override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // TODO(lin.zhou): Actually refresh the presigned url in the cache instead of just using - // getIdToUrlMap - CachedTableManager.INSTANCE - .register(params.path.toString, getIdToUrlMap, new WeakReference(this), () => { - getIdToUrlMap - }) - // We ignore partition filters for list files, since the delta sharing server already // parforms the filters. makePartitionDirectories(addFiles) } - - private[sharing] def getIdToUrlMap : Map[String, String] = { - addFiles.map { add => - add.id -> add.url - }.toMap - } } From 81a33b088f8f7b164d5891a151eba1ca1e9f66d3 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 12 Dec 2022 10:11:57 -0800 Subject: [PATCH 2/2] fix build --- .../main/scala/io/delta/sharing/spark/DeltaSharingSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 85b14e649..577ada461 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -501,7 +501,7 @@ case class DeltaSharingSource( val params = new RemoteDeltaFileIndexParams(spark, initSnapshot) val fileIndex = new RemoteDeltaBatchFileIndex(params, addFilesList) CachedTableManager.INSTANCE.register( - params.path.toString, idToUrl, new WeakReference(fileIndex), latestRefreshFunc) + params.path.toString, idToUrl, Seq(new WeakReference(fileIndex)), latestRefreshFunc) val relation = HadoopFsRelation( fileIndex,