Skip to content

Commit

Permalink
[DELTA-OSS-EXTERNAL] Set the correct Hadoop configuration for checkpoint write
Browse files Browse the repository at this point in the history

The checkpoint write does not pass the correct Hadoop configuration, so it may fail when users specify their storage credentials through Spark session configs.

This PR fixes it and adds a test for it.

Closes #784

Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>

Author: Shixiong Zhu <zsxwing@gmail.com>

#28914 is resolved by zsxwing/66uz7rxl.

GitOrigin-RevId: cc42b868f2a3ecffca88424fb7465792dc22c7e6
  • Loading branch information
zsxwing authored and pranavanand committed Oct 13, 2021
1 parent 2067d9e commit 43d1422
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ object Checkpoints extends DeltaLogging {
// log store and decide whether to use rename.
val useRename = deltaLog.store.isPartialWriteVisible(deltaLog.logPath)

val hadoopConf = spark.sessionState.newHadoopConf
val checkpointSize = spark.sparkContext.longAccumulator("checkpointSize")
val numOfFiles = spark.sparkContext.longAccumulator("numOfFiles")
// Use the string in the closure as Path is not Serializable.
Expand All @@ -255,7 +256,7 @@ object Checkpoints extends DeltaLogging {

val (factory, serConf) = {
val format = new ParquetFileFormat()
val job = Job.getInstance()
val job = Job.getInstance(hadoopConf)
(format.prepareWrite(spark, job, Map.empty, schema),
new SerializableConfiguration(job.getConfiguration))
}
Expand Down Expand Up @@ -306,7 +307,7 @@ object Checkpoints extends DeltaLogging {
if (useRename) {
val src = new Path(writtenPath)
val dest = new Path(path)
val fs = dest.getFileSystem(spark.sessionState.newHadoopConf)
val fs = dest.getFileSystem(hadoopConf)
var renameDone = false
try {
if (fs.rename(src, dest)) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ class DeltaLogSuite extends QueryTest
}
}

// Regression test for checkpoint writes against a file system that is configured
// only through session-level SQL confs. There is no explicit assertion: the test
// passes when `checkpoint()` completes without throwing.
test("checkpoint write should use the correct Hadoop configuration") {
  withTempDir { tempDir =>
    // The `fake` scheme is made known solely via these confs, so any code path that
    // builds its Hadoop configuration without them cannot resolve the table path.
    val fakeFsConfs = Seq(
      "fs.AbstractFileSystem.fake.impl" -> classOf[FakeAbstractFileSystem].getName,
      "fs.fake.impl" -> classOf[FakeFileSystem].getName,
      "fs.fake.impl.disable.cache" -> "true")
    withSQLConf(fakeFsConfs: _*) {
      val deltaLog = DeltaLog.forTable(spark, s"fake://${tempDir.getCanonicalPath}")
      // Commit a single file action so that there is a version to checkpoint.
      deltaLog.startTransaction().commitManually(AddFile("foo", Map.empty, 1, 1, true))
      deltaLog.checkpoint()
    }
  }
}

testQuietly("update should pick up checkpoints") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
Expand Down

0 comments on commit 43d1422

Please sign in to comment.