[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API #25007

Closed
wants to merge 33 commits into from

Commits on Jun 26, 2019

  1. [SPARK-25299] Introduce the new shuffle writer API (#5) (#520)

    Introduces the new Shuffle Writer API. Ported from bloomberg#5.
    mccheah committed Jun 26, 2019
    1957e82
  2. [SPARK-25299] Local shuffle implementation of the shuffle writer API (#524)
    
    Implements the shuffle writer API by writing shuffle files to local disk and using the index block resolver to commit data and write index files.
    
    The logic in `BypassMergeSortShuffleWriter` has been refactored to use the base implementation of the plugin instead.
    
    APIs have been slightly renamed to clarify semantics after considering nuances in how these are to be implemented by other developers.
    
    Follow-up commits are to come for `SortShuffleWriter` and `UnsafeShuffleWriter`.
    
    Ported from bloomberg#6, credits to @ifilonenko.
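
The local-disk layout described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class and method names are hypothetical, and the index layout (one cumulative byte offset per partition boundary, stored as longs) is an assumption made for the example.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Spark: writes all partitions into one data
// file and records cumulative byte offsets in a companion index file.
final class LocalShuffleFileSketch {

  /** Writes the partitions in order and returns the byte length of each one. */
  static long[] write(Path dataFile, Path indexFile, byte[][] partitions) throws IOException {
    long[] lengths = new long[partitions.length];
    try (OutputStream data = Files.newOutputStream(dataFile)) {
      for (int i = 0; i < partitions.length; i++) {
        data.write(partitions[i]);
        lengths[i] = partitions[i].length;
      }
    }
    // Assumed index layout: numPartitions + 1 longs, starting at offset 0.
    try (DataOutputStream index = new DataOutputStream(Files.newOutputStream(indexFile))) {
      long offset = 0L;
      index.writeLong(offset);
      for (long length : lengths) {
        offset += length;
        index.writeLong(offset);
      }
    }
    return lengths;
  }
}
```

With this layout, a reader can locate partition `i` by reading the two adjacent offsets `i` and `i + 1` from the index file; an empty partition simply has two equal offsets.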
    mccheah committed Jun 26, 2019
    857552a
  3. d13037f
  4. [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter. (#532)
    
    * [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter.
    
    * Remove unused
    
    * Handle empty partitions properly.
    
    * Adjust formatting
    
    * Don't close streams twice.
    
    Because compressed output streams don't like it.
    
    * Clarify comment
    mccheah committed Jun 26, 2019
    8f5fb60
  5. [SPARK-25299] Shuffle locations api (#517)

    Implements the shuffle locations API as part of SPARK-25299.
    
    This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional and implementations of the pluggable shuffle writers and readers can ignore it accordingly.
    
    This API is designed with the use case in mind of future plugin implementations desiring to have the driver store metadata about where shuffle blocks are stored.
    
    There are a few caveats to this design:
    
    - We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, this proved very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we settle for storing the block manager ID redundantly here. The overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where each map status stores one additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.
    
    - `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We think that Kryo was previously falling back to Java serialization anyway, so we now explicitly tell Kryo to use `ExternalizableSerializer` for these objects. There's a small hack in the serialization protocol that attempts to avoid serializing the same `BlockManagerId` twice when the map shuffle locations object is a `DefaultMapShuffleLocations`.
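
The "same object on the heap" argument in the first caveat can be illustrated with a small sketch. All types below are hypothetical stand-ins rather than Spark's classes, and the interning cache is an assumption about how the sharing might be achieved; the point is only that a cached default location lets both fields of a map status reference one instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a pluggable "where is the map output" type.
interface ShuffleLocationsSketch {}

// Hypothetical stand-in for an executor identity that doubles as the default location.
final class ExecutorLocationSketch implements ShuffleLocationsSketch {
  private static final Map<String, ExecutorLocationSketch> CACHE = new ConcurrentHashMap<>();

  final String hostPort;

  private ExecutorLocationSketch(String hostPort) {
    this.hostPort = hostPort;
  }

  // Interning: equal host:port strings always yield the same instance.
  static ExecutorLocationSketch of(String hostPort) {
    return CACHE.computeIfAbsent(hostPort, ExecutorLocationSketch::new);
  }
}

// Hypothetical map status carrying both the existing and the new field.
final class MapStatusSketch {
  final ExecutorLocationSketch blockManagerId;   // existing bookkeeping field
  final ShuffleLocationsSketch shuffleLocations; // new, pluggable field

  MapStatusSketch(ExecutorLocationSketch blockManagerId, ShuffleLocationsSketch shuffleLocations) {
    this.blockManagerId = blockManagerId;
    this.shuffleLocations = shuffleLocations;
  }

  // In the default case both fields reference the single cached object,
  // so the extra cost per map status is one pointer.
  static MapStatusSketch withDefaultLocations(String hostPort) {
    ExecutorLocationSketch location = ExecutorLocationSketch.of(hostPort);
    return new MapStatusSketch(location, location);
  }
}
```

A custom plugin would instead pass its own `ShuffleLocationsSketch` implementation as the second constructor argument, in which case the two fields differ.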
    mccheah committed Jun 26, 2019
    e17c7ea
  6. [SPARK-25299] Move shuffle writers back to being given specific partition ids (#540)
    
    We originally made the shuffle map output writer API behave like an iterator in fetching the "next" partition writer. However, the shuffle writer implementations tend to skip opening empty partitions. If we used an iterator-like API though we would be tied down to opening a partition writer for every single partition, even if some of them are empty. Here, we go back to using specific partition identifiers to give us more freedom to avoid needing to create writers for empty partitions.
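
The difference this makes for empty partitions can be sketched as follows. The class is a hypothetical in-memory illustration, not the PR's interface: because callers request a writer by partition id, partitions that receive no records never get a writer at all.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory writer, for illustration only.
final class PartitionedWriterSketch {
  private final int numPartitions;
  private final Map<Integer, ByteArrayOutputStream> opened = new TreeMap<>();

  PartitionedWriterSketch(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  // Callers ask for a specific partition, so untouched partitions cost nothing.
  OutputStream getPartitionWriter(int partitionId) {
    if (partitionId < 0 || partitionId >= numPartitions) {
      throw new IllegalArgumentException("Invalid partition id: " + partitionId);
    }
    return opened.computeIfAbsent(partitionId, id -> new ByteArrayOutputStream());
  }

  // Lengths for all partitions; partitions that were never opened report 0.
  long[] commitAllPartitions() {
    long[] lengths = new long[numPartitions];
    opened.forEach((id, buffer) -> lengths[id] = buffer.size());
    return lengths;
  }

  int openedWriterCount() {
    return opened.size();
  }
}
```

An iterator-style `next()` API would have had to hand out a writer for every partition in order to reach the non-empty ones.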
    mccheah committed Jun 26, 2019
    3f0c131
  7. f982df7
  8. [SPARK-25299] Propose a new NIO transfer API for partition writing. (#535)
    
    * Propose a new NIO transfer API for partition writing.
    
    This solves the consistency and resource leakage concerns with the first iteration of this API, where it
    would not be obvious that the streamable resources created by ShufflePartitionWriter needed to be closed by
    ShufflePartitionWriter#close as opposed to closing the resources directly.
    
    This introduces the following adjustments:
    
    - Channel-based writes are separated out to their own module, SupportsTransferTo. This allows the transfer-to
      APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely.
      This also allows us to mark the base ShufflePartitionWriter as a stable API eventually while keeping the NIO
      APIs marked as experimental or developer-api.
    
    - We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition
      writer returns an instance of TransferrableWritableByteChannel, which has APIs for accepting a
      TransferrableReadableByteChannel and can tell the readable byte channel to transfer its bytes out to some
      destination sink.
    
    - The resources returned by ShufflePartitionWriter are always closed. Internally, DefaultMapOutputWriter keeps
      resources open until commitAllPartitions() is called.
    
    * Migrate unsafe shuffle writer to use new byte channel API.
    
    * More sane implementation for unsafe
    
    * Fix style
    
    * Address comments
    
    * Fix imports
    
    * Fix build
    
    * Fix more build problems
    
    * Address comments.
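
The channel-to-channel transfer idea in this commit can be sketched with plain `java.nio` calls. This is not the API proposed in the PR: the class and method names are hypothetical, and the example only shows the underlying technique of moving bytes from a readable file channel into a writable sink with `FileChannel.transferTo`, here used to concatenate several files into one output.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper, for illustration only.
final class ChannelTransferSketch {

  // transferTo may move fewer bytes than requested, so keep calling it
  // until `count` bytes have been moved or no further progress is made.
  static long transferFully(FileChannel source, long position, long count,
      WritableByteChannel sink) throws IOException {
    long transferred = 0L;
    while (transferred < count) {
      long n = source.transferTo(position + transferred, count - transferred, sink);
      if (n <= 0) {
        break; // nothing more could be transferred
      }
      transferred += n;
    }
    return transferred;
  }

  // Appends each input file, in order, to a single output file.
  static long concatenate(List<Path> inputFiles, Path outputFile) throws IOException {
    long total = 0L;
    try (FileChannel out = FileChannel.open(outputFile, StandardOpenOption.CREATE,
        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      for (Path input : inputFiles) {
        try (FileChannel in = FileChannel.open(input, StandardOpenOption.READ)) {
          total += transferFully(in, 0L, in.size(), out);
        }
      }
    }
    return total;
  }
}
```

Both channels are opened and closed by the same method through try-with-resources, which reflects the ownership concern the commit message raises: whoever creates a resource is the one who closes it.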
    mccheah committed Jun 26, 2019
    6891197

Commits on Jun 27, 2019

  1. 7b44ed2
  2. df75f1f
  3. a8558af
  4. Revert a bunch of other stuff

    mccheah committed Jun 27, 2019
    806d7bb
  5. More reverts

    mccheah committed Jun 27, 2019
    3167030

Commits on Jun 28, 2019

  1. 70f59db
  2. Fix style

    mccheah committed Jun 28, 2019
    3083d86
  3. 4c3d692

Commits on Jul 1, 2019

  1. 2421c92

Commits on Jul 8, 2019

  1. Address comments

    mccheah committed Jul 8, 2019
    982f207
  2. Fix style

    mccheah committed Jul 8, 2019
    594d1e2

Commits on Jul 12, 2019

  1. Address comments.

    mccheah committed Jul 12, 2019
    66aae91

Commits on Jul 17, 2019

  1. 8b432f9

Commits on Jul 18, 2019

  1. Address comments.

    mccheah committed Jul 18, 2019
    9f597dd
  2. Restructure test

    mccheah committed Jul 18, 2019
    86c1829

Commits on Jul 19, 2019

  1. a7885ae
  2. Add more documentation

    mccheah committed Jul 19, 2019
    9893c6c
  3. cd897e7

Commits on Jul 24, 2019

  1. Address comments

    mccheah committed Jul 24, 2019
    9f17b9b
  2. Code tags

    mccheah committed Jul 24, 2019
    e53a001
  3. Add some docs

    mccheah committed Jul 24, 2019
    56fa450

Commits on Jul 25, 2019

  1. b8b7b8d

Commits on Jul 29, 2019

  1. Remove metrics from the API.

    mccheah committed Jul 29, 2019
    2d29404
  2. Address more comments.

    mccheah committed Jul 29, 2019
    06ea01a

Commits on Jul 30, 2019

  1. Args per line

    mccheah committed Jul 30, 2019
    7dceec9