
Revert "[SPARK-21052][SQL] Add hash map metrics to join" #23204

Closed
JkSelf wants to merge 1 commit.

Conversation

@JkSelf (Contributor) commented Dec 3, 2018:

Partial revert for SPARK-26155, because of the performance degradation in the TPC-DS results at the 1 TB data scale.

@JkSelf (Contributor, Author) commented Dec 3, 2018:

Cluster info:

|  | Master Node | Worker Nodes |
|---|---|---|
| Node | 1x | 4x |
| Processor | Intel(R) Xeon(R) Platinum 8170 CPU @ 2.10GHz | Intel(R) Xeon(R) Platinum 8180 CPU @ 2.50GHz |
| Memory | 192 GB | 384 GB |
| Storage (main) | 8 x 960 GB SSD | 8 x 960 GB SSD |
| Network | 10 GbE | 10 GbE |
| Role | CM Management, NameNode, Secondary NameNode, Resource Manager, Hive Metastore Server | DataNode, NodeManager |
| OS Version | CentOS 7.2 | CentOS 7.2 |
| Hadoop | Apache Hadoop 2.7.5 | Apache Hadoop 2.7.5 |
| Hive | Apache Hive 2.2.0 | |
| Spark | Apache Spark 2.1.0 & Apache Spark 2.3.0 | |
| JDK version | 1.8.0_112 | 1.8.0_112 |

Related parameter settings:

| Component | Parameter | Value |
|---|---|---|
| YARN Resource Manager | yarn.scheduler.maximum-allocation-mb | 40 GB |
| | yarn.scheduler.minimum-allocation-mb | 1 GB |
| | yarn.scheduler.maximum-allocation-vcores | 121 |
| | yarn.resourcemanager.scheduler.class | Fair Scheduler |
| YARN Node Manager | yarn.nodemanager.resource.memory-mb | 40 GB |
| | yarn.nodemanager.resource.cpu-vcores | 121 |
| Spark | spark.executor.memory | 34 GB |
| | spark.executor.cores | 40 |

In the above test environment, we found a serious performance degradation in Spark 2.3 when running TPC-DS on SKX 8180. We investigated the problem and found the root cause in community patch SPARK-21052, which adds metrics to the hash join process. The impacted code is at L486 and L487.

Following is the TPC-DS Q19 result for Spark 2.1, Spark 2.3 with L486 and L487 removed, Spark 2.3 with L486 and L487 included, and Spark 2.4:

| Spark 2.1 | Spark 2.3 (L486 & L487 removed) | Spark 2.3 (L486 & L487 included) | Spark 2.4 |
|---|---|---|---|
| 49s | 47s | 307s | 270s |
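
For context on why two lines can matter this much: the reverted change does per-row bookkeeping on the probe side of the join. Below is a minimal sketch of that pattern (field names follow the LongToUnsafeRowMap diff later in this thread; this is not the full Spark source):

```scala
// Sketch of the per-row metric bookkeeping that SPARK-21052 added to the
// hash join probe path. Every probe-side row pays for these increments,
// so at TPC-DS scale they run billions of times inside the hot loop.
final class ProbeCounters {
  private var numKeyLookups = 0L
  private var numProbes = 0L

  // Called once per lookup; extraProbes counts additional probe steps
  // taken while resolving hash collisions.
  def recordLookup(extraProbes: Long = 0L): Unit = {
    numKeyLookups += 1
    numProbes += 1 + extraProbes
  }

  // The "avg hash probe" metric reported when the task completes.
  def avgProbesPerLookup: Double =
    if (numKeyLookups == 0L) 0.0 else numProbes.toDouble / numKeyLookups
}
```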

@cloud-fan (Contributor) commented:

ok to test

```diff
@@ -63,7 +63,7 @@ case class HashAggregateExec(
   "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
   "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
   "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
-  "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+  "avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hashmap probe"))
```
A Contributor commented on this hunk:

I know it's easy to just run the git revert command, but I'd like to manually revert it, since that PR was merged a long time ago. And we should still keep changes like this renaming, as they are not really related to the performance regression.

```diff
@@ -374,22 +374,6 @@ class TungstenAggregationIterator(
       }
     }

-    TaskContext.get().addTaskCompletionListener[Unit](_ => {
```
A Contributor commented on this hunk:

Ditto, it's better to put this code block here; let's keep this change.

@cloud-fan (Contributor) commented:

I'm fine with reverting it if it caused a significant performance regression; we should revisit it later with different ideas, like updating the metrics for each batch instead of for each record.

cc @viirya @gatorsmile
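
For illustration, a minimal sketch of the batch-update idea, assuming a hypothetical wrapper class (BatchedMetricUpdater is not from this PR; it relies only on SQLMetric.add, which appears in the diff below):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical sketch: buffer counts in a plain local field and flush them
// into the SQLMetric once per batch, so the accumulator is touched
// O(batches) times instead of O(rows) times.
class BatchedMetricUpdater(metric: SQLMetric, batchSize: Long = 4096L) {
  private var pending = 0L

  def record(count: Long = 1L): Unit = {
    pending += count
    if (pending >= batchSize) flush()
  }

  // Also call once from a task-completion listener to drain the remainder.
  def flush(): Unit = {
    metric.add(pending)
    pending = 0L
  }
}
```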

```diff
@@ -57,12 +57,6 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {

   override def add(v: Long): Unit = _value += v

-  // We can set a double value to `SQLMetric` which stores only long value, if it is
```
A Contributor commented on this hunk:

I think we can keep the changes in this file as well.
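
For context, the removed comment refers to average metrics storing a double inside the long-valued accumulator. A hypothetical fixed-point sketch of that idea (Spark's actual constants and helpers may differ):

```scala
// Hypothetical sketch: a long-backed metric holds a double average via
// fixed-point scaling; not necessarily Spark's exact implementation.
class LongBackedAverageMetric(scale: Long = 10L) {
  private var _value: Long = 0L

  def add(v: Long): Unit = _value += v             // the normal long path
  def setDouble(v: Double): Unit =
    _value = (v * scale).toLong                    // store the scaled double
  def asDouble: Double = _value.toDouble / scale   // recover the average
}
```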

```diff
@@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, capacity: Int)
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
```
A Member commented on this hunk:

If, as your test shows, this is the cause of the performance regression, we can just revert this and the related changes. The changes in HashAggregateExec, etc. can be kept.

@viirya (Member) commented Dec 3, 2018:

Is this observable in general hash join queries, besides TPC-DS Q19?

@SparkQA commented Dec 3, 2018:

Test build #99615 has finished for PR 23204 at commit 7d5008d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang (Contributor) commented:

@cloud-fan @viirya #23214 may resolve this problem, so we may not need to revert this patch.

@dongjoon-hyun (Member) commented:

Hi, @LuciferYang. If we are not going to revert this, could you close this PR?

@cloud-fan (Contributor) commented:

According to #23214 (comment), the hash join metrics were implemented incorrectly. I think it's fine to revert them and re-implement them later.

@JkSelf can you address the comments and only revert the hash join part? thanks!

@JkSelf (Contributor, Author) commented Dec 9, 2018:

The results of all TPC-DS queries at the 1 TB data scale are in tpcds result.

@JkSelf (Contributor, Author) commented Dec 9, 2018:

@cloud-fan OK, I will revert according to your comments later.

@dongjoon-hyun (Member) commented:

@cloud-fan and @JkSelf: for the partial revert, we had better create a new Apache JIRA issue. That will be a cleaner way to backport.

@cloud-fan (Contributor) commented:

+1

@LuciferYang (Contributor) commented:

@cloud-fan If we decide to partially revert SPARK-21052 and #23214 is no longer needed, I will close it.

@cloud-fan (Contributor) commented:

If we can quickly finish #23214 (within several days), let's go for it. But if we can't, I'd suggest we do the partial revert first to fix the perf regression, and add the metrics back later.

@JkSelf (Contributor, Author) commented Dec 10, 2018:

@cloud-fan @dongjoon-hyun I have updated the patch; please help review it when you have time. Thanks.

@LuciferYang (Contributor) commented:

OK, I have already closed #23214.

@cloud-fan (Contributor) commented:

Can we follow #23204 (comment) and create a new ticket?

@SparkQA commented Dec 10, 2018:

Test build #99894 has finished for PR 23204 at commit 9baf05e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JkSelf (Contributor, Author) commented Dec 10, 2018:

@cloud-fan The new ticket is here. I will close this PR.

```diff
@@ -213,10 +213,6 @@ trait HashJoin {
       s"BroadcastHashJoin should not take $x as the JoinType")
   }

-  // At the end of the task, we update the avg hash probe.
-  TaskContext.get().addTaskCompletionListener[Unit](_ =>
```
A Contributor commented on this hunk:

In this file, the join method takes avgHashProbe: SQLMetric; we should remove it.
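
Concretely, this means dropping the metric parameter from the join signature. A sketch of the before/after shape (the real HashJoin trait has more members; only the join method is shown here):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.joins.HashedRelation
import org.apache.spark.sql.execution.metric.SQLMetric

// Before the revert (sketch): the probe metric is threaded through join.
trait HashJoinBefore {
  protected def join(
      streamedIter: Iterator[InternalRow],
      hashed: HashedRelation,
      numOutputRows: SQLMetric,
      avgHashProbe: SQLMetric): Iterator[InternalRow]
}

// After the revert (sketch): avgHashProbe is removed from the signature.
trait HashJoinAfter {
  protected def join(
      streamedIter: Iterator[InternalRow],
      hashed: HashedRelation,
      numOutputRows: SQLMetric): Iterator[InternalRow]
}
```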

The PR author replied:

@cloud-fan Updated in the new PR, #23269. Thanks.

@dongjoon-hyun (Member) commented:

I'll close this in order to collect the reviews into the new PR, #23269.
