From 43d14226cc802d721d1683495cdc8511acf460a1 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 29 Sep 2021 19:30:19 +0000
Subject: [PATCH] [DELTA-OSS-EXTERNAL] Set the correct Hadoop configuration for
 checkpoint write

Checkpoint write doesn't pass the correct Hadoop configuration. It may fail
when users specify their storage credentials using Spark session configs.
This PR fixes it and adds a test for it.

Closes delta-io/delta#784

Signed-off-by: Shixiong Zhu

Author: Shixiong Zhu

#28914 is resolved by zsxwing/66uz7rxl.

GitOrigin-RevId: cc42b868f2a3ecffca88424fb7465792dc22c7e6
---
 .../org/apache/spark/sql/delta/Checkpoints.scala |  5 +++--
 .../apache/spark/sql/delta/DeltaLogSuite.scala   | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index aade150060f..fa4c4087211 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -237,6 +237,7 @@ object Checkpoints extends DeltaLogging {
     // log store and decide whether to use rename.
     val useRename = deltaLog.store.isPartialWriteVisible(deltaLog.logPath)
 
+    val hadoopConf = spark.sessionState.newHadoopConf
     val checkpointSize = spark.sparkContext.longAccumulator("checkpointSize")
     val numOfFiles = spark.sparkContext.longAccumulator("numOfFiles")
     // Use the string in the closure as Path is not Serializable.
@@ -255,7 +256,7 @@ object Checkpoints extends DeltaLogging {
 
       val (factory, serConf) = {
         val format = new ParquetFileFormat()
-        val job = Job.getInstance()
+        val job = Job.getInstance(hadoopConf)
         (format.prepareWrite(spark, job, Map.empty, schema),
           new SerializableConfiguration(job.getConfiguration))
       }
@@ -306,7 +307,7 @@ object Checkpoints extends DeltaLogging {
     if (useRename) {
       val src = new Path(writtenPath)
       val dest = new Path(path)
-      val fs = dest.getFileSystem(spark.sessionState.newHadoopConf)
+      val fs = dest.getFileSystem(hadoopConf)
       var renameDone = false
       try {
         if (fs.rename(src, dest)) {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 840df337b3b..f86e30606dd 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -78,6 +78,21 @@ class DeltaLogSuite extends QueryTest
     }
   }
 
+  test("checkpoint write should use the correct Hadoop configuration") {
+    withTempDir { dir =>
+      withSQLConf(
+          "fs.AbstractFileSystem.fake.impl" -> classOf[FakeAbstractFileSystem].getName,
+          "fs.fake.impl" -> classOf[FakeFileSystem].getName,
+          "fs.fake.impl.disable.cache" -> "true") {
+        val path = s"fake://${dir.getCanonicalPath}"
+        val log = DeltaLog.forTable(spark, path)
+        val txn = log.startTransaction()
+        txn.commitManually(AddFile("foo", Map.empty, 1, 1, true))
+        log.checkpoint()
+      }
+    }
+  }
+
   testQuietly("update should pick up checkpoints") {
     withTempDir { tempDir =>
       val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
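
Note (not part of the patch): below is a minimal sketch of the failure mode the commit message describes, assuming Spark 3.x on the classpath. The fs.s3a.access.key property and its placeholder value are illustrative only and do not appear in the patch; because SparkSession.sessionState is private[sql], the sketch is declared under the org.apache.spark.sql package, the same way Delta's own code is.

// Sketch only: shows why a Hadoop Configuration built from the Spark session
// must be handed to the Parquet write job used for checkpoints.
package org.apache.spark.sql.delta.sketch

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

object HadoopConfPropagationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("hadoop-conf-propagation-sketch")
      // Users typically pass storage credentials as spark.hadoop.* configs;
      // the key and value below are placeholders, not taken from the patch.
      .config("spark.hadoop.fs.s3a.access.key", "placeholder-access-key")
      .getOrCreate()

    // newHadoopConf() folds the spark.hadoop.* session configs into a Hadoop
    // Configuration, so the credential is visible here.
    val hadoopConf = spark.sessionState.newHadoopConf()
    println(hadoopConf.get("fs.s3a.access.key"))                // placeholder-access-key

    // A Job created with no arguments starts from a fresh Configuration that
    // never saw the session configs; this is the gap the patch closes by
    // calling Job.getInstance(hadoopConf) in Checkpoints.scala.
    val bareJob = Job.getInstance()
    println(bareJob.getConfiguration.get("fs.s3a.access.key"))  // null

    spark.stop()
  }
}

The test added by the patch exercises the same point: the fake:// file system is registered only through session configs, so log.checkpoint() can succeed only if those configs actually reach the checkpoint writer.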