Skip to content

Commit

Permalink
feat: add singleton dataset mode for faster performance and use old sparse dataset create method to reduce memory usage (#1066)
Browse files Browse the repository at this point in the history

* feat: add singleton dataset mode for faster performance

Co-authored-by: Mark Hamilton <mhamilton723@gmail.com>
  • Loading branch information
imatiach-msft and mhamilton723 committed Jul 12, 2021
1 parent 41bfd05 commit 0f69cf5
Show file tree
Hide file tree
Showing 22 changed files with 1,170 additions and 590 deletions.
Expand Up @@ -104,8 +104,8 @@ object ClusterUtil {
}
}

def getDriverHost(dataset: Dataset[_]): String = {
val blockManager = BlockManagerUtils.getBlockManager(dataset)
def getDriverHost(spark: SparkSession): String = {
val blockManager = BlockManagerUtils.getBlockManager(spark)
blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) =>
if (blockManagerId.executorId == "driver") Some(getHostToIP(blockManagerId.host))
else None
Expand All @@ -120,11 +120,11 @@ object ClusterUtil {
}

/** Returns a list of executor id and host.
    * @param spark The current spark session.
* @return List of executors as an array of (id,host).
*/
def getExecutors(dataset: Dataset[_]): Array[(Int, String)] = {
val blockManager = BlockManagerUtils.getBlockManager(dataset)
def getExecutors(spark: SparkSession): Array[(Int, String)] = {
val blockManager = BlockManagerUtils.getBlockManager(spark)
blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) =>
if (blockManagerId.executorId == "driver") None
else Some((blockManagerId.executorId.toInt, getHostToIP(blockManagerId.host)))
Expand All @@ -142,35 +142,33 @@ object ClusterUtil {
* @param numTasksPerExec The number of tasks per executor.
* @return The number of executors * number of tasks.
*/
def getNumExecutorTasks(dataset: Dataset[_], numTasksPerExec: Int, log: Logger): Int = {
val executors = getExecutors(dataset)
def getNumExecutorTasks(spark: SparkSession, numTasksPerExec: Int, log: Logger): Int = {
val executors = getExecutors(spark)
log.info(s"Retrieving executors...")
if (!executors.isEmpty) {
log.info(s"Retrieved num executors ${executors.length} with num tasks per executor $numTasksPerExec")
executors.length * numTasksPerExec
} else {
log.info(s"Could not retrieve executors from blockmanager, trying to get from configuration...")
val master = dataset.sparkSession.sparkContext.master
val master = spark.sparkContext.master

//TODO make this less brittle
val rx = "local(?:\\[(\\*|\\d+)(?:,\\d+)?\\])?".r
master match {
case rx(null) => {
case rx(null) =>
log.info(s"Retrieved local() = 1 executor by default")
1
}
case rx("*") => {
case rx("*") =>
log.info(s"Retrieved local(*) = ${Runtime.getRuntime.availableProcessors()} executors")
Runtime.getRuntime.availableProcessors()
}
case rx(cores) => {
case rx(cores) =>
log.info(s"Retrieved local(cores) = $cores executors")
cores.toInt
}
case _ => {
val numExecutors = BlockManagerUtils.getBlockManager(dataset)
case _ =>
val numExecutors = BlockManagerUtils.getBlockManager(spark)
.master.getMemoryStatus.size
log.info(s"Using default case = $numExecutors executors")
numExecutors
}
}
}
}
Expand Down
Expand Up @@ -137,8 +137,3 @@ class Consolidator[T] {

}

/** Aggregates rows locally on each partition and merges the partial results.
  *
  * @tparam T The type of the partial aggregation state.
  */
trait LocalAggregator[T] {
  /** Builds a partial aggregate from a single partition's rows.
    *
    * @param iter Iterator over the rows of one partition.
    * @return The partial aggregation state for this partition.
    */
  def prep(iter: Iterator[Row]): T

  /** Combines the per-partition partial aggregates into one result.
    *
    * @param ts The partial states produced by [[prep]] on each partition.
    * @return The merged aggregation state.
    */
  def merge(ts: Seq[T]): T
}
Expand Up @@ -3,16 +3,16 @@

package org.apache.spark.injections

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.BlockManager

object BlockManagerUtils {
  /** Returns the block manager from the spark session's spark context.
    *
    * @param spark The spark session to get the block manager from.
    * @return The block manager.
    */
  def getBlockManager(spark: SparkSession): BlockManager = {
    spark.sparkContext.env.blockManager
  }
}

0 comments on commit 0f69cf5

Please sign in to comment.