Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh presigned urls for delta sharing streaming, non cdf queries #221

Merged
merged 3 commits on Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,8 +17,11 @@
package io.delta.sharing.spark

// scalastyle:off import.ordering.noEmptyLine
import java.lang.ref.WeakReference

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.delta.sharing.CachedTableManager
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, DeltaSharingScanUtils, SparkSession}
import org.apache.spark.sql.connector.read.streaming
Expand Down Expand Up @@ -235,6 +238,13 @@ case class DeltaSharingSource(
// If isStartingVersion is true, it means to fetch the snapshot at the fromVersion, which may
// include table changes from previous versions.
val tableFiles = deltaLog.client.getFiles(deltaLog.table, Nil, None, Some(fromVersion), None)
latestRefreshFunc = () => {
deltaLog.client.getFiles(
deltaLog.table, Nil, None, Some(fromVersion), None
).files.map { f =>
f.id -> f.url
}.toMap
}

val numFiles = tableFiles.files.size
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
Expand All @@ -260,6 +270,11 @@ case class DeltaSharingSource(
// If isStartingVersion is false, it means to fetch table changes since fromVersion, not
// including files from previous versions.
val tableFiles = deltaLog.client.getFiles(deltaLog.table, fromVersion)
latestRefreshFunc = () => {
deltaLog.client.getFiles(deltaLog.table, fromVersion).addFiles.map { a =>
a.id -> a.url
}.toMap
}
val allAddFiles = verifyStreamHygieneAndFilterAddFiles(tableFiles).groupBy(a => a.version)
for (v <- fromVersion to currentLatestVersion) {

Expand Down Expand Up @@ -479,9 +494,14 @@ case class DeltaSharingSource(
val add = indexedFile.add
AddFile(add.url, add.id, add.partitionValues, add.size, add.stats)
}
val idToUrl = addFilesList.map { add =>
add.id -> add.url
}.toMap

val params = new RemoteDeltaFileIndexParams(spark, initSnapshot)
val fileIndex = new RemoteDeltaBatchFileIndex(params, addFilesList)
CachedTableManager.INSTANCE.register(
params.path.toString, idToUrl, Seq(new WeakReference(fileIndex)), latestRefreshFunc)

val relation = HadoopFsRelation(
fileIndex,
Expand Down
Expand Up @@ -179,21 +179,8 @@ private[sharing] case class RemoteDeltaBatchFileIndex(
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// TODO(lin.zhou): Actually refresh the presigned url in the cache instead of just using
// getIdToUrlMap
CachedTableManager.INSTANCE
.register(params.path.toString, getIdToUrlMap, Seq(new WeakReference(this)), () => {
getIdToUrlMap
})

// We ignore partition filters for list files, since the delta sharing server already
// parforms the filters.
makePartitionDirectories(addFiles)
}

private[sharing] def getIdToUrlMap : Map[String, String] = {
addFiles.map { add =>
add.id -> add.url
}.toMap
}
}