Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Why opened task less than available executors in case of insert into/load data #4160

Open
01lin opened this issue Jun 23, 2021 · 1 comment

Comments

@01lin
Copy link

01lin commented Jun 23, 2021

In case of insert into or load data, the total number of tasks in the stage is almost equal to the number of hosts, and in general it is much smaller than the available executors. The low parallelism of the stage results in slower execution. Why must the parallelism be constrained on the distinct host? Can start more tasks to increase parallelism and improve resource utilization? Thanks

org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala: loadDataFrame

  /**
   * Execute load process to load from input dataframe
   */
  private def loadDataFrame(
      sqlContext: SQLContext,
      dataFrame: Option[DataFrame],
      carbonLoadModel: CarbonLoadModel
  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    try {
      val rdd = dataFrame.get.rdd
      val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
        DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
      }.distinct.length
      val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
        nodeNumOfData,
        sqlContext.sparkContext) 
      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
        .distinct)

      new NewDataFrameLoaderRDD(
        sqlContext.sparkSession,
        new DataLoadResultImpl(),
        carbonLoadModel,
        newRdd
      ).collect()
    } catch {
      case ex: Exception =>
        LOGGER.error("load data frame failed", ex)
        throw ex
    }
  }
@QiangCai
Copy link
Contributor

It only works for the local_sort loading.
It can help to avoid data shuffle during executors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants