
[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API #25007

Closed
wants to merge 33 commits

Conversation

mccheah
Contributor

@mccheah mccheah commented Jun 28, 2019

What changes were proposed in this pull request?

As part of the shuffle storage API proposed in SPARK-25299, this introduces an API for persisting shuffle data in arbitrary storage systems.

This patch introduces several concepts:

  • ShuffleDataIO, which is the root of the entire plugin tree that will be proposed over the course of the shuffle API project.
  • ShuffleExecutorComponents - the subset of plugins for managing shuffle-related components for each executor. This will in turn instantiate shuffle readers and writers.
  • ShuffleMapOutputWriter interface - instantiated once per map task. This provides child ShufflePartitionWriter instances for persisting the bytes for each partition in the map task.

The default implementation of these plugins exactly mirrors what was done by the existing shuffle writing code - namely, writing the data to local disk and writing an index file. This patch leverages the APIs in BypassMergeSortShuffleWriter only; follow-up PRs will use the APIs in SortShuffleWriter and UnsafeShuffleWriter, but those are left as future work to minimize the review surface area. A hedged sketch of how the plugin tree fits together follows.
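A minimal sketch of the plugin tree described above, shown in one file for brevity; the method names and signatures here are illustrative approximations of the proposed API, not the exact merged definitions.

import java.io.IOException;
import java.io.OutputStream;

interface ShuffleDataIO {
  // Root of the plugin tree; yields the executor-side components.
  ShuffleExecutorComponents executor();
}

interface ShuffleExecutorComponents {
  void initializeExecutor(String appId, String execId);

  // Invoked once per map task to obtain a writer for that task's output.
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId, int mapId, long mapTaskAttemptId, int numPartitions)
      throws IOException;
}

interface ShuffleMapOutputWriter {
  // One child writer per reduce partition produced by the map task.
  ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;

  void commitAllPartitions() throws IOException;

  void abort(Throwable error) throws IOException;
}

interface ShufflePartitionWriter {
  OutputStream openStream() throws IOException;

  long getNumBytesWritten();
}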

How was this patch tested?

New unit tests were added. Micro-benchmarks indicate there's no slowdown in the affected code paths.

Introduces the new Shuffle Writer API. Ported from bloomberg#5.
…524)

Implements the shuffle writer API by writing shuffle files to local disk and using the index block resolver to commit data and write index files.

The logic in `BypassMergeSortShuffleWriter` has been refactored to use the base implementation of the plugin instead.

APIs have been slightly renamed to clarify semantics after considering nuances in how these are to be implemented by other developers.

Follow-up commits are to come for `SortShuffleWriter` and `UnsafeShuffleWriter`.

Ported from bloomberg#6, credits to @ifilonenko.
#532)

* [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter.

* Remove unused

* Handle empty partitions properly.

* Adjust formatting

* Don't close streams twice.

Compressed output streams can fail if they are closed more than once.

* Clarify comment
Implements the shuffle locations API as part of SPARK-25299.

This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional and implementations of the pluggable shuffle writers and readers can ignore it accordingly.

This API is designed with the use case in mind of future plugin implementations desiring to have the driver store metadata about where shuffle blocks are stored.

There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing this proves to be very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. However, the overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where for each map status we're storing an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.

- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We think Kryo was previously defaulting to Java serialization anyway, so we now just explicitly tell Kryo to use `ExternalizableSerializer` to deal with these objects. There's a small hack in the serialization protocol that attempts to avoid serializing the same `BlockManagerId` twice in the case that the map shuffle locations is a `DefaultMapShuffleLocations`. A hedged registration sketch follows.
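For illustration, a hedged sketch of the registration described above, assuming Kryo's ExternalizableSerializer; the helper and its Class<?> parameters are placeholders since the real map status classes are private[spark].

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.ExternalizableSerializer;

class MapStatusKryoRegistration {
  // Serialize map statuses through their Externalizable hooks rather than
  // field reflection, since the pluggable MapShuffleLocations field cannot
  // be enumerated ahead of time.
  static void register(Kryo kryo, Class<?> compressed, Class<?> highlyCompressed) {
    kryo.register(compressed, new ExternalizableSerializer());
    kryo.register(highlyCompressed, new ExternalizableSerializer());
  }
}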
…tion ids (#540)

We originally made the shuffle map output writer API behave like an iterator in fetching the "next" partition writer. However, the shuffle writer implementations tend to skip opening empty partitions, and an iterator-like API would tie us down to opening a partition writer for every single partition, even the empty ones. Here, we go back to using specific partition identifiers, which gives us the freedom to avoid creating writers for empty partitions (sketched below).
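A minimal sketch of the freedom this buys, assuming the writer interfaces described in this PR; partitionIsEmpty and copyPartitionBytes are hypothetical helpers standing in for the real bookkeeping.

import java.io.IOException;
import java.io.OutputStream;

class MapTaskWriteLoop {
  static void writeAllPartitions(ShuffleMapOutputWriter mapOutputWriter, int numPartitions)
      throws IOException {
    for (int p = 0; p < numPartitions; p++) {
      if (partitionIsEmpty(p)) {
        continue;  // no writer is ever created for an empty partition
      }
      ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(p);
      try (OutputStream out = writer.openStream()) {
        copyPartitionBytes(p, out);
      }
    }
  }

  // Hypothetical helpers; the real implementations live in the shuffle writers.
  private static boolean partitionIsEmpty(int p) { return false; }
  private static void copyPartitionBytes(int p, OutputStream out) throws IOException {}
}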
…535)

* Propose a new NIO transfer API for partition writing.

This solves the consistency and resource leakage concerns with the first iteration of the API, where it
would not be obvious that the streamable resources created by ShufflePartitionWriter needed to be closed by
ShufflePartitionWriter#close as opposed to closing the resources directly.

This introduces the following adjustments:

- Channel-based writes are separated out to their own module, SupportsTransferTo. This allows the transfer-to
  APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely.
  This also allows us to mark the base ShufflePartitionWriter as a stable API eventually while keeping the NIO
  APIs marked as experimental or developer-api.

- We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition
  writer returns an instance of TransferrableWritableByteChannel, which has APIs for accepting a
  TransferrableReadableByteChannel and can tell the readable byte channel to transfer its bytes out to some
  destination sink (see the sketch after this list).

- The resources returned by ShufflePartitionWriter are always closed. Internally, DefaultMapOutputWriter keeps
  resources open until commitAllPartitions() is called.
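A hedged sketch of the modules named above; the interface names follow this commit message, but the exact method signatures are illustrative.

import java.io.Closeable;
import java.io.IOException;

// Optional mix-in for partition writers that can hand out a transferrable channel.
interface SupportsTransferTo {
  TransferrableWritableByteChannel openTransferrableChannel() throws IOException;
}

// Destination sink that accepts a readable source and drains it completely.
interface TransferrableWritableByteChannel extends Closeable {
  void copyFrom(TransferrableReadableByteChannel source, long numBytesToCopy)
      throws IOException;
}

// Source that pushes its bytes out to a destination sink.
interface TransferrableReadableByteChannel extends Closeable {
  void transferTo(TransferrableWritableByteChannel destination, long numBytesToTransfer)
      throws IOException;
}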

* Migrate unsafe shuffle writer to use new byte channel API.

* More sane implementation for unsafe

* Fix style

* Address comments

* Fix imports

* Fix build

* Fix more build problems

* Address comments.
@mccheah
Contributor Author

mccheah commented Jun 28, 2019

ok to test

@mccheah
Contributor Author

mccheah commented Jun 28, 2019

@jerryshao @squito @yifeih

@SparkQA

SparkQA commented Jun 28, 2019

Test build #107020 has finished for PR 25007 at commit 3167030.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 28, 2019

Test build #107029 has finished for PR 25007 at commit 70f59db.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 29, 2019

Test build #107031 has finished for PR 25007 at commit 3083d86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 29, 2019

Test build #107032 has finished for PR 25007 at commit 4c3d692.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* limitations under the License.
*/

package org.apache.spark.api.shuffle;
Contributor

Not sure if it is proper to add the interfaces here under o.a.s.api. Looks like most of the things under the api package are related to RDD functions. How about this package: o.a.s.shuffle.api?

Contributor

+1

mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
partitionLengths);
Contributor

Seems the indentation here is not correct.


/**
* :: Experimental ::
* An interface for giving streams / channels for shuffle writes.
Contributor

nit: should we omit "channel"? There's nothing else in the API referencing it

@SparkQA

SparkQA commented Jul 2, 2019

Test build #107088 has finished for PR 25007 at commit 2421c92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment

For folks that haven't been following along in your fork, can you also give a link to what the complete implementation looks like? Even if that code is not merge-able quality, it can still be helpful to see how the pieces fit together. Also, a link to some new shuffle storage implementation would help.

* limitations under the License.
*/

package org.apache.spark.api.shuffle;
Contributor

+1

TransferrableWritableByteChannel outputChannel = null;
try (FileChannel inputChannel = in.getChannel()) {
  if (writer instanceof SupportsTransferTo) {
    outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
Contributor

I remember we discussed this before -- but why doesn't this just return a WritableByteChannel? Whatever the reason is should probably be a comment somewhere

Contributor Author

Again we needed consistency between this writer and the UnsafeShuffleWriter. See palantir#535 (comment)

Contributor Author

And I'm not sure we want to add a comment here until we have the parallel implementation in UnsafeShuffleWriter, which I've broken off into a separate patch. We can add the documentation there so that the comparison is more obvious. Thoughts?


/**
* Copy all bytes from the source readable byte channel into this byte channel.
*
Contributor

Though you mention "copy all", it's probably worth repeating in this comment that this differs from FileChannel.transferTo(), in that this will block until all bytes have been transferred.
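To make the distinction concrete, a sketch of the blocking "copy all" behavior, with an illustrative method name: FileChannel.transferTo() may move fewer bytes than requested, so copying everything means looping.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

class BlockingTransfer {
  // Unlike a single transferTo call, this blocks until all bytes have moved.
  static void copyAll(FileChannel in, WritableByteChannel out, long size)
      throws IOException {
    long transferred = 0;
    while (transferred < size) {
      transferred += in.transferTo(transferred, size - transferred, out);
    }
  }
}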

public interface ShuffleExecutorComponents {
  void initializeExecutor(String appId, String execId);

  ShuffleWriteSupport writes();
Contributor

this should have a doc. At the very least, I'd mention that it's called once per ShuffleMapTask

import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleDataIO;

public class DefaultShuffleDataIO implements ShuffleDataIO {
Contributor

this should have a comment that it's implementing the only shuffle storage available with Spark <= 2.4, using local data & index files.

In fact I'm wondering if it should be renamed to LocalShuffleStorageDataIO or something like that ...


+1

import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.util.Utils;

public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
Contributor

again, a comment here on how this creates the local index & data files (the only option with Spark <= 2.4) would be helpful.


void commitAllPartitions() throws IOException;

void abort(Throwable error) throws IOException;
Contributor

these should have some more docs. E.g., at least saying that one of these is created for the output of each ShuffleMapTask, and that the "partition" being referenced here is the reduce partition, so getPartitionWriter will get called once per reduce partition

@SparkQA

SparkQA commented Jul 25, 2019

Test build #108136 has finished for PR 25007 at commit 9f17b9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 25, 2019

Test build #108146 has finished for PR 25007 at commit b8b7b8d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @since 3.0.0
*/
@Private
public interface ShuffleWriteSupport {

This layer has already been removed. : )

Contributor

@vanzin vanzin left a comment

Looks ok bar the metrics stuff.

* Implementations that intend on combining the bytes for all the partitions written by this
* map task should reuse the same channel instance across all the partition writers provided
* by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
* {@link WritableByteChannelWrapper#close()} does not close the resource, since it
Contributor

I think "it" here should be replaced with "the underlying channel" (otherwise it seems to refer to a specific instance of WritableByteChannelWrapper).

* This method is primarily for advanced optimizations where bytes can be copied from the input
* spill files to the output channel without copying data into memory.
* <p>
* The default implementation should be sufficient for most situations. Only override this
Contributor

Actually I'm not sure this is true, if the goal is to actually provide an optimization. In that case, the default implementation is only sufficient if your stream is a FileInputStream (just checked what Channels.newChannel() does).

Otherwise, the wrapper created will copy data into user memory, basically negating the optimization.

(Which maybe is an argument for returning null here and falling back to the normal IO path when that happens.)

Contributor Author

What we're saying is that this kind of low-level optimization isn't the first place to look to improve performance most of the time. So if one has to do the optimization, they should provide the proper override; but the specific optimization isn't a critical factor to consider outside of the local disk implementation.

Contributor

So why not follow my suggestion and return null here by default? It makes it much more clear that this implementation is not needed, and that by default the non-nio path is used.

Contributor Author

It's primarily to avoid returning null from the API - in that case I'd rather return an Optional and make the default Optional.empty().

Contributor

Sure. The main thing is returning something that indicates that this feature is not supported, instead of by default wrapping things a way that might actually hurt performance.
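A sketch of the resolution being discussed, assuming the WritableByteChannelWrapper type from this PR; the default body shown is how the idea could look, not necessarily the final merged code.

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;

import org.apache.spark.shuffle.api.WritableByteChannelWrapper;

interface PartitionWriterSketch {
  OutputStream openStream() throws IOException;

  // Optional.empty() signals that NIO transfers are unsupported, so callers
  // fall back to the stream path instead of receiving a wrapper that copies
  // through user memory and negates the optimization.
  default Optional<WritableByteChannelWrapper> openChannelWrapper() throws IOException {
    return Optional.empty();
  }
}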

    int mapId,
    long mapTaskAttemptId,
    int numPartitions,
    ShuffleWriteMetricsReporter mapTaskWriteMetrics) throws IOException;
Contributor

Imran IIRC will only be back next week, so unless you're ok with waiting, probably should remove this and re-add it later after we figure out exactly what's needed.

(0 until NUM_PARTITIONS).foreach { p =>
  val writer = mapOutputWriter.getPartitionWriter(p)
  val outputTempFile = File.createTempFile("channelTemp", "", tempDir)
  val outputTempFileStream = new FileOutputStream(outputTempFile)
Contributor

Files.write(Path, byte[]) is basically a one-line version of these statements.
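For illustration, the suggested one-liner; the file and byte-array names are placeholders standing in for the test's variables.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class OneLinerExample {
  static void writeBytes(File outputTempFile, byte[] bytesToWrite) throws IOException {
    // Replaces the createTempFile + FileOutputStream + write/close sequence.
    Files.write(outputTempFile.toPath(), bytesToWrite);
  }
}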


package org.apache.spark.shuffle.api;

import java.io.IOException;
Contributor

nit: white space between different import groups.

@mccheah
Contributor Author

mccheah commented Jul 29, 2019

retest this please

* stream might compress or encrypt the bytes before persisting the data to the backing
* data store.
*/
long getNumBytesWritten();
@hiboyang hiboyang Jul 29, 2019

This class delegates writing to the OutputStream from openStream(). Will getNumBytesWritten() in this class access internal state inside that OutputStream? How about letting the OutputStream track the number of bytes written so this class does not need to access the OutputStream? One possible solution is to add a subclass of OutputStream that tracks the number of bytes, something like the existing TimeTrackingOutputStream class in Spark, which extends OutputStream.

Contributor Author

The idea is that if the implementation also supports creating a custom WritableByteChannel, then the number of bytes written would come from that channel, not the output stream. One could see us having both a custom output stream and an added method on WritableByteChannelWrapper.

Contributor Author

Ah, I also remember why we didn't attach it to the output stream - it's because of the lifecycle. If we have an output stream for the partition that pads bytes upon closing the stream, it's unclear whether one may keep calling methods on the output stream object after it has been closed. That's why we have the contract (sketched below):

  1. Open stream for writing bytes.
  2. Write bytes
  3. Close stream
  4. Get written bytes for that partition, accounting for the fact that the above step closed the stream.
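A minimal sketch compatible with that contract, assuming a counting wrapper in the spirit of the existing PartitionWriterStream: the count lives in a plain field, so step 4 never touches the closed stream.

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class CountingOutputStream extends FilterOutputStream {
  private long count;

  CountingOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    count++;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    count += len;
  }

  // Step 4 of the contract: safe to call after close(), reads only a field.
  long getNumBytesWritten() {
    return count;
  }
}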


In this case, the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter. Could we merge them together into one class? E.g.

ShufflePartitionWriterStream extends OutputStream {
  void open();
  long getNumBytesWritten();
}

Contributor Author

An OutputStream instance is considered opened as soon as the object exists, which is why OutputStream extends Closeable. As soon as I have a reference to the OutputStream object I can call write on it to push bytes to the sink. So having a separate open method doesn't make sense.

The open method belongs in the ShufflePartitionWriter API, which is effectively what we have with openStream and openChannel.


Oh, I mean the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter, thus I suggest merging them together. For example, rename ShufflePartitionWriter to ShufflePartitionWriterStream, which extends OutputStream:

ShufflePartitionWriterStream extends OutputStream {
  void open();
  long getNumBytesWritten();
}

In this case, the user does not need to create a ShufflePartitionWriter and then call its openStream() method to get an OutputStream. Instead, the user creates a ShufflePartitionWriterStream, which is already an OutputStream.

Contributor Author

But again, do we call getNumBytesWritten before or after calling close on this object? If before, does it include the bytes that might be padded in closing the stream? If after, are we going to be invoking methods on a closed resource, and is that reasonable?

  initStream();
  partStream = new PartitionWriterStream(partitionId);
}
return partStream;


I feel a little uncomfortable returning the internal field "partStream" outside of this class. Is it possible to modify the design here to avoid returning an internal field?

Contributor Author

The idea is that we want to share the stream across all partition writes in this implementation. I think returning the internal field represents that paradigm properly.

* guaranteed to be called for every partition id in the above described range. In particular,
* no guarantees are made as to whether or not this method will be called for empty partitions.
*/
ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;


Why "calls to this method will be invoked with monotonically increasing reducePartitionIds"? This may cause potential issues in future and cause burden on implementation. for example, if people want to implement multiple partition writers and write shuffle data in parallel. It cannot guarantee monotonically increasing reducePartitionIds.

Contributor Author

People using this will be using it with SortShuffleManager which has a specific algorithm that won't open streams in parallel. If these invariants are broken, it implies the algorithm has changed, in which case we'd need to reconsider these APIs.

}

@Override
public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException {


This method looks a little risky. Its name is getPartitionWriter, but it actually modifies this class's internal state. People need to call getPartitionWriter and finish writing for that partition before calling getPartitionWriter again. This may cause confusion to users and may be misused as well.

Contributor Author

I think we can clarify the documentation here, but that behavior is supposed to be part of the contract for these APIs, to remain consistent with the sort shuffle algorithm.

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108342 has finished for PR 25007 at commit b8b7b8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah
Contributor Author

mccheah commented Jul 29, 2019

@vanzin @squito latest patch addresses a bunch of comments. Please take a look.

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108352 has finished for PR 25007 at commit 06ea01a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Looks good, I'll leave it here a bit in case others still have comments.

writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
}

private void writePartitionedDataWithChannel(
File file, WritableByteChannelWrapper outputChannel) throws IOException {
Contributor

nit: one arg per line

@squito
Contributor

squito commented Jul 30, 2019

lgtm too

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108416 has finished for PR 25007 at commit 7dceec9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jul 30, 2019

Merging to master. On to the next round...

@vanzin vanzin closed this in abef84a Jul 30, 2019
* <code>spark.shuffle.sort.io.plugin.class</code>.
* @since 3.0.0
*/
@Private
Member

Question from SPARK-28568. Is it an API or not? Looks so given the PR description. @Private is:

  "This should be used only when the standard Scala / Java means of protecting classes are insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation in its place."

So @Private doesn't look like it's meant for APIs. Shall we change it to @Unstable (maybe with an explicit warning)?

Contributor

@HyukjinKwon it'll all eventually be @Experimental, but we decided to start by making it @Private just in case spark 3.0 gets released in the middle. (discussed here: #25007 (comment))

Looks like we forgot to file a follow-up jira about that; I just filed https://issues.apache.org/jira/browse/SPARK-28592

Member

Ah, okie. That's good.
My impression was that @Unstable guarantees less than @Experimental. Maybe we can consider this point as well later.

* @since 3.0.0
*/
@Private
public interface WritableByteChannelWrapper extends Closeable {
Contributor

Why do we only need a wrapper for WritableByteChannel, but not OutputStream?

Contributor Author

We need to return the FileChannel object directly to the caller, because FileChannel#transfer[from|to] checks instanceof on the argument channel in order to decide whether to optimize via zero-copy. Extending FileChannel is nearly impossible since it's an internal JDK abstract class with a lot of methods. But if we return the FileChannel directly, we have no way to shield the channel from being closed so that we can share the same channel resource across partitions.

This has come up in #25007 (comment) and palantir#535 and especially palantir#535 (comment). Given that this has come up as a question a number of times, I wonder if there's a better way we can make the semantics more accessible. I don't see a way to improve the architecture itself, but perhaps better documentation in the right places explaining why we went about this the way we did is warranted. A sketch of the pattern follows.
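A hedged sketch of the pattern being described, assuming the WritableByteChannelWrapper interface from this PR; the class name and field layout are hypothetical.

import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.shuffle.api.WritableByteChannelWrapper;

class SharedFileChannelWrapper implements WritableByteChannelWrapper {
  private final FileChannel channel;  // shared across all partition writers

  SharedFileChannelWrapper(FileChannel channel) {
    this.channel = channel;
  }

  // Expose the real FileChannel so FileChannel#transferTo can detect it via
  // instanceof and take the zero-copy path.
  @Override
  public WritableByteChannel channel() {
    return channel;
  }

  @Override
  public void close() {
    // Intentionally a no-op: the map output writer closes the underlying
    // channel exactly once, in commitAllPartitions().
  }
}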

* provided upon the creation of this map output writer via
* {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
* <p>
* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
Contributor

How useful is this? I think we can make Spark shuffle more flexible if we don't guarantee this. Do you have a concrete example of how an implementation can leverage this guarantee?

Contributor

Spark's existing implementation makes this assumption: the index & data files assume partitions are written in sequential order.

Though it would be really easy to change the index format to allow for a random order (just need to include a start and an end offset, rather than having the end be implicit).
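A sketch of the assumption being described, with an illustrative helper name: the existing index file stores only cumulative end offsets, so each partition's start offset is implicit in the previous entry, which only works when partitions arrive in order.

class IndexOffsets {
  // offsets[i] and offsets[i + 1] bound partition i in the data file; a random
  // write order would require storing explicit (start, end) pairs instead.
  static long[] cumulativeOffsets(long[] partitionLengths) {
    long[] offsets = new long[partitionLengths.length + 1];
    for (int i = 0; i < partitionLengths.length; i++) {
      offsets[i + 1] = offsets[i] + partitionLengths[i];
    }
    return offsets;
  }
}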

* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
Member

During the fix of SPARK-25341, we need to pass more parameters into the shuffle writer and shuffle block resolver; see #25361 for the quick API change review. Thanks :)

final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
  blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);

@mccheah Sorry to bring up such an old PR, lol.
But why didn't we have the specific plugin take care of this? This is not spill.

yaooqinn pushed a commit that referenced this pull request Jun 14, 2024
…and deprecate `spark.shuffle.unsafe.file.output.buffer`

### What changes were proposed in this pull request?
Deprecate spark.shuffle.unsafe.file.output.buffer and add a new config spark.shuffle.localDisk.file.output.buffer instead.

### Why are the changes needed?
The old config was designed to be used in UnsafeShuffleWriter, but now it is used by all local shuffle writers through LocalDiskShuffleMapOutputWriter, introduced by #25007.

### Does this PR introduce _any_ user-facing change?
The old config still works, but users are advised to use the new one.

### How was this patch tested?
Passed existing tests.

Closes #39819 from wayneguow/shuffle_output_buffer.

Authored-by: wayneguow <guow93@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>