@@ -98,6 +98,7 @@ final class BypassMergeSortShuffleWriter<K, V>
private final long mapId;
private final Serializer serializer;
private final ShuffleExecutorComponents shuffleExecutorComponents;
private final boolean remoteWrites;

/** Array of file writers, one for each partition */
private DiskBlockObjectWriter[] partitionWriters;
@@ -136,6 +137,7 @@ final class BypassMergeSortShuffleWriter<K, V>
this.mapId = mapId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.remoteWrites = dep.shuffleWriterProcessor() instanceof org.apache.spark.sql.execution.exchange.ConsolidationShuffleMarker;
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = writeMetrics;
this.serializer = dep.serializer();
@@ -149,12 +151,14 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, numPartitions);
BlockManagerId blockManagerId = remoteWrites ?
RemoteShuffleStorage.BLOCK_MANAGER_ID() : blockManager.shuffleServerId();
try {
if (!records.hasNext()) {
partitionLengths = mapOutputWriter.commitAllPartitions(
ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId, getAggregatedChecksumValue());
blockManagerId, partitionLengths, mapId, getAggregatedChecksumValue());
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -196,7 +200,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

partitionLengths = writePartitionedData(mapOutputWriter);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId, getAggregatedChecksumValue());
blockManagerId, partitionLengths, mapId, getAggregatedChecksumValue());
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
@@ -236,8 +240,10 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {
// TODO: Remove this comment: the line below was moved so that assertions
// can be added and it is safer in general
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (transferToEnabled) {
// Using WritableByteChannelWrapper to make resource closing consistent between
// this implementation and UnsafeShuffleWriter.
@@ -26,6 +26,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;

import org.apache.spark.storage.BlockManagerId;
import org.apache.spark.storage.RemoteShuffleStorage;
import scala.Option;
import scala.Product2;
import scala.jdk.javaapi.CollectionConverters;
@@ -89,6 +91,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final boolean transferToEnabled;
private final int initialSortBufferSize;
private final int mergeBufferSizeInBytes;
private final boolean remoteWrites;

@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
@@ -135,6 +138,7 @@ public UnsafeShuffleWriter(
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
this.remoteWrites = dep.shuffleWriterProcessor() instanceof org.apache.spark.sql.execution.exchange.ConsolidationShuffleMarker;
this.writeMetrics = writeMetrics;
this.shuffleExecutorComponents = shuffleExecutorComponents;
this.taskContext = taskContext;
@@ -247,8 +251,10 @@ void closeAndWriteOutput() throws IOException {
}
}
}
BlockManagerId blockManagerId = remoteWrites ?
RemoteShuffleStorage.BLOCK_MANAGER_ID() : blockManager.shuffleServerId();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId, getAggregatedChecksumValue());
blockManagerId, partitionLengths, mapId, getAggregatedChecksumValue());
}

@VisibleForTesting
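For reference, both writers above stamp their MapStatus with RemoteShuffleStorage.BLOCK_MANAGER_ID() instead of the local shuffle server when the dependency carries the consolidation marker. That object is not part of this diff; a minimal sketch of the shape it presumably has, modeled on the existing FallbackStorage sentinel (the executor id, host, and port below are assumptions):

package org.apache.spark.storage

// Hypothetical sketch only; the real RemoteShuffleStorage is defined elsewhere in this PR.
// A sentinel BlockManagerId: no live executor serves these blocks, so readers that see this
// id are expected to fetch shuffle data from the configured remote storage path instead.
private[spark] object RemoteShuffleStorage {
  val BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("remote-shuffle", "remote", 7337)
}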
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.spark.network.buffer.ManagedBuffer;

/**
* A {@link ManagedBuffer} backed by a file using Hadoop FileSystem.
* This buffer creates an input stream with a configurable buffer size (64 MB by default) for efficient reading.
* Note: This implementation throws UnsupportedOperationException for methods that
* require loading the entire file into memory (nioByteBuffer, convertToNetty, convertToNettyForSsl)
* as files can be very large and loading them entirely into memory is not practical.
*/
public class FileSystemManagedBuffer extends ManagedBuffer {
private int bufferSize; // buffer size in MB (default 64)
private final Path filePath;
private final long fileSize;
private final Configuration hadoopConf;

public FileSystemManagedBuffer(Path filePath, Configuration hadoopConf) throws IOException {
this.filePath = filePath;
this.hadoopConf = hadoopConf;
// Get the file size using FileSystem.newInstance to avoid reusing a cached FileSystem instance
FileSystem fileSystem = FileSystem.newInstance(filePath.toUri(), hadoopConf);
try {
this.fileSize = fileSystem.getFileStatus(filePath).getLen();
} finally {
fileSystem.close();
}
bufferSize = 64;
}

public FileSystemManagedBuffer(Path filePath, Configuration hadoopConf, int bufferSize)
throws IOException {
this(filePath, hadoopConf);
this.bufferSize = bufferSize;
}

@Override
public long size() {
return fileSize;
}

@Override
public ByteBuffer nioByteBuffer() throws IOException {
throw new UnsupportedOperationException(
"FileSystemManagedBuffer does not support nioByteBuffer() as it would require loading " +
"the entire file into memory, which is not practical for large files. " +
"Use createInputStream() instead.");
}

@Override
public InputStream createInputStream() throws IOException {
// Create a new FileSystem instance (avoiding the shared cache) and open the file
// with a bufferSize-MB read buffer for efficient sequential reading
FileSystem fileSystem = FileSystem.newInstance(filePath.toUri(), hadoopConf);
return fileSystem.open(filePath, bufferSize * 1024 * 1024);
}

@Override
public ManagedBuffer retain() {
// FileSystemManagedBuffer doesn't use reference counting, so just return this
return this;
}

@Override
public ManagedBuffer release() {
// FileSystemManagedBuffer doesn't use reference counting, so just return this
return this;
}

@Override
public Object convertToNetty() {
throw new UnsupportedOperationException(
"FileSystemManagedBuffer does not support convertToNetty() as it would require loading " +
"the entire file into memory, which is not practical for large files. " +
"Use createInputStream() instead.");
}

@Override
public Object convertToNettyForSsl() {
throw new UnsupportedOperationException(
"FileSystemManagedBuffer does not support convertToNettyForSsl()" +
" as it would require loading " +
"the entire file into memory, which is not practical for large files. " +
"Use createInputStream() instead.");
}

@Override
public String toString() {
return "FileSegmentManagedBuffer[file=" + filePath + ",length=" + fileSize + "]";
}
}
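A quick usage sketch for the new buffer from Scala; the HDFS path, buffer size, and read loop are illustrative only:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.storage.FileSystemManagedBuffer

val hadoopConf = new Configuration()
// Example path; in practice this would point at a block under the remote shuffle storage path.
val buffer = new FileSystemManagedBuffer(
  new Path("hdfs:///tmp/remote-shuffle/shuffle_0_0_0.data"), hadoopConf, 64)

// Only streaming access is supported; nioByteBuffer()/convertToNetty() throw.
val in = buffer.createInputStream()
try {
  val chunk = new Array[Byte](8 * 1024)
  var n = in.read(chunk)
  while (n != -1) {
    // consume the chunk ...
    n = in.read(chunk)
  }
} finally {
  in.close()
}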
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -90,7 +90,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor,
val rowBasedChecksums: Array[RowBasedChecksum] = ShuffleDependency.EMPTY_ROW_BASED_CHECKSUMS,
val checksumMismatchFullRetryEnabled: Boolean = false)
val checksumMismatchFullRetryEnabled: Boolean = false
)
extends Dependency[Product2[K, V]] with Logging {

def this(
@@ -249,7 +250,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
)
}

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
if (!shuffleWriterProcessor.isInstanceOf[org.apache.spark.sql.execution.exchange.ConsolidationShuffleMarker]) {
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}

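The ConsolidationShuffleMarker checked here (and in both writers above) is not included in this diff; from the isInstanceOf checks it appears to be a marker type carried by the shuffle write processor of consolidated shuffles. A hedged sketch of that shape (the empty trait body and the processor class name are assumptions):

package org.apache.spark.sql.execution.exchange

import org.apache.spark.shuffle.ShuffleWriteProcessor

// Assumed shape: an empty marker trait mixed into the write processor used for
// consolidated (remote-storage) shuffles, so core code can detect it via isInstanceOf.
trait ConsolidationShuffleMarker

// Hypothetical processor the exchange operator would install for consolidated shuffles.
class ConsolidationShuffleWriteProcessor extends ShuffleWriteProcessor
  with ConsolidationShuffleMarker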
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -650,6 +650,8 @@ class SparkContext(config: SparkConf) extends Logging {
_env.blockManager.initialize(_applicationId)
FallbackStorage.registerBlockManagerIfNeeded(
_env.blockManager.master, _conf, _hadoopConfiguration)
RemoteShuffleStorage.registerBlockManagerifNeeded(_env.blockManager.master, _conf,
_hadoopConfiguration)

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
@@ -2377,6 +2379,11 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
}

Utils.tryLogNonFatalError {
RemoteShuffleStorage.cleanUp(_conf, _hadoopConfiguration)
}

Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
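RemoteShuffleStorage.registerBlockManagerifNeeded and cleanUp are likewise not shown in this diff. Continuing the hypothetical sketch above, cleanUp plausibly mirrors FallbackStorage.cleanUp and deletes this application's directory under the remote path; the per-application directory layout is an assumption:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{SHUFFLE_REMOTE_STORAGE_CLEANUP, SHUFFLE_REMOTE_STORAGE_PATH}

// Hypothetical sketch, mirroring FallbackStorage.cleanUp.
def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
  for (root <- conf.get(SHUFFLE_REMOTE_STORAGE_PATH)
       if conf.get(SHUFFLE_REMOTE_STORAGE_CLEANUP) && conf.contains("spark.app.id")) {
    val appDir = new Path(root, conf.getAppId)
    val fs = FileSystem.get(appDir.toUri, hadoopConf)
    fs.delete(appDir, true) // recursively remove this application's remote shuffle data
  }
}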
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -626,6 +626,13 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val SHUFFLE_REMOTE_STORAGE_CLEANUP =
ConfigBuilder("spark.shuffle.remote.storage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
.doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
@@ -2905,4 +2912,32 @@ package object config {
.checkValue(v => v.forall(Set("stdout", "stderr").contains),
"The value only can be one or more of 'stdout, stderr'.")
.createWithDefault(Seq("stdout", "stderr"))

private[spark] val SHUFFLE_REMOTE_STORAGE_PATH =
ConfigBuilder("spark.shuffle.remote.storage.path")
.doc("The location for storing shuffle blocks on remote storage.")
.version("4.1.0")
.stringConf
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val REMOTE_SHUFFLE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.remote.buffer.size")
.version("4.1.0")
.stringConf
.createWithDefault("64M")

private[spark] val START_REDUCERS_IN_PARALLEL_TO_MAPPER =
ConfigBuilder("spark.shuffle.consolidation.enabled")
.doc("starts reducers in parallel to mappers")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val EAGERNESS_THRESHOLD_PERCENTAGE =
ConfigBuilder("spark.shuffle.remote.eagerness.percentage")
.doc("Percentage of mapper complet tasks before starting reducers ")
.version("4.1.0")
.intConf
.createWithDefault(20)
}
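Taken together, the new options can be wired up like this (a hedged example: the keys are exactly the ones defined above, the values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.remote.storage.path", "hdfs:///spark/remote-shuffle/") // must end with the path separator
  .set("spark.shuffle.remote.storage.cleanUp", "true")    // remove remote shuffle data on shutdown
  .set("spark.shuffle.remote.buffer.size", "64M")         // read/write buffer for remote I/O
  .set("spark.shuffle.consolidation.enabled", "true")     // start reducers in parallel with mappers
  .set("spark.shuffle.remote.eagerness.percentage", "20") // % of map tasks finished before reducers start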
@@ -1755,7 +1755,13 @@ private[spark] class DAGScheduler(
log"${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 tasks are " +
log"for partitions ${MDC(PARTITION_IDS, tasks.take(15).map(_.partitionId))})")
val shuffleId = stage match {
case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
case s: ShuffleMapStage =>
// hack to prioritize remote shuffle writes
if (properties != null) {
properties.setProperty("remote",
s.shuffleDep.shuffleWriterProcessor.isInstanceOf[org.apache.spark.sql.execution.exchange.ConsolidationShuffleMarker].toString)
}
Some(s.shuffleDep.shuffleId)
case _: ResultStage => None
}

5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -53,8 +53,11 @@ private[spark] class Pool(
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case SchedulingMode.WEIGHTED_FIFO =>
new WeightedFIFOSchedulingAlgorithm()
case _ =>
val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR, FIFO," +
s" or WEIGHTED_FIFO instead."
throw new IllegalArgumentException(msg)
}
}
@@ -40,6 +40,25 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
}
}

private[spark] class WeightedFIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
if (s1.weight == s2.weight) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
} else {
// The higher the weight, the earlier it runs (unlike priority, where lower runs first)
res = math.signum(s2.weight - s1.weight)
}
}
res < 0
}
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
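The new comparator orders by priority ascending, then weight descending, then stage id ascending. A standalone sketch of the same tie-breaking applied to plain values (not Spark's Schedulable), just to make the ordering concrete:

// (priority, weight, stageId): lower priority first, then higher weight, then lower stage id.
case class Entry(priority: Int, weight: Int, stageId: Int)

def comesFirst(a: Entry, b: Entry): Boolean = {
  var res = math.signum(a.priority - b.priority)
  if (res == 0) {
    res =
      if (a.weight == b.weight) math.signum(a.stageId - b.stageId)
      else math.signum(b.weight - a.weight) // higher weight runs earlier
  }
  res < 0
}

val ordered = Seq(Entry(1, 1, 3), Entry(1, 5, 7), Entry(0, 1, 9)).sortWith(comesFirst)
// ordered == Seq(Entry(0, 1, 9), Entry(1, 5, 7), Entry(1, 1, 3))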
@@ -20,10 +20,11 @@ package org.apache.spark.scheduler
/**
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "WEIGHTED_FIFO" is similar to FIFO but uses weight-based comparison in addition.
* "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration {

type SchedulingMode = Value
val FAIR, FIFO, NONE = Value
val FAIR, FIFO, WEIGHTED_FIFO, NONE = Value
}
@@ -96,6 +96,7 @@ private[spark] class ShuffleMapTask(

val rdd = rddAndDep._1
val dep = rddAndDep._2

// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
@@ -115,3 +116,4 @@ private[spark] class ShuffleMapTask(

override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
}

@@ -213,6 +213,8 @@ private[spark] class TaskSchedulerImpl(
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.WEIGHTED_FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, sc)
case _ =>
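WEIGHTED_FIFO is selected the same way as the existing policies, through spark.scheduler.mode (it reuses the FIFO schedulable builder, as shown above). For example:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative: enable the new policy for an application.
val conf = new SparkConf()
  .setAppName("weighted-fifo-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "WEIGHTED_FIFO") // parsed into SchedulingMode.WEIGHTED_FIFO
val sc = new SparkContext(conf)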