
[SPARK-38706][CORE] Use URI in FallbackStorage.copy #36017

Closed

williamhyun wants to merge 1 commit into apache:master from williamhyun:fallbackstorage

Conversation

@williamhyun
Member

@williamhyun williamhyun commented Mar 31, 2022

What changes were proposed in this pull request?

This PR aims to use a URI in the `FallbackStorage.copy` method.

Why are the changes needed?

As in the case of SPARK-38652, the current fallback feature is broken with `S3A` due to Hadoop 3.3.2's `org.apache.hadoop.fs.PathIOException`.
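The failure comes down to whether the source `Path` carries an explicit `file:` scheme. A minimal sketch (plain JDK, no Hadoop dependency; the shuffle file name is a hypothetical placeholder, not from the PR) of the distinction:

```java
// Sketch only: shows why building a Hadoop Path from a bare absolute path
// differs from building it from a file: URI. Hadoop 3.3.2's S3A client
// rejects a schemeless source path with PathIOException
// ("Cannot get relative path for URI: ...").
import java.io.File;
import java.net.URI;

public class UriVsBarePath {
    public static void main(String[] args) {
        // Hypothetical local shuffle file produced by an executor.
        File indexFile = new File("/tmp/shuffle_0_0.index");

        String bare = indexFile.getAbsolutePath();     // "/tmp/shuffle_0_0.index" -- no scheme
        URI uri = indexFile.getAbsoluteFile().toURI(); // "file:/tmp/shuffle_0_0.index" -- file: scheme

        System.out.println("bare path: " + bare);
        System.out.println("URI:       " + uri);
    }
}
```

In `FallbackStorage.copy` the change is, in spirit, constructing the source as `new Path(indexFile.getAbsoluteFile.toURI)` rather than `new Path(indexFile.getAbsolutePath)`, the same pattern `RocksDBFileManager` already uses.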

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually started one master and one executor with the settings below, then decommissioned the executor.

```
spark.decommission.enabled                          true
spark.storage.decommission.enabled                  true
spark.storage.decommission.shuffleBlocks.enabled    true
spark.storage.decommission.fallbackStorage.path     s3a://spark/storage/
```

```
$ curl -v -X POST -d "host=hostname" http://hostname:8080/workers/kill/
```

@github-actions github-actions bot added the CORE label Mar 31, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you for catching this, @williamhyun .

I also verified this before and after.

```
22/03/30 19:43:06 ERROR BlockManagerDecommissioner: Error occurred during migrating migrate_shuffle_0_0
org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file://
```

@dongjoon-hyun changed the title [SPARK-38706][CORE] Use URI in FallbackStorage.copy Mar 31, 2022
Contributor

@weixiuli weixiuli left a comment


LGTM +1

@dongjoon-hyun
Member

I also verified that this was the last remaining instance, since `RocksDBFileManager` already uses a URI.

```
$ git grep -C1 copyFromLocalFile | grep -v test
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala-          val hash = JavaUtils.nonNegativeHash(indexFile.getName)
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:          fallbackFileSystem.copyFromLocalFile(
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala-            new Path(indexFile.getAbsolutePath),
--
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala-            val hash = JavaUtils.nonNegativeHash(dataFile.getName)
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:            fallbackFileSystem.copyFromLocalFile(
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala-              new Path(dataFile.getAbsolutePath),
--
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala-    try {
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala:      fs.copyFromLocalFile(delSrc, overwrite, src, dest)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala-    } catch {
--
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala-          val dfsFile = dfsFilePath(dfsFileName)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:          // Note: The implementation of copyFromLocalFile() closes the output stream when there is
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala-          // any exception while copying. So this may generate partial files on DFS. But that is
--
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala-          // not going to be used at all. Eventually these files should get cleared.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:          fs.copyFromLocalFile(
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala-            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
```

@dongjoon-hyun
Member

All tests passed. Merged to master/3.3.

dongjoon-hyun pushed a commit that referenced this pull request Mar 31, 2022

Closes #36017 from williamhyun/fallbackstorage.

Authored-by: William Hyun <william@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 60d0921)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
Closes apache#36017 from williamhyun/fallbackstorage.

Authored-by: William Hyun <william@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 60d0921)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 48839f6)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit ad442c4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
