Skip to content

Conversation

@wankunde
Copy link
Contributor

@wankunde wankunde commented May 28, 2022

What changes were proposed in this pull request?

Optimize MapOutputTracker.convertMapStatuses() method.

Why are the changes needed?

MapOutputTracker.convertMapStatuses() will be very slow if there are tens of thousands MapStatuses and MergeStatuses.

Benchmark code:

  val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)

    val blockManagerNumber = 1000
    val mapNumber = 50000
    val shufflePartitions = 10000

    val shuffleId: Int = 0
    // First reduce task will fetch map data from startPartition to endPartition
    val startPartition = 0
    val startMapIndex = 0
    val endMapIndex = mapNumber
    val blockManagers = Array.tabulate(blockManagerNumber) { i =>
      BlockManagerId("a", "host" + i, 7337)
    }
    val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
      HighlyCompressedMapStatus(
        blockManagers(mapTaskId % blockManagerNumber),
        Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
        mapTaskId)
    }
    val bitmap = new RoaringBitmap()
    Range(0, 4000).foreach(bitmap.add(_))
    val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
      MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
    }

    Array(499, 999, 1499).foreach { endPartition =>
      benchmark.addCase(
        s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
        numIters) { _ =>
        MapOutputTracker.convertMapStatuses(
          shuffleId,
          startPartition,
          endPartition,
          mapStatuses,
          startMapIndex,
          endMapIndex,
          Some(mergeStatuses))
      }
    }

Before this PR

================================================================================================
MapStatuses Convert Benchmark
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500               3393           3483          96          0.0  3393439257.0       1.0X
Num Maps: 50000 Fetch partitions:1000              6640           6772         121          0.0  6639654832.0       0.5X
Num Maps: 50000 Fetch partitions:1500             10035          10143         108          0.0 10035100069.0       0.3X

After this PR

================================================================================================
MapStatuses Convert Benchmark
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500                667            679          15          0.0   666562302.0       1.0X
Num Maps: 50000 Fetch partitions:1000              1285           1397         115          0.0  1284808865.0       0.5X
Num Maps: 50000 Fetch partitions:1500              2045           2068          32          0.0  2044951906.0       0.3X

Does this PR introduce any user-facing change?

No

How was this patch tested?

Exists UTs.

@github-actions github-actions bot added the CORE label May 28, 2022
@wankunde wankunde changed the title [SPARK-39325][CORE]Improve MapOutputTracker convertMapStatuses perfor… [SPARK-39325][CORE]Improve MapOutputTracker convertMapStatuses performance May 28, 2022
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@wangyum
Copy link
Member

wangyum commented May 30, 2022

cc @mridulm @Victsm

@mridulm
Copy link
Contributor

mridulm commented May 30, 2022

I will take a look at this later this week.
Looks like a good improvement to add, thanks for the PR !

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, thanks for working on this @wankunde !
+CC @otterc

@mridulm
Copy link
Contributor

mridulm commented Jun 8, 2022

+CC @Ngone51 as well, since you had reviewed the original change.

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice improvment!

@wangyum wangyum closed this in b6aea1a Jun 11, 2022
@wangyum
Copy link
Member

wangyum commented Jun 11, 2022

Thank you all. Merged to master.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @wankunde , @mridulm , @Ngone51 , @wangyum ?

Just a question, was the slow performance a regression at Apache Spark 3.2.0 due to SPARK-32921 (and the umbrella issue SPARK-30602)?

@wangyum
Copy link
Member

wangyum commented Jun 12, 2022

@dongjoon-hyun I don't think this is a regression since all these changes are for push-based shuffles.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants