Skip to content

Commit

Permalink
Allow absolute paths (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
aamend committed May 12, 2023
1 parent baf54ff commit 02a46d1
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
Expand Up @@ -33,9 +33,11 @@ import io.delta.standalone.types.StructType
import io.delta.standalone.internal.actions.{Action, AddFile, CommitInfo, FileAction, Metadata, Protocol, RemoveFile}
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.logging.Logging
import io.delta.standalone.internal.sources.StandaloneHadoopConf
import io.delta.standalone.internal.util.{ConversionUtils, FileNames, SchemaMergingUtils, SchemaUtils}
import io.delta.standalone.internal.util.DeltaFileOperations


private[internal] class OptimisticTransactionImpl(
deltaLog: DeltaLogImpl,
snapshot: SnapshotImpl) extends OptimisticTransaction with Logging {
Expand Down Expand Up @@ -243,14 +245,22 @@ private[internal] class OptimisticTransactionImpl(
val customCommitInfo = actions.exists(_.isInstanceOf[CommitInfo])
assert(!customCommitInfo, "Cannot commit a custom CommitInfo in a transaction.")

// This will ignore errors (disabled by default) when trying to relativize a path
// This is specifically for files living in a filesystem different from the base table path
// so one can enable shallow clones across file systems
val relativizeIgnoreError = deltaLog
.hadoopConf
.getBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, false)

// Convert AddFile paths to relative paths if they're in the table path
var finalActions = actions.map {
case addFile: AddFile =>
addFile.copy(path =
DeltaFileOperations.tryRelativizePath(
deltaLog.fs,
deltaLog.getPath,
new Path(addFile.path)
new Path(addFile.path),
relativizeIgnoreError
).toString)
case a: Action => a
}
Expand Down
Expand Up @@ -353,7 +353,10 @@ private[internal] object SnapshotImpl {
val fs = FileSystem.get(hadoopConf)
fs.makeQualified(hadoopPath).toUri.toString
} else {
// return untouched if it is a relative path or is already fully qualified
// return untouched if
// - path is a relative path
// - or path is already fully qualified
// - or path points to external file systems (authority is not null)
hadoopPath.toUri.toString
}
}
Expand Down
Expand Up @@ -21,6 +21,15 @@ package io.delta.standalone.internal.sources
*/
private[internal] object StandaloneHadoopConf {

/**
 * If enabled, errors are ignored when trying to relativize the absolute path of an
 * [[io.delta.standalone.actions.AddFile]] that lives on a file system different
 * from the table root.
 * This allows users to define shallow-clone delta tables whose data resides in
 * external file systems such as s3://, wasbs:// or adls://.
 * Disabled by default; set to `true` to enable.
 * NOTE(review): the key contains "vacuum" but the flag is read at commit time in
 * OptimisticTransactionImpl — confirm the key name is intentional (it mirrors the
 * Spark-side configuration, and changing it would break existing users).
 */
val RELATIVE_PATH_IGNORE = "io.delta.vacuum.relativize.ignoreError"

/** Time zone as which time-based parquet values will be encoded and decoded. */
val PARQUET_DATA_TIME_ZONE_ID = "io.delta.standalone.PARQUET_DATA_TIME_ZONE_ID"

Expand Down
Expand Up @@ -60,14 +60,12 @@ private[internal] object DeltaFileOperations extends Logging {
throw new IllegalStateException(
s"""Failed to relativize the path ($child). This can happen when absolute paths make
|it into the transaction log, which start with the scheme
|s3://, wasbs:// or adls://. This is a bug that has existed before DBR 5.0.
|To fix this issue, please upgrade your writer jobs to DBR 5.0 and please run:
|%scala com.databricks.delta.Delta.fixAbsolutePathsInLog("$child").
|s3://, wasbs:// or adls://.
|
|If this table was created with a shallow clone across file systems
|(different buckets/containers) and this table is NOT USED IN PRODUCTION, you can
|set the SQL configuration spark.databricks.delta.vacuum.relativize.ignoreError
|to true. Using this SQL configuration could lead to accidental data loss,
|set the hadoop configuration io.delta.vacuum.relativize.ignoreError
|to true. Using this configuration could lead to accidental data loss,
|therefore we do not recommend the use of this flag unless
|this is a shallow clone for testing purposes.
""".stripMargin)
Expand Down
Expand Up @@ -27,6 +27,7 @@ import io.delta.standalone.expressions.{And, EqualTo, LessThan, Literal}
import io.delta.standalone.types.{IntegerType, StructField, StructType}

import io.delta.standalone.internal.actions.{Action, AddFile, Metadata}
import io.delta.standalone.internal.sources.StandaloneHadoopConf
import io.delta.standalone.internal.util.{ConversionUtils, FileNames}
import io.delta.standalone.internal.util.TestUtils._

Expand Down Expand Up @@ -54,14 +55,25 @@ class DeltaScanSuite extends FunSuite {
AddFile(i.toString, partitionValues, 1L, 1L, dataChange = true)
}

// URI schemes of file systems that differ from the base table path; used to
// simulate shallow clones whose data lives outside the table root.
private val externalFileSystems = Seq("s3://", "wasbs://", "adls://")

// AddFiles whose paths are absolute and point at external file systems,
// cycling through the schemes above.
private val externalFiles = (1 to 10).map { i =>
  val partitionValues = Map("col1" -> (i % 3).toString, "col2" -> (i % 2).toString)
  // Renamed from `schema`: the old name shadowed the suite-level StructType
  // `schema` (used by the conjuncts below) and this value is a URI *scheme*.
  val scheme = externalFileSystems(i % 3)
  AddFile(s"${scheme}path/to/$i.parquet", partitionValues, 1L, 1L, dataChange = true)
}

// Same fixture files but committed with dataChange = false.
private val filesDataChangeFalse = files.map(_.copy(dataChange = false))

// Predicate on a partition column (metadata-only evaluation).
private val metadataConjunct = new EqualTo(schema.column("col1"), Literal.of(0))
// Predicate on a data column (cannot be evaluated from metadata alone).
private val dataConjunct = new EqualTo(schema.column("col3"), Literal.of(5))

def withLog(actions: Seq[Action])(test: DeltaLog => Unit): Unit = {
def withLog(
actions: Seq[Action],
configuration: Configuration = new Configuration()
)(test: DeltaLog => Unit): Unit = {
withTempDir { dir =>
val log = DeltaLog.forTable(new Configuration(), dir.getCanonicalPath)
val log = DeltaLog.forTable(configuration, dir.getCanonicalPath)
log.startTransaction().commit(metadata :: Nil, op, "engineInfo")
log.startTransaction().commit(actions, op, "engineInfo")

Expand Down Expand Up @@ -104,6 +116,19 @@ class DeltaScanSuite extends FunSuite {
}
}

test("filtered scan with files stored in external file systems") {
  // Relativization errors must be ignored so absolute external paths can be
  // committed at all; the scan should then return them verbatim.
  val conf = new Configuration()
  conf.setBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, true)
  withLog(externalFiles, conf) { log =>
    val scan = log.update().scan(dataConjunct)
    val returnedPaths = scan.getFiles.asScala.map(_.getPath).toSet
    assert(returnedPaths == externalFiles.map(_.path).toSet,
      "paths should not have been made qualified")
  }
}

/**
* This tests the following DeltaScan MemoryOptimized functionalities:
* - skipping AddFiles that don't match the given filter
Expand Down
Expand Up @@ -27,9 +27,11 @@ import io.delta.standalone.DeltaLog
import io.delta.standalone.actions.{Action => ActionJ, AddFile => AddFileJ, CommitInfo, Metadata => MetadataJ, Protocol, SetTransaction => SetTransactionJ}
import io.delta.standalone.types.{IntegerType, StringType, StructField, StructType}

import io.delta.standalone.internal.actions.{AddFile, Metadata}
import io.delta.standalone.internal.actions.AddFile
import io.delta.standalone.internal.sources.StandaloneHadoopConf
import io.delta.standalone.internal.util.TestUtils._


class OptimisticTransactionSuite extends OptimisticTransactionSuiteBase {

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -316,4 +318,36 @@ class OptimisticTransactionSuite extends OptimisticTransactionSuiteBase {
assert(committedAddFile.getPath === "file:/absolute/path/to/file/test.parquet")
}
}

test("Can't create table with external files") {
  // An absolute path on a foreign file system (s3://) cannot be relativized
  // against the local table root, so the commit must fail by default
  // (RELATIVE_PATH_IGNORE is not set).
  val extFile = AddFile("s3://snip/snip.parquet", Map(), 0, 0, dataChange = true)
  val conf = new Configuration()
  withTempDir { dir =>
    val log = DeltaLog.forTable(conf, dir.getCanonicalPath)
    val txn = log.startTransaction()
    // Kept outside the intercept: only `commit` is expected to throw. If
    // updateMetadata failed with the same exception type, the old test would
    // have passed spuriously.
    txn.updateMetadata(metadata_colXY)
    val e = intercept[IllegalStateException] {
      txn.commit(List(extFile), op, engineInfo)
    }
    assert(e.getMessage.contains("Failed to relativize the path"))
  }
}

test("Create table with external files override") {
  // With RELATIVE_PATH_IGNORE enabled, an absolute s3:// path is accepted at
  // commit time and must survive the round trip untouched.
  val conf = new Configuration()
  conf.setBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, true)
  val externalAdd = AddFile("s3://snip/snip.parquet", Map(), 0, 0, dataChange = true)
  withTempDir { dir =>
    val log = DeltaLog.forTable(conf, dir.getCanonicalPath)
    val txn = log.startTransaction()
    txn.updateMetadata(metadata_colXY)
    txn.commit(List(externalAdd), op, engineInfo)
    val committed = log.update().getAllFiles.asScala.head
    val committedPath = new Path(committed.getPath)
    // Path is preserved: still absolute, scheme/authority intact.
    assert(committedPath.isAbsolute && !committedPath.isAbsoluteAndSchemeAuthorityNull)
    assert(committedPath.toString == "s3://snip/snip.parquet")
  }
}

}

0 comments on commit 02a46d1

Please sign in to comment.